Rewrite SamplingCircularQueue
authoryurys@chromium.org <yurys@chromium.org@ce2b1a6d-e550-0410-aec6-3dcde31c8c00>
Fri, 23 Aug 2013 08:22:07 +0000 (08:22 +0000)
committeryurys@chromium.org <yurys@chromium.org@ce2b1a6d-e550-0410-aec6-3dcde31c8c00>
Fri, 23 Aug 2013 08:22:07 +0000 (08:22 +0000)
The new implementation:
* uses MemoryBarriers to make sure up-to-date data is accessed on both producer and consumer threads
* will not allow to overwrite records
* doesn't have notion of chunks, instead each entry is aligned on the cache line boundaries

BUG=v8:2814
R=bmeurer@chromium.org

Review URL: https://codereview.chromium.org/22849002

git-svn-id: http://v8.googlecode.com/svn/branches/bleeding_edge@16284 ce2b1a6d-e550-0410-aec6-3dcde31c8c00

12 files changed:
src/circular-queue-inl.h
src/circular-queue.cc [deleted file]
src/circular-queue.h
src/cpu-profiler-inl.h
src/cpu-profiler.cc
src/cpu-profiler.h
src/globals.h
src/sampler.cc
src/v8globals.h
test/cctest/test-circular-queue.cc
test/cctest/test-cpu-profiler.cc
tools/gyp/v8.gyp

index b48070a..8b09eeb 100644 (file)
 namespace v8 {
 namespace internal {
 
+template<typename T, unsigned L>
+SamplingCircularQueue<T, L>::SamplingCircularQueue()
+    : enqueue_pos_(buffer_),
+      dequeue_pos_(buffer_) {
+}
+
+
+template<typename T, unsigned L>
+SamplingCircularQueue<T, L>::~SamplingCircularQueue() {
+}
+
+
+template<typename T, unsigned L>
+T* SamplingCircularQueue<T, L>::StartDequeue() {
+  MemoryBarrier();
+  if (Acquire_Load(&dequeue_pos_->marker) == kFull) {
+    return &dequeue_pos_->record;
+  }
+  return NULL;
+}
+
+
+template<typename T, unsigned L>
+void SamplingCircularQueue<T, L>::FinishDequeue() {
+  Release_Store(&dequeue_pos_->marker, kEmpty);
+  dequeue_pos_ = Next(dequeue_pos_);
+}
 
-void* SamplingCircularQueue::Enqueue() {
-  if (producer_pos_->enqueue_pos == producer_pos_->next_chunk_pos) {
-    if (producer_pos_->enqueue_pos == buffer_ + buffer_size_) {
-      producer_pos_->next_chunk_pos = buffer_;
-      producer_pos_->enqueue_pos = buffer_;
-    }
-    Acquire_Store(producer_pos_->next_chunk_pos, kEnqueueStarted);
-    // Skip marker.
-    producer_pos_->enqueue_pos += 1;
-    producer_pos_->next_chunk_pos += chunk_size_;
+
+template<typename T, unsigned L>
+T* SamplingCircularQueue<T, L>::StartEnqueue() {
+  MemoryBarrier();
+  if (Acquire_Load(&enqueue_pos_->marker) == kEmpty) {
+    return &enqueue_pos_->record;
   }
-  void* result = producer_pos_->enqueue_pos;
-  producer_pos_->enqueue_pos += record_size_;
-  return result;
+  return NULL;
 }
 
 
-void SamplingCircularQueue::WrapPositionIfNeeded(
-    SamplingCircularQueue::Cell** pos) {
-  if (*pos == buffer_ + buffer_size_) *pos = buffer_;
+template<typename T, unsigned L>
+void SamplingCircularQueue<T, L>::FinishEnqueue() {
+  Release_Store(&enqueue_pos_->marker, kFull);
+  enqueue_pos_ = Next(enqueue_pos_);
 }
 
 
+template<typename T, unsigned L>
+typename SamplingCircularQueue<T, L>::Entry* SamplingCircularQueue<T, L>::Next(
+    Entry* entry) {
+  Entry* next = entry + 1;
+  if (next == &buffer_[L]) return buffer_;
+  return next;
+}
+
 } }  // namespace v8::internal
 
 #endif  // V8_CIRCULAR_QUEUE_INL_H_
diff --git a/src/circular-queue.cc b/src/circular-queue.cc
deleted file mode 100644 (file)
index 0aea343..0000000
+++ /dev/null
@@ -1,125 +0,0 @@
-// Copyright 2010 the V8 project authors. All rights reserved.
-// Redistribution and use in source and binary forms, with or without
-// modification, are permitted provided that the following conditions are
-// met:
-//
-//     * Redistributions of source code must retain the above copyright
-//       notice, this list of conditions and the following disclaimer.
-//     * Redistributions in binary form must reproduce the above
-//       copyright notice, this list of conditions and the following
-//       disclaimer in the documentation and/or other materials provided
-//       with the distribution.
-//     * Neither the name of Google Inc. nor the names of its
-//       contributors may be used to endorse or promote products derived
-//       from this software without specific prior written permission.
-//
-// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-#include "v8.h"
-
-#include "circular-queue-inl.h"
-
-namespace v8 {
-namespace internal {
-
-
-SamplingCircularQueue::SamplingCircularQueue(size_t record_size_in_bytes,
-                                             size_t desired_chunk_size_in_bytes,
-                                             unsigned buffer_size_in_chunks)
-    : record_size_(record_size_in_bytes / sizeof(Cell)),
-      chunk_size_in_bytes_(desired_chunk_size_in_bytes / record_size_in_bytes *
-                        record_size_in_bytes + sizeof(Cell)),
-      chunk_size_(chunk_size_in_bytes_ / sizeof(Cell)),
-      buffer_size_(chunk_size_ * buffer_size_in_chunks),
-      buffer_(NewArray<Cell>(buffer_size_)) {
-  ASSERT(record_size_ * sizeof(Cell) == record_size_in_bytes);
-  ASSERT(chunk_size_ * sizeof(Cell) == chunk_size_in_bytes_);
-  ASSERT(buffer_size_in_chunks > 2);
-  // Mark all chunks as clear.
-  for (size_t i = 0; i < buffer_size_; i += chunk_size_) {
-    buffer_[i] = kClear;
-  }
-
-  // Layout producer and consumer position pointers each on their own
-  // cache lines to avoid cache lines thrashing due to simultaneous
-  // updates of positions by different processor cores.
-  const int positions_size =
-      RoundUp(1, kProcessorCacheLineSize) +
-      RoundUp(static_cast<int>(sizeof(ProducerPosition)),
-              kProcessorCacheLineSize) +
-      RoundUp(static_cast<int>(sizeof(ConsumerPosition)),
-              kProcessorCacheLineSize);
-  positions_ = NewArray<byte>(positions_size);
-
-  producer_pos_ = reinterpret_cast<ProducerPosition*>(
-      RoundUp(positions_, kProcessorCacheLineSize));
-  producer_pos_->next_chunk_pos = buffer_;
-  producer_pos_->enqueue_pos = buffer_;
-
-  consumer_pos_ = reinterpret_cast<ConsumerPosition*>(
-      reinterpret_cast<byte*>(producer_pos_) + kProcessorCacheLineSize);
-  ASSERT(reinterpret_cast<byte*>(consumer_pos_ + 1) <=
-         positions_ + positions_size);
-  consumer_pos_->dequeue_chunk_pos = buffer_;
-  // The distance ensures that producer and consumer never step on
-  // each other's chunks and helps eviction of produced data from
-  // the CPU cache (having that chunk size is bigger than the cache.)
-  const size_t producer_consumer_distance = (2 * chunk_size_);
-  consumer_pos_->dequeue_chunk_poll_pos = buffer_ + producer_consumer_distance;
-  consumer_pos_->dequeue_pos = NULL;
-}
-
-
-SamplingCircularQueue::~SamplingCircularQueue() {
-  DeleteArray(positions_);
-  DeleteArray(buffer_);
-}
-
-
-void* SamplingCircularQueue::StartDequeue() {
-  if (consumer_pos_->dequeue_pos != NULL) {
-    return consumer_pos_->dequeue_pos;
-  } else {
-    if (Acquire_Load(consumer_pos_->dequeue_chunk_poll_pos) != kClear) {
-      // Skip marker.
-      consumer_pos_->dequeue_pos = consumer_pos_->dequeue_chunk_pos + 1;
-      consumer_pos_->dequeue_end_pos =
-          consumer_pos_->dequeue_chunk_pos + chunk_size_;
-      return consumer_pos_->dequeue_pos;
-    } else {
-      return NULL;
-    }
-  }
-}
-
-
-void SamplingCircularQueue::FinishDequeue() {
-  consumer_pos_->dequeue_pos += record_size_;
-  if (consumer_pos_->dequeue_pos < consumer_pos_->dequeue_end_pos) return;
-  // Move to next chunk.
-  consumer_pos_->dequeue_pos = NULL;
-  *consumer_pos_->dequeue_chunk_pos = kClear;
-  consumer_pos_->dequeue_chunk_pos += chunk_size_;
-  WrapPositionIfNeeded(&consumer_pos_->dequeue_chunk_pos);
-  consumer_pos_->dequeue_chunk_poll_pos += chunk_size_;
-  WrapPositionIfNeeded(&consumer_pos_->dequeue_chunk_poll_pos);
-}
-
-
-void SamplingCircularQueue::FlushResidualRecords() {
-  // Eliminate producer / consumer distance.
-  consumer_pos_->dequeue_chunk_poll_pos = consumer_pos_->dequeue_chunk_pos;
-}
-
-
-} }  // namespace v8::internal
index 4ad4f4b..3ecba1f 100644 (file)
@@ -28,6 +28,8 @@
 #ifndef V8_CIRCULAR_QUEUE_H_
 #define V8_CIRCULAR_QUEUE_H_
 
+#include "v8globals.h"
+
 namespace v8 {
 namespace internal {
 
@@ -35,67 +37,50 @@ namespace internal {
 // Lock-free cache-friendly sampling circular queue for large
 // records. Intended for fast transfer of large records between a
 // single producer and a single consumer. If the queue is full,
-// previous unread records are overwritten. The queue is designed with
+// StartEnqueue will return NULL. The queue is designed with
 // a goal in mind to evade cache lines thrashing by preventing
 // simultaneous reads and writes to adjanced memory locations.
-//
-// IMPORTANT: as a producer never checks for chunks cleanness, it is
-// possible that it can catch up and overwrite a chunk that a consumer
-// is currently reading, resulting in a corrupt record being read.
+template<typename T, unsigned Length>
 class SamplingCircularQueue {
  public:
   // Executed on the application thread.
-  SamplingCircularQueue(size_t record_size_in_bytes,
-                        size_t desired_chunk_size_in_bytes,
-                        unsigned buffer_size_in_chunks);
+  SamplingCircularQueue();
   ~SamplingCircularQueue();
 
-  // Enqueue returns a pointer to a memory location for storing the next
-  // record.
-  INLINE(void* Enqueue());
+  // StartEnqueue returns a pointer to a memory location for storing the next
+  // record or NULL if all entries are full at the moment.
+  T* StartEnqueue();
+  // Notifies the queue that the producer has complete writing data into the
+  // memory returned by StartEnqueue and it can be passed to the consumer.
+  void FinishEnqueue();
 
   // Executed on the consumer (analyzer) thread.
   // StartDequeue returns a pointer to a memory location for retrieving
   // the next record. After the record had been read by a consumer,
   // FinishDequeue must be called. Until that moment, subsequent calls
   // to StartDequeue will return the same pointer.
-  void* StartDequeue();
+  T* StartDequeue();
   void FinishDequeue();
-  // Due to a presence of slipping between the producer and the consumer,
-  // the queue must be notified whether producing has been finished in order
-  // to process remaining records from the buffer.
-  void FlushResidualRecords();
-
-  typedef AtomicWord Cell;
 
  private:
-  // Reserved values for the chunk marker (first Cell in each chunk).
+  // Reserved values for the entry marker.
   enum {
-    kClear,          // Marks clean (processed) chunks.
-    kEnqueueStarted  // Marks chunks where enqueue started.
+    kEmpty,  // Marks clean (processed) entries.
+    kFull    // Marks entries already filled by the producer but not yet
+             // completely processed by the consumer.
   };
 
-  struct ProducerPosition {
-    Cell* next_chunk_pos;
-    Cell* enqueue_pos;
-  };
-  struct ConsumerPosition {
-    Cell* dequeue_chunk_pos;
-    Cell* dequeue_chunk_poll_pos;
-    Cell* dequeue_pos;
-    Cell* dequeue_end_pos;
+  struct V8_ALIGNAS(PROCESSOR_CACHE_LINE_SIZE) Entry {
+    Entry() : marker(kEmpty) {}
+    T record;
+    Atomic32 marker;
   };
 
-  INLINE(void WrapPositionIfNeeded(Cell** pos));
+  Entry* Next(Entry* entry);
 
-  const size_t record_size_;
-  const size_t chunk_size_in_bytes_;
-  const size_t chunk_size_;
-  const size_t buffer_size_;
-  Cell* buffer_;
-  byte* positions_;
-  ProducerPosition* producer_pos_;
-  ConsumerPosition* consumer_pos_;
+  Entry buffer_[Length];
+  Entry* enqueue_pos_ V8_ALIGNAS(PROCESSOR_CACHE_LINE_SIZE);
+  Entry* dequeue_pos_ V8_ALIGNAS(PROCESSOR_CACHE_LINE_SIZE);
 
   DISALLOW_COPY_AND_ASSIGN(SamplingCircularQueue);
 };
index 868ec64..7bfbf5c 100644 (file)
@@ -67,13 +67,30 @@ void ReportBuiltinEventRecord::UpdateCodeMap(CodeMap* code_map) {
 }
 
 
-TickSample* ProfilerEventsProcessor::TickSampleEvent() {
+TickSample* CpuProfiler::StartTickSample() {
+  if (is_profiling_) return processor_->StartTickSample();
+  return NULL;
+}
+
+
+void CpuProfiler::FinishTickSample() {
+  processor_->FinishTickSample();
+}
+
+
+TickSample* ProfilerEventsProcessor::StartTickSample() {
+  void* address = ticks_buffer_.StartEnqueue();
+  if (address == NULL) return NULL;
   TickSampleEventRecord* evt =
-      new(ticks_buffer_.Enqueue()) TickSampleEventRecord(last_code_event_id_);
+      new(address) TickSampleEventRecord(last_code_event_id_);
   return &evt->sample;
 }
 
 
+void ProfilerEventsProcessor::FinishTickSample() {
+  ticks_buffer_.FinishEnqueue();
+}
+
 } }  // namespace v8::internal
 
 #endif  // V8_CPU_PROFILER_INL_H_
index 747542f..ed7d5a9 100644 (file)
@@ -40,8 +40,6 @@
 namespace v8 {
 namespace internal {
 
-static const int kTickSamplesBufferChunkSize = 64 * KB;
-static const int kTickSamplesBufferChunksCount = 16;
 static const int kProfilerStackSize = 64 * KB;
 
 
@@ -49,9 +47,6 @@ ProfilerEventsProcessor::ProfilerEventsProcessor(ProfileGenerator* generator)
     : Thread(Thread::Options("v8:ProfEvntProc", kProfilerStackSize)),
       generator_(generator),
       running_(true),
-      ticks_buffer_(sizeof(TickSampleEventRecord),
-                    kTickSamplesBufferChunkSize,
-                    kTickSamplesBufferChunksCount),
       last_code_event_id_(0), last_processed_code_event_id_(0) {
 }
 
@@ -114,23 +109,10 @@ bool ProfilerEventsProcessor::ProcessTicks() {
       generator_->RecordTickSample(record.sample);
     }
 
-    const TickSampleEventRecord* rec =
-        TickSampleEventRecord::cast(ticks_buffer_.StartDequeue());
-    if (rec == NULL) return !ticks_from_vm_buffer_.IsEmpty();
-    // Make a local copy of tick sample record to ensure that it won't
-    // be modified as we are processing it. This is possible as the
-    // sampler writes w/o any sync to the queue, so if the processor
-    // will get far behind, a record may be modified right under its
-    // feet.
-    TickSampleEventRecord record = *rec;
-    if (record.order != last_processed_code_event_id_) return true;
-
-    // A paranoid check to make sure that we don't get a memory overrun
-    // in case of frames_count having a wild value.
-    if (record.sample.frames_count < 0
-        || record.sample.frames_count > TickSample::kMaxFramesCount)
-      record.sample.frames_count = 0;
-    generator_->RecordTickSample(record.sample);
+    const TickSampleEventRecord* record = ticks_buffer_.StartDequeue();
+    if (record == NULL) return !ticks_from_vm_buffer_.IsEmpty();
+    if (record->order != last_processed_code_event_id_) return true;
+    generator_->RecordTickSample(record->sample);
     ticks_buffer_.FinishDequeue();
   }
 }
@@ -148,7 +130,6 @@ void ProfilerEventsProcessor::Run() {
   }
 
   // Process remaining tick events.
-  ticks_buffer_.FlushResidualRecords();
   do {
     ProcessTicks();
   } while (ProcessCodeEvent());
@@ -166,12 +147,6 @@ CpuProfile* CpuProfiler::GetProfile(int index) {
 }
 
 
-TickSample* CpuProfiler::TickSampleEvent() {
-  if (is_profiling_) return processor_->TickSampleEvent();
-  return NULL;
-}
-
-
 void CpuProfiler::DeleteAllProfiles() {
   if (is_profiling_) StopProcessor();
   ResetProfiles();
index 1dd405e..ddd27a8 100644 (file)
@@ -114,10 +114,6 @@ class TickSampleEventRecord {
 
   unsigned order;
   TickSample sample;
-
-  static TickSampleEventRecord* cast(void* value) {
-    return reinterpret_cast<TickSampleEventRecord*>(value);
-  }
 };
 
 
@@ -156,7 +152,8 @@ class ProfilerEventsProcessor : public Thread {
   // queue (because the structure is of fixed width, but usually not all
   // stack frame entries are filled.) This method returns a pointer to the
   // next record of the buffer.
-  INLINE(TickSample* TickSampleEvent());
+  inline TickSample* StartTickSample();
+  inline void FinishTickSample();
 
  private:
   // Called from events processing thread (Run() method.)
@@ -166,7 +163,11 @@ class ProfilerEventsProcessor : public Thread {
   ProfileGenerator* generator_;
   bool running_;
   UnboundQueue<CodeEventsContainer> events_buffer_;
-  SamplingCircularQueue ticks_buffer_;
+  static const size_t kTickSampleBufferSize = 1 * MB;
+  static const size_t kTickSampleQueueLength =
+      kTickSampleBufferSize / sizeof(TickSampleEventRecord);
+  SamplingCircularQueue<TickSampleEventRecord,
+                        kTickSampleQueueLength> ticks_buffer_;
   UnboundQueue<TickSampleEventRecord> ticks_from_vm_buffer_;
   unsigned last_code_event_id_;
   unsigned last_processed_code_event_id_;
@@ -205,7 +206,8 @@ class CpuProfiler : public CodeEventListener {
   void DeleteProfile(CpuProfile* profile);
 
   // Invoked from stack sampler (thread or signal handler.)
-  TickSample* TickSampleEvent();
+  inline TickSample* StartTickSample();
+  inline void FinishTickSample();
 
   // Must be called via PROFILE macro, otherwise will crash when
   // profiling is not enabled.
index 98e5d9c..478395a 100644 (file)
@@ -282,6 +282,10 @@ const int kOneByteSize    = kCharSize;
 const int kUC16Size     = sizeof(uc16);      // NOLINT
 
 
+// Round up n to be a multiple of sz, where sz is a power of 2.
+#define ROUND_UP(n, sz) (((n) + ((sz) - 1)) & ~((sz) - 1))
+
+
 // The expression OFFSET_OF(type, field) computes the byte-offset
 // of the specified field relative to the containing type. This
 // corresponds to 'offsetof' (in stddef.h), except that it doesn't
index 1d0cded..4a13b1d 100644 (file)
@@ -65,7 +65,7 @@
 
 #include "v8.h"
 
-#include "cpu-profiler.h"
+#include "cpu-profiler-inl.h"
 #include "flags.h"
 #include "frames-inl.h"
 #include "log.h"
@@ -693,7 +693,7 @@ void Sampler::Stop() {
 
 
 void Sampler::SampleStack(const RegisterState& state) {
-  TickSample* sample = isolate_->cpu_profiler()->TickSampleEvent();
+  TickSample* sample = isolate_->cpu_profiler()->StartTickSample();
   TickSample sample_obj;
   if (sample == NULL) sample = &sample_obj;
   sample->Init(isolate_, state);
@@ -703,6 +703,9 @@ void Sampler::SampleStack(const RegisterState& state) {
     }
   }
   Tick(sample);
+  if (sample != &sample_obj) {
+    isolate_->cpu_profiler()->FinishTickSample();
+  }
 }
 
 } }  // namespace v8::internal
index 00693fa..f76dcc2 100644 (file)
@@ -97,7 +97,7 @@ const int kPageSizeBits = 20;
 // On Intel architecture, cache line size is 64 bytes.
 // On ARM it may be less (32 bytes), but as far this constant is
 // used for aligning data, it doesn't hurt to align on a greater value.
-const int kProcessorCacheLineSize = 64;
+#define PROCESSOR_CACHE_LINE_SIZE 64
 
 // Constants relevant to double precision floating point numbers.
 // If looking only at the top 32 bits, the QNaN mask is bits 19 to 30.
index 4d7856e..7b21d1e 100644 (file)
@@ -35,40 +35,33 @@ using i::SamplingCircularQueue;
 
 
 TEST(SamplingCircularQueue) {
-  typedef SamplingCircularQueue::Cell Record;
-  const int kRecordsPerChunk = 4;
-  SamplingCircularQueue scq(sizeof(Record),
-                            kRecordsPerChunk * sizeof(Record),
-                            3);
+  typedef i::AtomicWord Record;
+  const int kMaxRecordsInQueue = 4;
+  SamplingCircularQueue<Record, kMaxRecordsInQueue> scq;
 
   // Check that we are using non-reserved values.
   // Fill up the first chunk.
   CHECK_EQ(NULL, scq.StartDequeue());
-  for (Record i = 1; i < 1 + kRecordsPerChunk; ++i) {
-    Record* rec = reinterpret_cast<Record*>(scq.Enqueue());
+  for (Record i = 1; i < 1 + kMaxRecordsInQueue; ++i) {
+    Record* rec = reinterpret_cast<Record*>(scq.StartEnqueue());
     CHECK_NE(NULL, rec);
     *rec = i;
-    CHECK_EQ(NULL, scq.StartDequeue());
+    scq.FinishEnqueue();
   }
 
-  // Fill up the second chunk. Consumption must still be unavailable.
-  CHECK_EQ(NULL, scq.StartDequeue());
-  for (Record i = 10; i < 10 + kRecordsPerChunk; ++i) {
-    Record* rec = reinterpret_cast<Record*>(scq.Enqueue());
-    CHECK_NE(NULL, rec);
-    *rec = i;
-    CHECK_EQ(NULL, scq.StartDequeue());
-  }
+  // The queue is full, enqueue is not allowed.
+  CHECK_EQ(NULL, scq.StartEnqueue());
 
-  Record* rec = reinterpret_cast<Record*>(scq.Enqueue());
-  CHECK_NE(NULL, rec);
-  *rec = 20;
-  // Now as we started filling up the third chunk, consumption
-  // must become possible.
+  // Try to enqueue when the the queue is full. Consumption must be available.
   CHECK_NE(NULL, scq.StartDequeue());
+  for (int i = 0; i < 10; ++i) {
+    Record* rec = reinterpret_cast<Record*>(scq.StartEnqueue());
+    CHECK_EQ(NULL, rec);
+    CHECK_NE(NULL, scq.StartDequeue());
+  }
 
-  // Consume the first chunk.
-  for (Record i = 1; i < 1 + kRecordsPerChunk; ++i) {
+  // Consume all records.
+  for (Record i = 1; i < 1 + kMaxRecordsInQueue; ++i) {
     Record* rec = reinterpret_cast<Record*>(scq.StartDequeue());
     CHECK_NE(NULL, rec);
     CHECK_EQ(static_cast<int64_t>(i), static_cast<int64_t>(*rec));
@@ -76,16 +69,21 @@ TEST(SamplingCircularQueue) {
     scq.FinishDequeue();
     CHECK_NE(rec, reinterpret_cast<Record*>(scq.StartDequeue()));
   }
-  // Now consumption must not be possible, as consumer now polls
-  // the first chunk for emptinness.
+  // The queue is empty.
+  CHECK_EQ(NULL, scq.StartDequeue());
+
+
   CHECK_EQ(NULL, scq.StartDequeue());
+  for (Record i = 0; i < kMaxRecordsInQueue / 2; ++i) {
+    Record* rec = reinterpret_cast<Record*>(scq.StartEnqueue());
+    CHECK_NE(NULL, rec);
+    *rec = i;
+    scq.FinishEnqueue();
+  }
 
-  scq.FlushResidualRecords();
-  // From now, consumer no more polls ahead of the current chunk,
-  // so it's possible to consume the second chunk.
+  // Consume all available kMaxRecordsInQueue / 2 records.
   CHECK_NE(NULL, scq.StartDequeue());
-  // Consume the second chunk
-  for (Record i = 10; i < 10 + kRecordsPerChunk; ++i) {
+  for (Record i = 0; i < kMaxRecordsInQueue / 2; ++i) {
     Record* rec = reinterpret_cast<Record*>(scq.StartDequeue());
     CHECK_NE(NULL, rec);
     CHECK_EQ(static_cast<int64_t>(i), static_cast<int64_t>(*rec));
@@ -93,19 +91,20 @@ TEST(SamplingCircularQueue) {
     scq.FinishDequeue();
     CHECK_NE(rec, reinterpret_cast<Record*>(scq.StartDequeue()));
   }
-  // Consumption must still be possible as the first cell of the
-  // last chunk is not clean.
-  CHECK_NE(NULL, scq.StartDequeue());
+
+  // The queue is empty.
+  CHECK_EQ(NULL, scq.StartDequeue());
 }
 
 
 namespace {
 
+typedef i::AtomicWord Record;
+typedef SamplingCircularQueue<Record, 12> TestSampleQueue;
+
 class ProducerThread: public i::Thread {
  public:
-  typedef SamplingCircularQueue::Cell Record;
-
-  ProducerThread(SamplingCircularQueue* scq,
+  ProducerThread(TestSampleQueue* scq,
                  int records_per_chunk,
                  Record value,
                  i::Semaphore* finished)
@@ -117,16 +116,17 @@ class ProducerThread: public i::Thread {
 
   virtual void Run() {
     for (Record i = value_; i < value_ + records_per_chunk_; ++i) {
-      Record* rec = reinterpret_cast<Record*>(scq_->Enqueue());
+      Record* rec = reinterpret_cast<Record*>(scq_->StartEnqueue());
       CHECK_NE(NULL, rec);
       *rec = i;
+      scq_->FinishEnqueue();
     }
 
     finished_->Signal();
   }
 
  private:
-  SamplingCircularQueue* scq_;
+  TestSampleQueue* scq_;
   const int records_per_chunk_;
   Record value_;
   i::Semaphore* finished_;
@@ -140,17 +140,10 @@ TEST(SamplingCircularQueueMultithreading) {
   // to the case of profiling under Linux, where signal handler that
   // does sampling is called in the context of different VM threads.
 
-  typedef ProducerThread::Record Record;
   const int kRecordsPerChunk = 4;
-  SamplingCircularQueue scq(sizeof(Record),
-                            kRecordsPerChunk * sizeof(Record),
-                            3);
+  TestSampleQueue scq;
   i::Semaphore* semaphore = i::OS::CreateSemaphore(0);
-  // Don't poll ahead, making possible to check data in the buffer
-  // immediately after enqueuing.
-  scq.FlushResidualRecords();
 
-  // Check that we are using non-reserved values.
   ProducerThread producer1(&scq, kRecordsPerChunk, 1, semaphore);
   ProducerThread producer2(&scq, kRecordsPerChunk, 10, semaphore);
   ProducerThread producer3(&scq, kRecordsPerChunk, 20, semaphore);
index 7b5fc2b..43fb074 100644 (file)
@@ -63,7 +63,7 @@ static void EnqueueTickSampleEvent(ProfilerEventsProcessor* proc,
                                    i::Address frame1,
                                    i::Address frame2 = NULL,
                                    i::Address frame3 = NULL) {
-  i::TickSample* sample = proc->TickSampleEvent();
+  i::TickSample* sample = proc->StartTickSample();
   sample->pc = frame1;
   sample->tos = frame1;
   sample->frames_count = 0;
@@ -75,6 +75,7 @@ static void EnqueueTickSampleEvent(ProfilerEventsProcessor* proc,
     sample->stack[1] = frame3;
     sample->frames_count = 2;
   }
+  proc->FinishTickSample();
 }
 
 namespace {
@@ -276,13 +277,14 @@ TEST(Issue1398) {
 
   profiler.CodeCreateEvent(i::Logger::BUILTIN_TAG, code, "bbb");
 
-  i::TickSample* sample = processor.TickSampleEvent();
+  i::TickSample* sample = processor.StartTickSample();
   sample->pc = code->address();
   sample->tos = 0;
   sample->frames_count = i::TickSample::kMaxFramesCount;
   for (int i = 0; i < sample->frames_count; ++i) {
     sample->stack[i] = code->address();
   }
+  processor.FinishTickSample();
 
   processor.StopSynchronously();
   processor.Join();
index eccd7d2..ccd9ced 100644 (file)
         '../../src/checks.cc',
         '../../src/checks.h',
         '../../src/circular-queue-inl.h',
-        '../../src/circular-queue.cc',
         '../../src/circular-queue.h',
         '../../src/code-stubs.cc',
         '../../src/code-stubs.h',