void* SamplingCircularQueue::Enqueue() {
- Cell* enqueue_pos = reinterpret_cast<Cell*>(
- Thread::GetThreadLocal(producer_key_));
- WrapPositionIfNeeded(&enqueue_pos);
- Thread::SetThreadLocal(producer_key_, enqueue_pos + record_size_);
- return enqueue_pos;
+ // Executed on the producer (sampler) thread: reserve the next
+ // record_size_-cell slot and return a pointer for the caller to fill.
+ // The position now lives in producer_pos_ instead of TLS.
+ WrapPositionIfNeeded(&producer_pos_->enqueue_pos);
+ void* result = producer_pos_->enqueue_pos;
+ producer_pos_->enqueue_pos += record_size_;
+ return result;
}
buffer_[i] = kClear;
}
buffer_[buffer_size_] = kEnd;
+
+ // Layout producer and consumer position pointers each on their own
+ // cache line to avoid cache line thrashing due to simultaneous
+ // updates of positions by different processor cores.
+ const int positions_size =
+ RoundUp(1, kProcessorCacheLineSize) +
+ RoundUp(sizeof(ProducerPosition), kProcessorCacheLineSize) +
+ RoundUp(sizeof(ConsumerPosition), kProcessorCacheLineSize);
+ positions_ = NewArray<byte>(positions_size);
+
+ // producer_pos_ is aligned up to a cache line boundary inside positions_;
+ // consumer_pos_ then starts exactly one cache line after it.
+ producer_pos_ = reinterpret_cast<ProducerPosition*>(
+ RoundUp(positions_, kProcessorCacheLineSize));
+ producer_pos_->enqueue_pos = buffer_;
+
+ consumer_pos_ = reinterpret_cast<ConsumerPosition*>(
+ reinterpret_cast<byte*>(producer_pos_) + kProcessorCacheLineSize);
+ ASSERT(reinterpret_cast<byte*>(consumer_pos_ + 1) <=
+ positions_ + positions_size);
+ consumer_pos_->dequeue_chunk_pos = buffer_;
+ consumer_pos_->dequeue_chunk_poll_pos = buffer_ + producer_consumer_distance_;
+ consumer_pos_->dequeue_pos = NULL;
}
SamplingCircularQueue::~SamplingCircularQueue() {
+ // Release the aligned position storage along with the record buffer.
+ DeleteArray(positions_);
DeleteArray(buffer_);
}
-void SamplingCircularQueue::SetUpProducer() {
- producer_key_ = Thread::CreateThreadLocalKey();
- Thread::SetThreadLocal(producer_key_, buffer_);
-}
-
-
-void SamplingCircularQueue::TearDownProducer() {
- Thread::DeleteThreadLocalKey(producer_key_);
-}
-
-
-void SamplingCircularQueue::SetUpConsumer() {
- consumer_key_ = Thread::CreateThreadLocalKey();
- ConsumerPosition* cp = new ConsumerPosition;
- cp->dequeue_chunk_pos = buffer_;
- cp->dequeue_chunk_poll_pos = buffer_ + producer_consumer_distance_;
- cp->dequeue_pos = NULL;
- Thread::SetThreadLocal(consumer_key_, cp);
-}
-
-
-void SamplingCircularQueue::TearDownConsumer() {
- delete reinterpret_cast<ConsumerPosition*>(
- Thread::GetThreadLocal(consumer_key_));
- Thread::DeleteThreadLocalKey(consumer_key_);
-}
-
-
void* SamplingCircularQueue::StartDequeue() {
+ // Executed on the consumer (analyzer) thread: return the current record,
+ // or start reading a new chunk once its poll position reports data
+ // (first cell != kClear). Returns NULL when nothing is available.
- ConsumerPosition* cp = reinterpret_cast<ConsumerPosition*>(
- Thread::GetThreadLocal(consumer_key_));
- if (cp->dequeue_pos != NULL) {
- return cp->dequeue_pos;
+ if (consumer_pos_->dequeue_pos != NULL) {
+ return consumer_pos_->dequeue_pos;
} else {
- if (*cp->dequeue_chunk_poll_pos != kClear) {
- cp->dequeue_pos = cp->dequeue_chunk_pos;
- cp->dequeue_end_pos = cp->dequeue_pos + chunk_size_;
- return cp->dequeue_pos;
+ if (*consumer_pos_->dequeue_chunk_poll_pos != kClear) {
+ consumer_pos_->dequeue_pos = consumer_pos_->dequeue_chunk_pos;
+ consumer_pos_->dequeue_end_pos = consumer_pos_->dequeue_pos + chunk_size_;
+ return consumer_pos_->dequeue_pos;
} else {
return NULL;
}
void SamplingCircularQueue::FinishDequeue() {
+ // Executed on the consumer thread: advance past the record just read;
+ // when the chunk is exhausted, mark it clear for the producer and step
+ // both chunk positions to the next chunk (wrapping at the buffer end).
- ConsumerPosition* cp = reinterpret_cast<ConsumerPosition*>(
- Thread::GetThreadLocal(consumer_key_));
- cp->dequeue_pos += record_size_;
- if (cp->dequeue_pos < cp->dequeue_end_pos) return;
+ consumer_pos_->dequeue_pos += record_size_;
+ if (consumer_pos_->dequeue_pos < consumer_pos_->dequeue_end_pos) return;
// Move to next chunk.
- cp->dequeue_pos = NULL;
- *cp->dequeue_chunk_pos = kClear;
- cp->dequeue_chunk_pos += chunk_size_;
- WrapPositionIfNeeded(&cp->dequeue_chunk_pos);
- cp->dequeue_chunk_poll_pos += chunk_size_;
- WrapPositionIfNeeded(&cp->dequeue_chunk_poll_pos);
+ consumer_pos_->dequeue_pos = NULL;
+ *consumer_pos_->dequeue_chunk_pos = kClear;
+ consumer_pos_->dequeue_chunk_pos += chunk_size_;
+ WrapPositionIfNeeded(&consumer_pos_->dequeue_chunk_pos);
+ consumer_pos_->dequeue_chunk_poll_pos += chunk_size_;
+ WrapPositionIfNeeded(&consumer_pos_->dequeue_chunk_poll_pos);
}
void SamplingCircularQueue::FlushResidualRecords() {
+ // Executed on the consumer thread after producing has finished, so the
+ // records remaining in the current chunk become immediately dequeuable.
- ConsumerPosition* cp = reinterpret_cast<ConsumerPosition*>(
- Thread::GetThreadLocal(consumer_key_));
// Eliminate producer / consumer distance.
- cp->dequeue_chunk_poll_pos = cp->dequeue_chunk_pos;
+ consumer_pos_->dequeue_chunk_poll_pos = consumer_pos_->dequeue_chunk_pos;
}
int buffer_size_in_chunks);
~SamplingCircularQueue();
- // Executed on the producer (sampler) or application thread.
- void SetUpProducer();
// Enqueue returns a pointer to a memory location for storing the next
// record.
INLINE(void* Enqueue());
- void TearDownProducer();
// Executed on the consumer (analyzer) thread.
- void SetUpConsumer();
// StartDequeue returns a pointer to a memory location for retrieving
// the next record. After the record had been read by a consumer,
// FinishDequeue must be called. Until that moment, subsequent calls
// the queue must be notified whether producing has been finished in order
// to process remaining records from the buffer.
void FlushResidualRecords();
- void TearDownConsumer();
typedef AtomicWord Cell;
// Reserved values for the first cell of a record.
static const Cell kEnd = -1; // Marks the end of the buffer.
private:
+ struct ProducerPosition {
+ Cell* enqueue_pos;
+ };
struct ConsumerPosition {
Cell* dequeue_chunk_pos;
Cell* dequeue_chunk_poll_pos;
const int buffer_size_;
const int producer_consumer_distance_;
Cell* buffer_;
- // Store producer and consumer data in TLS to avoid modifying the
- // same CPU cache line from two threads simultaneously.
- Thread::LocalStorageKey consumer_key_;
- Thread::LocalStorageKey producer_key_;
+ // Backing store that holds producer_pos_ and consumer_pos_ on separate
+ // cache lines, so the two threads never update the same line.
+ byte* positions_;
+ ProducerPosition* producer_pos_;
+ ConsumerPosition* consumer_pos_;
};
void ProfilerEventsProcessor::Run() {
+ // Runs on the consumer (analyzer) thread; the consumer position is now
+ // embedded in the queue, so no per-thread setup/teardown is required.
- ticks_buffer_.SetUpConsumer();
unsigned dequeue_order = 0;
running_ = true;
ticks_buffer_.FlushResidualRecords();
// Perform processing until we have tick events, skip remaining code events.
while (ProcessTicks(dequeue_order) && ProcessCodeEvent(&dequeue_order)) { }
- ticks_buffer_.TearDownConsumer();
}
void FunctionMoveEvent(Address from, Address to);
void FunctionDeleteEvent(Address from);
- // Tick sampler registration. Called by sampler thread or signal handler.
- inline void SetUpSamplesProducer() { ticks_buffer_.SetUpProducer(); }
// Tick sample events are filled directly in the buffer of the circular
// queue (because the structure is of fixed width, but usually not all
// stack frame entries are filled.) This method returns a pointer to the
// next record of the buffer.
INLINE(TickSample* TickSampleEvent());
- inline void TearDownSamplesProducer() { ticks_buffer_.TearDownProducer(); }
private:
union CodeEventsContainer {
// gives 8K bytes per page.
const int kPageSizeBits = 13;
+// On Intel architecture, cache line size is 64 bytes.
+// On ARM it may be less (32 bytes), but as far as this constant is
+// used for aligning data, it doesn't hurt to align on a greater value.
+const int kProcessorCacheLineSize = 64;
// Constants relevant to double precision floating point numbers.
SamplingCircularQueue scq(sizeof(Record),
kRecordsPerChunk * sizeof(Record),
3);
- scq.SetUpProducer();
- scq.SetUpConsumer();
// Check that we are using non-reserved values.
CHECK_NE(SamplingCircularQueue::kClear, 1);
// Consumption must still be possible as the first cell of the
// last chunk is not clean.
CHECK_NE(NULL, scq.StartDequeue());
+}
+
+
+namespace {
+
+// Test helper: enqueues records_per_chunk consecutive Record values
+// starting at value_ from its own thread, then signals finished_.
+class ProducerThread: public i::Thread {
+ public:
+ typedef SamplingCircularQueue::Cell Record;
+
+ ProducerThread(SamplingCircularQueue* scq,
+ int records_per_chunk,
+ Record value,
+ i::Semaphore* finished)
+ : scq_(scq),
+ records_per_chunk_(records_per_chunk),
+ value_(value),
+ finished_(finished) { }
+
+ virtual void Run() {
+ for (Record i = value_; i < value_ + records_per_chunk_; ++i) {
+ Record* rec = reinterpret_cast<Record*>(scq_->Enqueue());
+ CHECK_NE(NULL, rec);
+ *rec = i;
+ }
+
+ finished_->Signal();
+ }
+
+ private:
+ SamplingCircularQueue* scq_;
+ const int records_per_chunk_;
+ Record value_;
+ i::Semaphore* finished_;
+};
+
+} // namespace
+
+TEST(SamplingCircularQueueMultithreading) {
+ // Emulate multiple VM threads working 'one thread at a time.'
+ // This test enqueues data from different threads. This corresponds
+ // to the case of profiling under Linux, where signal handler that
+ // does sampling is called in the context of different VM threads.
+
+ typedef ProducerThread::Record Record;
+ const int kRecordsPerChunk = 4;
+ SamplingCircularQueue scq(sizeof(Record),
+ kRecordsPerChunk * sizeof(Record),
+ 3);
+ i::Semaphore* semaphore = i::OS::CreateSemaphore(0);
+ // Don't poll ahead, making possible to check data in the buffer
+ // immediately after enqueuing.
+ scq.FlushResidualRecords();
+
+ // Check that we are using non-reserved values.
+ CHECK_NE(SamplingCircularQueue::kClear, 1);
+ CHECK_NE(SamplingCircularQueue::kEnd, 1);
+ ProducerThread producer1(&scq, kRecordsPerChunk, 1, semaphore);
+ ProducerThread producer2(&scq, kRecordsPerChunk, 10, semaphore);
+ ProducerThread producer3(&scq, kRecordsPerChunk, 20, semaphore);
+
+ // The queue is empty until the first producer has run; each producer
+ // fills exactly one chunk, which is then drained and verified here.
+ CHECK_EQ(NULL, scq.StartDequeue());
+ producer1.Start();
+ semaphore->Wait();
+ for (Record i = 1; i < 1 + kRecordsPerChunk; ++i) {
+ Record* rec = reinterpret_cast<Record*>(scq.StartDequeue());
+ CHECK_NE(NULL, rec);
+ CHECK_EQ(static_cast<int64_t>(i), static_cast<int64_t>(*rec));
+ CHECK_EQ(rec, reinterpret_cast<Record*>(scq.StartDequeue()));
+ scq.FinishDequeue();
+ CHECK_NE(rec, reinterpret_cast<Record*>(scq.StartDequeue()));
+ }
+
+ CHECK_EQ(NULL, scq.StartDequeue());
+ producer2.Start();
+ semaphore->Wait();
+ for (Record i = 10; i < 10 + kRecordsPerChunk; ++i) {
+ Record* rec = reinterpret_cast<Record*>(scq.StartDequeue());
+ CHECK_NE(NULL, rec);
+ CHECK_EQ(static_cast<int64_t>(i), static_cast<int64_t>(*rec));
+ CHECK_EQ(rec, reinterpret_cast<Record*>(scq.StartDequeue()));
+ scq.FinishDequeue();
+ CHECK_NE(rec, reinterpret_cast<Record*>(scq.StartDequeue()));
+ }
+
+ CHECK_EQ(NULL, scq.StartDequeue());
+ producer3.Start();
+ semaphore->Wait();
+ for (Record i = 20; i < 20 + kRecordsPerChunk; ++i) {
+ Record* rec = reinterpret_cast<Record*>(scq.StartDequeue());
+ CHECK_NE(NULL, rec);
+ CHECK_EQ(static_cast<int64_t>(i), static_cast<int64_t>(*rec));
+ CHECK_EQ(rec, reinterpret_cast<Record*>(scq.StartDequeue()));
+ scq.FinishDequeue();
+ CHECK_NE(rec, reinterpret_cast<Record*>(scq.StartDequeue()));
+ }
+
+ CHECK_EQ(NULL, scq.StartDequeue());
- scq.TearDownConsumer();
- scq.TearDownProducer();
+ delete semaphore;
}
ProfileGenerator generator(&profiles);
ProfilerEventsProcessor processor(&generator);
processor.Start();
- processor.SetUpSamplesProducer();
while (!processor.running()) {
i::Thread::YieldCPU();
}
CodeEntry* entry5 = generator.code_map()->FindEntry(ToAddress(0x1700));
CHECK_NE(NULL, entry5);
CHECK_EQ(aaa_str, entry5->name());
-
- processor.TearDownSamplesProducer();
}
ProfileGenerator generator(&profiles);
ProfilerEventsProcessor processor(&generator);
processor.Start();
- processor.SetUpSamplesProducer();
while (!processor.running()) {
i::Thread::YieldCPU();
}
bottom_up_ddd_children.last()->GetChildren(&bottom_up_ddd_stub_children);
CHECK_EQ(1, bottom_up_ddd_stub_children.length());
CHECK_EQ("bbb", bottom_up_ddd_stub_children.last()->entry()->name());
-
- processor.TearDownSamplesProducer();
}