void* SamplingCircularQueue::Enqueue() {
-  WrapPositionIfNeeded(&producer_pos_->enqueue_pos);
+  if (producer_pos_->enqueue_pos == producer_pos_->next_chunk_pos) {
+    if (producer_pos_->enqueue_pos == buffer_ + buffer_size_) {
+      producer_pos_->next_chunk_pos = buffer_;
+      producer_pos_->enqueue_pos = buffer_;
+    }
+    Acquire_Store(producer_pos_->next_chunk_pos, kEnqueueStarted);
+    // Skip marker.
+    producer_pos_->enqueue_pos += 1;
+    producer_pos_->next_chunk_pos += chunk_size_;
+  }
  void* result = producer_pos_->enqueue_pos;
  producer_pos_->enqueue_pos += record_size_;
  return result;
}
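
To picture the new producer path: each chunk now begins with a single marker Cell followed by whole records, and next_chunk_pos always points at the upcoming marker. A sketch of the layout with made-up sizes (my illustration, not part of the patch):

  // Suppose record_size_ == 2 cells and chunk_size_ == 5 cells
  // (one marker plus two records), with 3 chunks in the buffer:
  //
  //   buffer_:  [M|r r|r r] [M|r r|r r] [M|r r|r r]
  //              ^ chunk marker cell: kClear or kEnqueueStarted
  //
  // Enqueue() hands out record-sized slots until enqueue_pos reaches
  // next_chunk_pos; it then stamps that marker cell kEnqueueStarted,
  // steps past it, and advances next_chunk_pos by one whole chunk.
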
void SamplingCircularQueue::WrapPositionIfNeeded(
SamplingCircularQueue::Cell** pos) {
-  if (**pos == kEnd) *pos = buffer_;
+  if (*pos == buffer_ + buffer_size_) *pos = buffer_;
}
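
The wrap test changes shape too: the old code dereferenced the position (**pos) looking for a kEnd sentinel stored in one extra cell past the buffer, while the new code compares the pointer itself against the buffer's end. That is why the constructor below stops allocating buffer_size_ + 1 cells and no longer writes kEnd:

  // Old: wrapping meant noticing that the Cell at *pos held the
  //      reserved kEnd value stored in buffer_[buffer_size_].
  // New: wrapping is a plain bounds check on the pointer, so both the
  //      sentinel cell and the reserved kEnd value can be deleted.
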
namespace internal {
-SamplingCircularQueue::SamplingCircularQueue(int record_size_in_bytes,
- int desired_chunk_size_in_bytes,
+SamplingCircularQueue::SamplingCircularQueue(size_t record_size_in_bytes,
+ size_t desired_chunk_size_in_bytes,
int buffer_size_in_chunks)
: record_size_(record_size_in_bytes / sizeof(Cell)),
chunk_size_in_bytes_(desired_chunk_size_in_bytes / record_size_in_bytes *
- record_size_in_bytes),
+ record_size_in_bytes + sizeof(Cell)),
chunk_size_(chunk_size_in_bytes_ / sizeof(Cell)),
buffer_size_(chunk_size_ * buffer_size_in_chunks),
- // The distance ensures that producer and consumer never step on
- // each other's chunks and helps eviction of produced data from
- // the CPU cache (having that chunk size is bigger than the cache.)
- producer_consumer_distance_(2 * chunk_size_),
- buffer_(NewArray<Cell>(buffer_size_ + 1)) {
+ buffer_(NewArray<Cell>(buffer_size_)) {
+ ASSERT(record_size_ * sizeof(Cell) == record_size_in_bytes);
+ ASSERT(chunk_size_ * sizeof(Cell) == chunk_size_in_bytes_);
ASSERT(buffer_size_in_chunks > 2);
- // Clean up the whole buffer to avoid encountering a random kEnd
- // while enqueuing.
- for (int i = 0; i < buffer_size_; ++i) {
+ // Mark all chunks as clear.
+ for (int i = 0; i < buffer_size_; i += chunk_size_) {
buffer_[i] = kClear;
}
- buffer_[buffer_size_] = kEnd;
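
The size arithmetic in the initializer list is easier to verify with concrete numbers. A worked example with hypothetical parameters, assuming 8-byte Cells on a 64-bit build:

  // record_size_in_bytes = 16, desired_chunk_size_in_bytes = 64:
  //   record_size_         = 16 / 8           = 2 cells
  //   chunk_size_in_bytes_ = 64 / 16 * 16 + 8 = 72 bytes
  //                          (four whole records plus one marker Cell)
  //   chunk_size_          = 72 / 8           = 9 cells
  //   buffer_size_         = 9 * buffer_size_in_chunks cells
  // The marker loop above then stamps cells 0, 9, 18, ... -- one kClear
  // per chunk -- instead of clearing every cell of the buffer.
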
// Layout producer and consumer position pointers each on their own
// cache lines to avoid cache lines thrashing due to simultaneous
// updates of positions by different processor cores.
producer_pos_ = reinterpret_cast<ProducerPosition*>(
RoundUp(positions_, kProcessorCacheLineSize));
+ producer_pos_->next_chunk_pos = buffer_;
producer_pos_->enqueue_pos = buffer_;
consumer_pos_ = reinterpret_cast<ConsumerPosition*>(
    reinterpret_cast<byte*>(producer_pos_) + kProcessorCacheLineSize);
ASSERT(reinterpret_cast<byte*>(consumer_pos_ + 1) <=
positions_ + positions_size);
consumer_pos_->dequeue_chunk_pos = buffer_;
- consumer_pos_->dequeue_chunk_poll_pos = buffer_ + producer_consumer_distance_;
+ // The distance ensures that producer and consumer never step on
+ // each other's chunks and helps eviction of produced data from
+ // the CPU cache (given that the chunk size is bigger than the cache).
+ const int producer_consumer_distance = 2 * chunk_size_;
+ consumer_pos_->dequeue_chunk_poll_pos = buffer_ + producer_consumer_distance;
consumer_pos_->dequeue_pos = NULL;
}
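
The two-chunk start distance is easiest to see as a trace. Continuing the hypothetical 9-cell chunks from the example above, with the smallest buffer_size_in_chunks the ASSERT allows (3):

  //   dequeue_chunk_pos      = buffer_        (chunk 0, to be read)
  //   dequeue_chunk_poll_pos = buffer_ + 18   (chunk 2, to be polled)
  // The consumer watches chunk 2's marker, so it begins reading chunk 0
  // only after the producer has already started filling chunk 2; the
  // two threads are never in the same or in adjacent chunks.
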
void* SamplingCircularQueue::StartDequeue() {
  if (consumer_pos_->dequeue_pos != NULL) {
    return consumer_pos_->dequeue_pos;
  } else {
-   if (*consumer_pos_->dequeue_chunk_poll_pos != kClear) {
-     consumer_pos_->dequeue_pos = consumer_pos_->dequeue_chunk_pos;
-     consumer_pos_->dequeue_end_pos = consumer_pos_->dequeue_pos + chunk_size_;
+   if (Acquire_Load(consumer_pos_->dequeue_chunk_poll_pos) != kClear) {
+     // Skip marker.
+     consumer_pos_->dequeue_pos = consumer_pos_->dequeue_chunk_pos + 1;
+     consumer_pos_->dequeue_end_pos =
+         consumer_pos_->dequeue_chunk_pos + chunk_size_;
      return consumer_pos_->dequeue_pos;
    } else {
      return NULL;
    }
  }
}
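
For experimenting with the marker handshake outside V8, here is a minimal standalone model (my sketch, not the patch's code: std::atomic release/acquire stands in for V8's Acquire_Store/Acquire_Load barrier wrappers, and the buffer is a single toy chunk):

#include <atomic>
#include <cstdio>

enum Marker { kClear = 0, kEnqueueStarted = 1 };

// One toy chunk: a marker cell followed by two one-word records.
static std::atomic<long> chunk[3];

// Producer: stamp the marker first, then write records, mirroring
// Enqueue(). The marker only says "enqueue started"; records land after
// it, so in the real queue it is the two-chunk distance, not this store
// alone, that guarantees the consumer sees complete records.
void Produce() {
  chunk[0].store(kEnqueueStarted, std::memory_order_release);
  chunk[1].store(42, std::memory_order_relaxed);
  chunk[2].store(43, std::memory_order_relaxed);
}

// Consumer: poll the marker as StartDequeue() does; anything other than
// kClear means the chunk is (or is being) filled.
bool TryConsume() {
  if (chunk[0].load(std::memory_order_acquire) == kClear) return false;
  std::printf("%ld %ld\n", chunk[1].load(std::memory_order_relaxed),
              chunk[2].load(std::memory_order_relaxed));
  return true;
}

int main() { Produce(); return TryConsume() ? 0 : 1; }
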
class SamplingCircularQueue {
public:
// Executed on the application thread.
- SamplingCircularQueue(int record_size_in_bytes,
- int desired_chunk_size_in_bytes,
+ SamplingCircularQueue(size_t record_size_in_bytes,
+ size_t desired_chunk_size_in_bytes,
int buffer_size_in_chunks);
~SamplingCircularQueue();
void FlushResidualRecords();
typedef AtomicWord Cell;
- // Reserved values for the first cell of a record.
- static const Cell kClear = 0; // Marks clean (processed) chunks.
- static const Cell kEnd = -1; // Marks the end of the buffer.
private:
+ // Reserved values for the chunk marker (first Cell in each chunk).
+ enum {
+   kClear,          // Marks clean (processed) chunks.
+   kEnqueueStarted  // Marks chunks where enqueue started.
+ };
+
struct ProducerPosition {
+ Cell* next_chunk_pos;
Cell* enqueue_pos;
};
struct ConsumerPosition {
INLINE(void WrapPositionIfNeeded(Cell** pos));
const int record_size_;
- const int chunk_size_in_bytes_;
+ const size_t chunk_size_in_bytes_;
const int chunk_size_;
const int buffer_size_;
- const int producer_consumer_distance_;
Cell* buffer_;
byte* positions_;
ProducerPosition* producer_pos_;
// The parameterless constructor is used when we dequeue data from
// the ticks buffer.
TickSampleEventRecord() { }
- explicit TickSampleEventRecord(unsigned order)
- : filler(1),
- order(order) {
- ASSERT(filler != SamplingCircularQueue::kClear);
- }
+ explicit TickSampleEventRecord(unsigned order) : order(order) { }
- // The first machine word of a TickSampleEventRecord must not ever
- // become equal to SamplingCircularQueue::kClear. As both order and
- // TickSample's first field are not reliable in this sense (order
- // can overflow, TickSample can have all fields reset), we are
- // forced to use an artificial filler field.
- int filler;
unsigned order;
TickSample sample;
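
Why the filler field can go: kClear used to be a reserved value that the first Cell of every record had to avoid, hence the artificial filler(1). With this patch the marker occupies its own per-chunk Cell, so record contents are unconstrained:

  // Before: { filler = 1, order, sample } -- the first word had to differ
  //         from kClear, or a live record looked like a clean slot.
  // After:  { order, sample } -- any bit pattern is fine, because the
  //         consumer polls only the dedicated chunk-marker Cell.
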
  SamplingCircularQueue scq(sizeof(Record),
                            kRecordsPerChunk * sizeof(Record),
                            3);
- // Check that we are using non-reserved values.
- CHECK_NE(SamplingCircularQueue::kClear, 1);
- CHECK_NE(SamplingCircularQueue::kEnd, 1);
// Fill up the first chunk.
CHECK_EQ(NULL, scq.StartDequeue());
for (Record i = 1; i < 1 + kRecordsPerChunk; ++i) {
scq.FlushResidualRecords();
- // Check that we are using non-reserved values.
- CHECK_NE(SamplingCircularQueue::kClear, 1);
- CHECK_NE(SamplingCircularQueue::kEnd, 1);
ProducerThread producer1(&scq, kRecordsPerChunk, 1, semaphore);
ProducerThread producer2(&scq, kRecordsPerChunk, 10, semaphore);
ProducerThread producer3(&scq, kRecordsPerChunk, 20, semaphore);