From 71754ebe81875fac97e9ef2d5340d77cf5c53515 Mon Sep 17 00:00:00 2001 From: "mikhail.naganov@gmail.com" Date: Mon, 22 Mar 2010 14:23:45 +0000 Subject: [PATCH] Add multithreading test for SamplingCircularQueue, fix implementation. This is for the case of Linux, where sampling is done using SIGPROF signal handler which is executed in the context of an interrupted thread. In this case, my previous implementation with TLS doesn't work. Review URL: http://codereview.chromium.org/1138004 git-svn-id: http://v8.googlecode.com/svn/branches/bleeding_edge@4207 ce2b1a6d-e550-0410-aec6-3dcde31c8c00 --- src/circular-queue-inl.h | 9 ++-- src/circular-queue.cc | 86 ++++++++++++++----------------- src/circular-queue.h | 15 +++--- src/cpu-profiler.cc | 2 - src/cpu-profiler.h | 3 -- src/globals.h | 4 ++ test/cctest/test-circular-queue.cc | 102 +++++++++++++++++++++++++++++++++++-- test/cctest/test-cpu-profiler.cc | 6 --- 8 files changed, 149 insertions(+), 78 deletions(-) diff --git a/src/circular-queue-inl.h b/src/circular-queue-inl.h index ffe8fb0..962b069 100644 --- a/src/circular-queue-inl.h +++ b/src/circular-queue-inl.h @@ -82,11 +82,10 @@ Record* CircularQueue::Next(Record* curr) { void* SamplingCircularQueue::Enqueue() { - Cell* enqueue_pos = reinterpret_cast( - Thread::GetThreadLocal(producer_key_)); - WrapPositionIfNeeded(&enqueue_pos); - Thread::SetThreadLocal(producer_key_, enqueue_pos + record_size_); - return enqueue_pos; + WrapPositionIfNeeded(&producer_pos_->enqueue_pos); + void* result = producer_pos_->enqueue_pos; + producer_pos_->enqueue_pos += record_size_; + return result; } diff --git a/src/circular-queue.cc b/src/circular-queue.cc index 5f7a33e..a7c2532 100644 --- a/src/circular-queue.cc +++ b/src/circular-queue.cc @@ -52,52 +52,44 @@ SamplingCircularQueue::SamplingCircularQueue(int record_size_in_bytes, buffer_[i] = kClear; } buffer_[buffer_size_] = kEnd; + + // Layout producer and consumer position pointers each on their own + // cache lines to avoid cache lines thrashing due to simultaneous + // updates of positions by different processor cores. + const int positions_size = + RoundUp(1, kProcessorCacheLineSize) + + RoundUp(sizeof(ProducerPosition), kProcessorCacheLineSize) + + RoundUp(sizeof(ConsumerPosition), kProcessorCacheLineSize); + positions_ = NewArray(positions_size); + + producer_pos_ = reinterpret_cast( + RoundUp(positions_, kProcessorCacheLineSize)); + producer_pos_->enqueue_pos = buffer_; + + consumer_pos_ = reinterpret_cast( + reinterpret_cast(producer_pos_) + kProcessorCacheLineSize); + ASSERT(reinterpret_cast(consumer_pos_ + 1) <= + positions_ + positions_size); + consumer_pos_->dequeue_chunk_pos = buffer_; + consumer_pos_->dequeue_chunk_poll_pos = buffer_ + producer_consumer_distance_; + consumer_pos_->dequeue_pos = NULL; } SamplingCircularQueue::~SamplingCircularQueue() { + DeleteArray(positions_); DeleteArray(buffer_); } -void SamplingCircularQueue::SetUpProducer() { - producer_key_ = Thread::CreateThreadLocalKey(); - Thread::SetThreadLocal(producer_key_, buffer_); -} - - -void SamplingCircularQueue::TearDownProducer() { - Thread::DeleteThreadLocalKey(producer_key_); -} - - -void SamplingCircularQueue::SetUpConsumer() { - consumer_key_ = Thread::CreateThreadLocalKey(); - ConsumerPosition* cp = new ConsumerPosition; - cp->dequeue_chunk_pos = buffer_; - cp->dequeue_chunk_poll_pos = buffer_ + producer_consumer_distance_; - cp->dequeue_pos = NULL; - Thread::SetThreadLocal(consumer_key_, cp); -} - - -void SamplingCircularQueue::TearDownConsumer() { - delete reinterpret_cast( - Thread::GetThreadLocal(consumer_key_)); - Thread::DeleteThreadLocalKey(consumer_key_); -} - - void* SamplingCircularQueue::StartDequeue() { - ConsumerPosition* cp = reinterpret_cast( - Thread::GetThreadLocal(consumer_key_)); - if (cp->dequeue_pos != NULL) { - return cp->dequeue_pos; + if (consumer_pos_->dequeue_pos != NULL) { + return consumer_pos_->dequeue_pos; } else { - if (*cp->dequeue_chunk_poll_pos != kClear) { - cp->dequeue_pos = cp->dequeue_chunk_pos; - cp->dequeue_end_pos = cp->dequeue_pos + chunk_size_; - return cp->dequeue_pos; + if (*consumer_pos_->dequeue_chunk_poll_pos != kClear) { + consumer_pos_->dequeue_pos = consumer_pos_->dequeue_chunk_pos; + consumer_pos_->dequeue_end_pos = consumer_pos_->dequeue_pos + chunk_size_; + return consumer_pos_->dequeue_pos; } else { return NULL; } @@ -106,25 +98,21 @@ void* SamplingCircularQueue::StartDequeue() { void SamplingCircularQueue::FinishDequeue() { - ConsumerPosition* cp = reinterpret_cast( - Thread::GetThreadLocal(consumer_key_)); - cp->dequeue_pos += record_size_; - if (cp->dequeue_pos < cp->dequeue_end_pos) return; + consumer_pos_->dequeue_pos += record_size_; + if (consumer_pos_->dequeue_pos < consumer_pos_->dequeue_end_pos) return; // Move to next chunk. - cp->dequeue_pos = NULL; - *cp->dequeue_chunk_pos = kClear; - cp->dequeue_chunk_pos += chunk_size_; - WrapPositionIfNeeded(&cp->dequeue_chunk_pos); - cp->dequeue_chunk_poll_pos += chunk_size_; - WrapPositionIfNeeded(&cp->dequeue_chunk_poll_pos); + consumer_pos_->dequeue_pos = NULL; + *consumer_pos_->dequeue_chunk_pos = kClear; + consumer_pos_->dequeue_chunk_pos += chunk_size_; + WrapPositionIfNeeded(&consumer_pos_->dequeue_chunk_pos); + consumer_pos_->dequeue_chunk_poll_pos += chunk_size_; + WrapPositionIfNeeded(&consumer_pos_->dequeue_chunk_poll_pos); } void SamplingCircularQueue::FlushResidualRecords() { - ConsumerPosition* cp = reinterpret_cast( - Thread::GetThreadLocal(consumer_key_)); // Eliminate producer / consumer distance. - cp->dequeue_chunk_poll_pos = cp->dequeue_chunk_pos; + consumer_pos_->dequeue_chunk_poll_pos = consumer_pos_->dequeue_chunk_pos; } diff --git a/src/circular-queue.h b/src/circular-queue.h index 11159e0..dce7fc2 100644 --- a/src/circular-queue.h +++ b/src/circular-queue.h @@ -76,15 +76,11 @@ class SamplingCircularQueue { int buffer_size_in_chunks); ~SamplingCircularQueue(); - // Executed on the producer (sampler) or application thread. - void SetUpProducer(); // Enqueue returns a pointer to a memory location for storing the next // record. INLINE(void* Enqueue()); - void TearDownProducer(); // Executed on the consumer (analyzer) thread. - void SetUpConsumer(); // StartDequeue returns a pointer to a memory location for retrieving // the next record. After the record had been read by a consumer, // FinishDequeue must be called. Until that moment, subsequent calls @@ -95,7 +91,6 @@ class SamplingCircularQueue { // the queue must be notified whether producing has been finished in order // to process remaining records from the buffer. void FlushResidualRecords(); - void TearDownConsumer(); typedef AtomicWord Cell; // Reserved values for the first cell of a record. @@ -103,6 +98,9 @@ class SamplingCircularQueue { static const Cell kEnd = -1; // Marks the end of the buffer. private: + struct ProducerPosition { + Cell* enqueue_pos; + }; struct ConsumerPosition { Cell* dequeue_chunk_pos; Cell* dequeue_chunk_poll_pos; @@ -118,10 +116,9 @@ class SamplingCircularQueue { const int buffer_size_; const int producer_consumer_distance_; Cell* buffer_; - // Store producer and consumer data in TLS to avoid modifying the - // same CPU cache line from two threads simultaneously. - Thread::LocalStorageKey consumer_key_; - Thread::LocalStorageKey producer_key_; + byte* positions_; + ProducerPosition* producer_pos_; + ConsumerPosition* consumer_pos_; }; diff --git a/src/cpu-profiler.cc b/src/cpu-profiler.cc index d36f511..d16c17f 100644 --- a/src/cpu-profiler.cc +++ b/src/cpu-profiler.cc @@ -176,7 +176,6 @@ bool ProfilerEventsProcessor::ProcessTicks(unsigned dequeue_order) { void ProfilerEventsProcessor::Run() { - ticks_buffer_.SetUpConsumer(); unsigned dequeue_order = 0; running_ = true; @@ -194,7 +193,6 @@ void ProfilerEventsProcessor::Run() { ticks_buffer_.FlushResidualRecords(); // Perform processing until we have tick events, skip remaining code events. while (ProcessTicks(dequeue_order) && ProcessCodeEvent(&dequeue_order)) { } - ticks_buffer_.TearDownConsumer(); } diff --git a/src/cpu-profiler.h b/src/cpu-profiler.h index ccfac5c..8a7d2fd 100644 --- a/src/cpu-profiler.h +++ b/src/cpu-profiler.h @@ -154,14 +154,11 @@ class ProfilerEventsProcessor : public Thread { void FunctionMoveEvent(Address from, Address to); void FunctionDeleteEvent(Address from); - // Tick sampler registration. Called by sampler thread or signal handler. - inline void SetUpSamplesProducer() { ticks_buffer_.SetUpProducer(); } // Tick sample events are filled directly in the buffer of the circular // queue (because the structure is of fixed width, but usually not all // stack frame entries are filled.) This method returns a pointer to the // next record of the buffer. INLINE(TickSample* TickSampleEvent()); - inline void TearDownSamplesProducer() { ticks_buffer_.TearDownProducer(); } private: union CodeEventsContainer { diff --git a/src/globals.h b/src/globals.h index cb7f27e..90007e6 100644 --- a/src/globals.h +++ b/src/globals.h @@ -195,6 +195,10 @@ const Address kFromSpaceZapValue = reinterpret_cast
(0xbeefdad); // gives 8K bytes per page. const int kPageSizeBits = 13; +// On Intel architecture, cache line size is 64 bytes. +// On ARM it may be less (32 bytes), but as far this constant is +// used for aligning data, it doesn't hurt to align on a greater value. +const int kProcessorCacheLineSize = 64; // Constants relevant to double precision floating point numbers. diff --git a/test/cctest/test-circular-queue.cc b/test/cctest/test-circular-queue.cc index bb69c1b..3fa49bf 100644 --- a/test/cctest/test-circular-queue.cc +++ b/test/cctest/test-circular-queue.cc @@ -61,8 +61,6 @@ TEST(SamplingCircularQueue) { SamplingCircularQueue scq(sizeof(Record), kRecordsPerChunk * sizeof(Record), 3); - scq.SetUpProducer(); - scq.SetUpConsumer(); // Check that we are using non-reserved values. CHECK_NE(SamplingCircularQueue::kClear, 1); @@ -121,7 +119,103 @@ TEST(SamplingCircularQueue) { // Consumption must still be possible as the first cell of the // last chunk is not clean. CHECK_NE(NULL, scq.StartDequeue()); +} + + +namespace { + +class ProducerThread: public i::Thread { + public: + typedef SamplingCircularQueue::Cell Record; + + ProducerThread(SamplingCircularQueue* scq, + int records_per_chunk, + Record value, + i::Semaphore* finished) + : scq_(scq), + records_per_chunk_(records_per_chunk), + value_(value), + finished_(finished) { } + + virtual void Run() { + for (Record i = value_; i < value_ + records_per_chunk_; ++i) { + Record* rec = reinterpret_cast(scq_->Enqueue()); + CHECK_NE(NULL, rec); + *rec = i; + } + + finished_->Signal(); + } + + private: + SamplingCircularQueue* scq_; + const int records_per_chunk_; + Record value_; + i::Semaphore* finished_; +}; + +} // namespace + +TEST(SamplingCircularQueueMultithreading) { + // Emulate multiple VM threads working 'one thread at a time.' + // This test enqueues data from different threads. This corresponds + // to the case of profiling under Linux, where signal handler that + // does sampling is called in the context of different VM threads. + + typedef ProducerThread::Record Record; + const int kRecordsPerChunk = 4; + SamplingCircularQueue scq(sizeof(Record), + kRecordsPerChunk * sizeof(Record), + 3); + i::Semaphore* semaphore = i::OS::CreateSemaphore(0); + // Don't poll ahead, making possible to check data in the buffer + // immediately after enqueuing. + scq.FlushResidualRecords(); + + // Check that we are using non-reserved values. + CHECK_NE(SamplingCircularQueue::kClear, 1); + CHECK_NE(SamplingCircularQueue::kEnd, 1); + ProducerThread producer1(&scq, kRecordsPerChunk, 1, semaphore); + ProducerThread producer2(&scq, kRecordsPerChunk, 10, semaphore); + ProducerThread producer3(&scq, kRecordsPerChunk, 20, semaphore); + + CHECK_EQ(NULL, scq.StartDequeue()); + producer1.Start(); + semaphore->Wait(); + for (Record i = 1; i < 1 + kRecordsPerChunk; ++i) { + Record* rec = reinterpret_cast(scq.StartDequeue()); + CHECK_NE(NULL, rec); + CHECK_EQ(static_cast(i), static_cast(*rec)); + CHECK_EQ(rec, reinterpret_cast(scq.StartDequeue())); + scq.FinishDequeue(); + CHECK_NE(rec, reinterpret_cast(scq.StartDequeue())); + } + + CHECK_EQ(NULL, scq.StartDequeue()); + producer2.Start(); + semaphore->Wait(); + for (Record i = 10; i < 10 + kRecordsPerChunk; ++i) { + Record* rec = reinterpret_cast(scq.StartDequeue()); + CHECK_NE(NULL, rec); + CHECK_EQ(static_cast(i), static_cast(*rec)); + CHECK_EQ(rec, reinterpret_cast(scq.StartDequeue())); + scq.FinishDequeue(); + CHECK_NE(rec, reinterpret_cast(scq.StartDequeue())); + } + + CHECK_EQ(NULL, scq.StartDequeue()); + producer3.Start(); + semaphore->Wait(); + for (Record i = 20; i < 20 + kRecordsPerChunk; ++i) { + Record* rec = reinterpret_cast(scq.StartDequeue()); + CHECK_NE(NULL, rec); + CHECK_EQ(static_cast(i), static_cast(*rec)); + CHECK_EQ(rec, reinterpret_cast(scq.StartDequeue())); + scq.FinishDequeue(); + CHECK_NE(rec, reinterpret_cast(scq.StartDequeue())); + } + + CHECK_EQ(NULL, scq.StartDequeue()); - scq.TearDownConsumer(); - scq.TearDownProducer(); + delete semaphore; } diff --git a/test/cctest/test-cpu-profiler.cc b/test/cctest/test-cpu-profiler.cc index bd966fa..2fff4fa 100644 --- a/test/cctest/test-cpu-profiler.cc +++ b/test/cctest/test-cpu-profiler.cc @@ -64,7 +64,6 @@ TEST(CodeEvents) { ProfileGenerator generator(&profiles); ProfilerEventsProcessor processor(&generator); processor.Start(); - processor.SetUpSamplesProducer(); while (!processor.running()) { i::Thread::YieldCPU(); } @@ -117,8 +116,6 @@ TEST(CodeEvents) { CodeEntry* entry5 = generator.code_map()->FindEntry(ToAddress(0x1700)); CHECK_NE(NULL, entry5); CHECK_EQ(aaa_str, entry5->name()); - - processor.TearDownSamplesProducer(); } @@ -133,7 +130,6 @@ TEST(TickEvents) { ProfileGenerator generator(&profiles); ProfilerEventsProcessor processor(&generator); processor.Start(); - processor.SetUpSamplesProducer(); while (!processor.running()) { i::Thread::YieldCPU(); } @@ -197,6 +193,4 @@ TEST(TickEvents) { bottom_up_ddd_children.last()->GetChildren(&bottom_up_ddd_stub_children); CHECK_EQ(1, bottom_up_ddd_stub_children.length()); CHECK_EQ("bbb", bottom_up_ddd_stub_children.last()->entry()->name()); - - processor.TearDownSamplesProducer(); } -- 2.7.4