[XRay] Move buffer extents back to the heap
authorDean Michael Berris <dberris@google.com>
Tue, 20 Nov 2018 01:00:26 +0000 (01:00 +0000)
committerDean Michael Berris <dberris@google.com>
Tue, 20 Nov 2018 01:00:26 +0000 (01:00 +0000)
Summary:
This change addresses an issue which shows up with the synchronised race
between threads writing into a buffer, and another thread reading the
buffer.

In a lot of cases, we cannot guarantee that threads will always see the
signal to finalise their buffers in time despite the grace periods and
state machine maintained through atomic variables. This change addresses
it by ensuring that the same instance being updated to indicate how much
of the buffer is "used" by the writing thread is the same instance being
read by the thread processing the buffer to be written out to disk or
handled through the iterators.

To do this, we ensure that all the "extents" instances live in their own
the backing store, in a different contiguous page from the
buffer-specific backing store. We also take precautions to ensure that
the atomic variables are cache-line-sized to prevent false-sharing from
unnecessarily causing cache contention on unrelated writes/reads.

It's feasible that we may in the future be able to move the storage of
the extents objects into the single backing store, slightly changing the
way to compute the size(s) of the buffers, but in the meantime we'll
settle for the isolation afforded by having a different backing store
for the extents instances.

Reviewers: mboerger

Subscribers: jfb, llvm-commits

Differential Revision: https://reviews.llvm.org/D54684

llvm-svn: 347280

compiler-rt/lib/xray/tests/unit/test_helpers.cc
compiler-rt/lib/xray/xray_buffer_queue.cc
compiler-rt/lib/xray/xray_buffer_queue.h
compiler-rt/lib/xray/xray_fdr_controller.h
compiler-rt/lib/xray/xray_fdr_log_writer.h
compiler-rt/lib/xray/xray_fdr_logging.cc

index 771a96e..284492d 100644 (file)
@@ -82,7 +82,7 @@ std::string serialize(BufferQueue &Buffers, int32_t Version) {
   Serialized.append(reinterpret_cast<const char *>(&HeaderStorage),
                     sizeof(XRayFileHeader));
   Buffers.apply([&](const BufferQueue::Buffer &B) {
-    auto Size = atomic_load_relaxed(&B.Extents);
+    auto Size = atomic_load_relaxed(B.Extents);
     auto Extents =
         createMetadataRecord<MetadataRecord::RecordKinds::BufferExtents>(Size);
     Serialized.append(reinterpret_cast<const char *>(&Extents),
index 3cfe0bc..9972beb 100644 (file)
@@ -23,7 +23,6 @@
 #include <sys/mman.h>
 
 using namespace __xray;
-using namespace __sanitizer;
 
 namespace {
 
@@ -53,6 +52,18 @@ void incRefCount(BufferQueue::ControlBlock *C) {
   atomic_fetch_add(&C->RefCount, 1, memory_order_acq_rel);
 }
 
+// We use a struct to ensure that we are allocating one atomic_uint64_t per
+// cache line. This allows us to not worry about false-sharing among atomic
+// objects being updated (constantly) by different threads.
+struct ExtentsPadded {
+  union {
+    atomic_uint64_t Extents;
+    unsigned char Storage[kCacheLineSize];
+  };
+};
+
+constexpr size_t kExtentsSize = sizeof(ExtentsPadded);
+
 } // namespace
 
 BufferQueue::ErrorCode BufferQueue::init(size_t BS, size_t BC) {
@@ -71,13 +82,25 @@ BufferQueue::ErrorCode BufferQueue::init(size_t BS, size_t BC) {
   if (BackingStore == nullptr)
     return BufferQueue::ErrorCode::NotEnoughMemory;
 
-  auto CleanupBackingStore = __sanitizer::at_scope_exit([&, this] {
+  auto CleanupBackingStore = at_scope_exit([&, this] {
     if (Success)
       return;
     deallocControlBlock(BackingStore, BufferSize, BufferCount);
     BackingStore = nullptr;
   });
 
+  // Initialize enough atomic_uint64_t instances, each
+  ExtentsBackingStore = allocControlBlock(kExtentsSize, BufferCount);
+  if (ExtentsBackingStore == nullptr)
+    return BufferQueue::ErrorCode::NotEnoughMemory;
+
+  auto CleanupExtentsBackingStore = at_scope_exit([&, this] {
+    if (Success)
+      return;
+    deallocControlBlock(ExtentsBackingStore, kExtentsSize, BufferCount);
+    ExtentsBackingStore = nullptr;
+  });
+
   Buffers = initArray<BufferRep>(BufferCount);
   if (Buffers == nullptr)
     return BufferQueue::ErrorCode::NotEnoughMemory;
@@ -89,6 +112,7 @@ BufferQueue::ErrorCode BufferQueue::init(size_t BS, size_t BC) {
   // First, we initialize the refcount in the ControlBlock, which we treat as
   // being at the start of the BackingStore pointer.
   atomic_store(&BackingStore->RefCount, 1, memory_order_release);
+  atomic_store(&ExtentsBackingStore->RefCount, 1, memory_order_release);
 
   // Then we initialise the individual buffers that sub-divide the whole backing
   // store. Each buffer will start at the `Data` member of the ControlBlock, and
@@ -96,11 +120,15 @@ BufferQueue::ErrorCode BufferQueue::init(size_t BS, size_t BC) {
   for (size_t i = 0; i < BufferCount; ++i) {
     auto &T = Buffers[i];
     auto &Buf = T.Buff;
-    atomic_store(&Buf.Extents, 0, memory_order_release);
+    auto *E = reinterpret_cast<ExtentsPadded *>(&ExtentsBackingStore->Data +
+                                                (kExtentsSize * i));
+    Buf.Extents = &E->Extents;
+    atomic_store(Buf.Extents, 0, memory_order_release);
     Buf.Generation = generation();
     Buf.Data = &BackingStore->Data + (BufferSize * i);
     Buf.Size = BufferSize;
     Buf.BackingStore = BackingStore;
+    Buf.ExtentsBackingStore = ExtentsBackingStore;
     Buf.Count = BufferCount;
     T.Used = false;
   }
@@ -120,6 +148,7 @@ BufferQueue::BufferQueue(size_t B, size_t N,
       Mutex(),
       Finalizing{1},
       BackingStore(nullptr),
+      ExtentsBackingStore(nullptr),
       Buffers(nullptr),
       Next(Buffers),
       First(Buffers),
@@ -144,6 +173,7 @@ BufferQueue::ErrorCode BufferQueue::getBuffer(Buffer &Buf) {
   }
 
   incRefCount(BackingStore);
+  incRefCount(ExtentsBackingStore);
   Buf = B->Buff;
   Buf.Generation = generation();
   B->Used = true;
@@ -159,6 +189,7 @@ BufferQueue::ErrorCode BufferQueue::releaseBuffer(Buffer &Buf) {
     if (Buf.Generation != generation() || LiveBuffers == 0) {
       Buf = {};
       decRefCount(Buf.BackingStore, Buf.Size, Buf.Count);
+      decRefCount(Buf.ExtentsBackingStore, kExtentsSize, Buf.Count);
       return BufferQueue::ErrorCode::Ok;
     }
 
@@ -176,8 +207,8 @@ BufferQueue::ErrorCode BufferQueue::releaseBuffer(Buffer &Buf) {
   B->Buff = Buf;
   B->Used = true;
   decRefCount(Buf.BackingStore, Buf.Size, Buf.Count);
-  atomic_store(&B->Buff.Extents,
-               atomic_load(&Buf.Extents, memory_order_acquire),
+  decRefCount(Buf.ExtentsBackingStore, kExtentsSize, Buf.Count);
+  atomic_store(B->Buff.Extents, atomic_load(Buf.Extents, memory_order_acquire),
                memory_order_release);
   Buf = {};
   return ErrorCode::Ok;
@@ -194,7 +225,9 @@ void BufferQueue::cleanupBuffers() {
     B->~BufferRep();
   deallocateBuffer(Buffers, BufferCount);
   decRefCount(BackingStore, BufferSize, BufferCount);
+  decRefCount(ExtentsBackingStore, kExtentsSize, BufferCount);
   BackingStore = nullptr;
+  ExtentsBackingStore = nullptr;
   Buffers = nullptr;
   BufferCount = 0;
   BufferSize = 0;
index a60f7c1..ef2b433 100644 (file)
@@ -32,10 +32,11 @@ namespace __xray {
 class BufferQueue {
 public:
   /// ControlBlock represents the memory layout of how we interpret the backing
-  /// store for all buffers managed by a BufferQueue instance. The ControlBlock
-  /// has the reference count as the first member, sized according to
-  /// platform-specific cache-line size. We never use the Buffer member of the
-  /// union, which is only there for compiler-supported alignment and sizing.
+  /// store for all buffers and extents managed by a BufferQueue instance. The
+  /// ControlBlock has the reference count as the first member, sized according
+  /// to platform-specific cache-line size. We never use the Buffer member of
+  /// the union, which is only there for compiler-supported alignment and
+  /// sizing.
   ///
   /// This ensures that the `Data` member will be placed at least kCacheLineSize
   /// bytes from the beginning of the structure.
@@ -52,7 +53,7 @@ public:
   };
 
   struct Buffer {
-    atomic_uint64_t Extents{0};
+    atomic_uint64_t *Extents = nullptr;
     uint64_t Generation{0};
     void *Data = nullptr;
     size_t Size = 0;
@@ -60,6 +61,7 @@ public:
   private:
     friend class BufferQueue;
     ControlBlock *BackingStore = nullptr;
+    ControlBlock *ExtentsBackingStore = nullptr;
     size_t Count = 0;
   };
 
@@ -142,6 +144,9 @@ private:
   // The collocated ControlBlock and buffer storage.
   ControlBlock *BackingStore;
 
+  // The collocated ControlBlock and extents storage.
+  ControlBlock *ExtentsBackingStore;
+
   // A dynamically allocated array of BufferRep instances.
   BufferRep *Buffers;
 
index aa898ab..d44d030 100644 (file)
@@ -64,7 +64,7 @@ template <size_t Version = 5> class FDRController {
     First = true;
     UndoableFunctionEnters = 0;
     UndoableTailExits = 0;
-    atomic_store(&B.Extents, 0, memory_order_release);
+    atomic_store(B.Extents, 0, memory_order_release);
     return true;
   }
 
@@ -123,7 +123,7 @@ template <size_t Version = 5> class FDRController {
     if (First) {
       First = false;
       W.resetRecord();
-      atomic_store(&B.Extents, 0, memory_order_release);
+      atomic_store(B.Extents, 0, memory_order_release);
       return setupNewBuffer();
     }
 
index 9ab44fc..dbd1a98 100644 (file)
@@ -86,7 +86,7 @@ class FDRLogWriter {
     // read the bytes in the buffer will see the writes committed before the
     // extents are updated.
     atomic_thread_fence(memory_order_release);
-    atomic_fetch_add(&Buffer.Extents, sizeof(T), memory_order_acq_rel);
+    atomic_fetch_add(Buffer.Extents, sizeof(T), memory_order_acq_rel);
   }
 
 public:
@@ -116,7 +116,7 @@ public:
     // read the bytes in the buffer will see the writes committed before the
     // extents are updated.
     atomic_thread_fence(memory_order_release);
-    atomic_fetch_add(&Buffer.Extents, Size, memory_order_acq_rel);
+    atomic_fetch_add(Buffer.Extents, Size, memory_order_acq_rel);
     return Size;
   }
 
@@ -160,7 +160,7 @@ public:
     // read the bytes in the buffer will see the writes committed before the
     // extents are updated.
     atomic_thread_fence(memory_order_release);
-    atomic_fetch_add(&Buffer.Extents, sizeof(R) + sizeof(A),
+    atomic_fetch_add(Buffer.Extents, sizeof(R) + sizeof(A),
                      memory_order_acq_rel);
     return true;
   }
@@ -185,7 +185,7 @@ public:
     // read the bytes in the buffer will see the writes committed before the
     // extents are updated.
     atomic_thread_fence(memory_order_release);
-    atomic_fetch_add(&Buffer.Extents, sizeof(R) + EventSize,
+    atomic_fetch_add(Buffer.Extents, sizeof(R) + EventSize,
                      memory_order_acq_rel);
     return true;
   }
@@ -208,7 +208,7 @@ public:
     // read the bytes in the buffer will see the writes committed before the
     // extents are updated.
     atomic_thread_fence(memory_order_release);
-    atomic_fetch_add(&Buffer.Extents, EventSize, memory_order_acq_rel);
+    atomic_fetch_add(Buffer.Extents, EventSize, memory_order_acq_rel);
     return true;
   }
 
@@ -216,13 +216,13 @@ public:
 
   void resetRecord() {
     NextRecord = reinterpret_cast<char *>(Buffer.Data);
-    atomic_store(&Buffer.Extents, 0, memory_order_release);
+    atomic_store(Buffer.Extents, 0, memory_order_release);
   }
 
   void undoWrites(size_t B) {
     DCHECK_GE(NextRecord - B, reinterpret_cast<char *>(Buffer.Data));
     NextRecord -= B;
-    atomic_fetch_sub(&Buffer.Extents, B, memory_order_acq_rel);
+    atomic_fetch_sub(Buffer.Extents, B, memory_order_acq_rel);
   }
 
 }; // namespace __xray
index f1eca5c..12bd8f5 100644 (file)
@@ -250,7 +250,7 @@ XRayBuffer fdrIterator(const XRayBuffer B) {
   // fence ordering to ensure that writes we expect to have been completed
   // before the fence are fully committed before we read the extents.
   atomic_thread_fence(memory_order_acquire);
-  auto BufferSize = atomic_load(&It->Extents, memory_order_acquire);
+  auto BufferSize = atomic_load(It->Extents, memory_order_acquire);
   SerializedBufferSize = BufferSize + sizeof(MetadataRecord);
   CurrentBuffer = allocateBuffer(SerializedBufferSize);
   if (CurrentBuffer == nullptr)
@@ -364,7 +364,7 @@ XRayLogFlushStatus fdrLoggingFlush() XRAY_NEVER_INSTRUMENT {
     // still use a Metadata record, but fill in the extents instead for the
     // data.
     MetadataRecord ExtentsRecord;
-    auto BufferExtents = atomic_load(&B.Extents, memory_order_acquire);
+    auto BufferExtents = atomic_load(B.Extents, memory_order_acquire);
     DCHECK(BufferExtents <= B.Size);
     ExtentsRecord.Type = uint8_t(RecordType::Metadata);
     ExtentsRecord.RecordKind =