[libc] Enable multiple threads to use RPC on the GPU
authorJoseph Huber <jhuber6@vols.utk.edu>
Thu, 4 May 2023 19:53:28 +0000 (14:53 -0500)
committerJoseph Huber <jhuber6@vols.utk.edu>
Fri, 5 May 2023 00:31:41 +0000 (19:31 -0500)
The execution model of the GPU expects that groups of threads will
execute in lock-step in SIMD fashion. It's both important for
performance and correctness that we treat this as the smallest possible
granularity for an RPC operation. Thus, we map multiple threads to a
single larger buffer and ship that across the wire.

This patch makes the necessary changes to support executing the RPC on
the GPU with multiple threads. This requires some workarounds to mimic
the model when handling the protocol from the CPU. I'm not completely
happy with some of the workarounds required, but I think it should work.

Uses some of the implementation details from D148191.

Reviewed By: JonChesterfield

Differential Revision: https://reviews.llvm.org/D148943

libc/src/__support/RPC/CMakeLists.txt
libc/src/__support/RPC/rpc.h
libc/src/__support/RPC/rpc_util.h
libc/startup/gpu/amdgpu/start.cpp
libc/startup/gpu/nvptx/start.cpp
libc/test/integration/startup/gpu/CMakeLists.txt
libc/test/integration/startup/gpu/rpc_test.cpp
libc/utils/gpu/loader/Loader.h
libc/utils/gpu/loader/Server.h
libc/utils/gpu/loader/amdgpu/Loader.cpp
libc/utils/gpu/loader/nvptx/Loader.cpp

index 1ac2a3b..96bfd8e 100644 (file)
@@ -10,6 +10,8 @@ add_header_library(
   DEPENDS
     libc.src.__support.common
     libc.src.__support.CPP.atomic
+    libc.src.__support.CPP.optional
+    libc.src.__support.CPP.functional
     libc.src.__support.GPU.utils
 )
 
index fc7a66f..170f748 100644 (file)
@@ -20,6 +20,7 @@
 
 #include "rpc_util.h"
 #include "src/__support/CPP/atomic.h"
+#include "src/__support/CPP/functional.h"
 #include "src/__support/CPP/optional.h"
 #include "src/__support/GPU/utils.h"
 #include "src/string/memory_utils/memcpy_implementations.h"
@@ -38,12 +39,36 @@ enum Opcode : uint16_t {
 };
 
 /// A fixed size channel used to communicate between the RPC client and server.
-struct alignas(64) Buffer {
-  uint8_t data[62];
-  uint16_t opcode;
+struct Buffer {
+  uint64_t data[8];
 };
 static_assert(sizeof(Buffer) == 64, "Buffer size mismatch");
 
+/// The information associated with a packet. This indicates which operations to
+/// perform and which threads are active in the slots.
+struct Header {
+  uint64_t mask;
+  uint16_t opcode;
+};
+
+/// The data payload for the associated packet. We provide enough space for each
+/// thread in the cooperating lane to have a buffer.
+struct Payload {
+#if defined(LIBC_TARGET_ARCH_IS_GPU)
+  Buffer slot[gpu::LANE_SIZE];
+#else
+  // Flexible array size allocated at runtime to the appropriate size.
+  Buffer slot[];
+#endif
+};
+
+/// A packet used to share data between the client and server across an entire
+/// lane. We use a lane as the minimum granularity for execution.
+struct alignas(64) Packet {
+  Header header;
+  Payload payload;
+};
+
 /// A common process used to synchronize communication between a client and a
 /// server. The process contains an inbox and an outbox used for signaling
 /// ownership of the shared buffer between both sides.
@@ -71,18 +96,21 @@ template <bool InvertInbox> struct Process {
   LIBC_INLINE Process &operator=(const Process &) = default;
   LIBC_INLINE ~Process() = default;
 
+  uint32_t lane_size;
   cpp::Atomic<uint32_t> *lock;
   cpp::Atomic<uint32_t> *inbox;
   cpp::Atomic<uint32_t> *outbox;
-  Buffer *buffer;
+  Packet *buffer;
 
   /// Initialize the communication channels.
-  LIBC_INLINE void reset(void *lock, void *inbox, void *outbox, void *buffer) {
+  LIBC_INLINE void reset(uint32_t lane_size, void *lock, void *inbox,
+                         void *outbox, void *buffer) {
     *this = {
+        lane_size,
         reinterpret_cast<cpp::Atomic<uint32_t> *>(lock),
         reinterpret_cast<cpp::Atomic<uint32_t> *>(inbox),
         reinterpret_cast<cpp::Atomic<uint32_t> *>(outbox),
-        reinterpret_cast<Buffer *>(buffer),
+        reinterpret_cast<Packet *>(buffer),
     };
   }
 
@@ -144,7 +172,8 @@ template <bool InvertInbox> struct Process {
     return lane_mask != packed;
   }
 
-  // Unlock the lock at index.
+  /// Unlock the lock at index. We need a lane sync to keep this function
+  /// convergent, otherwise the compiler will sink the store and deadlock.
   [[clang::convergent]] LIBC_INLINE void unlock(uint64_t lane_mask,
                                                 uint64_t index) {
     // Wait for other threads in the warp to finish using the lock
@@ -156,6 +185,31 @@ template <bool InvertInbox> struct Process {
     // warp dropping the lock again.
     uint32_t and_mask = ~(rpc::is_first_lane(lane_mask) ? 1 : 0);
     lock[index].fetch_and(and_mask, cpp::MemoryOrder::RELAXED);
+    gpu::sync_lane(lane_mask);
+  }
+
+  /// Invokes a function accross every active buffer across the total lane size.
+  LIBC_INLINE void invoke_rpc(cpp::function<void(Buffer *)> fn,
+                              uint32_t index) {
+    if constexpr (is_process_gpu()) {
+      fn(&buffer[index].payload.slot[gpu::get_lane_id()]);
+    } else {
+      for (uint32_t i = 0; i < lane_size; i += gpu::get_lane_size())
+        if (buffer[index].header.mask & 1ul << i)
+          fn(&buffer[index].payload.slot[i]);
+    }
+  }
+
+  /// Alternate version that also provides the index of the current lane.
+  LIBC_INLINE void invoke_rpc(cpp::function<void(Buffer *, uint32_t)> fn,
+                              uint32_t index) {
+    if constexpr (is_process_gpu()) {
+      fn(&buffer[index].payload.slot[gpu::get_lane_id()], gpu::get_lane_id());
+    } else {
+      for (uint32_t i = 0; i < lane_size; i += gpu::get_lane_size())
+        if (buffer[index].header.mask & 1ul << i)
+          fn(&buffer[index].payload.slot[i], i);
+    }
   }
 };
 
@@ -180,7 +234,7 @@ template <bool T> struct Port {
   template <typename A> LIBC_INLINE void recv_n(A alloc);
 
   LIBC_INLINE uint16_t get_opcode() const {
-    return process.buffer[index].opcode;
+    return process.buffer[index].header.opcode;
   }
 
   LIBC_INLINE void close() { process.unlock(lane_mask, index); }
@@ -227,7 +281,7 @@ template <bool T> template <typename F> LIBC_INLINE void Port<T>::send(F fill) {
   }
 
   // Apply the \p fill function to initialize the buffer and release the memory.
-  fill(&process.buffer[index]);
+  process.invoke_rpc(fill, index);
   out = !out;
   atomic_thread_fence(cpp::MemoryOrder::RELEASE);
   process.outbox[index].store(out, cpp::MemoryOrder::RELAXED);
@@ -245,7 +299,7 @@ template <bool T> template <typename U> LIBC_INLINE void Port<T>::recv(U use) {
   atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);
 
   // Apply the \p use function to read the memory out of the buffer.
-  use(&process.buffer[index]);
+  process.invoke_rpc(use, index);
   out = !out;
   process.outbox[index].store(out, cpp::MemoryOrder::RELAXED);
 }
@@ -274,7 +328,10 @@ template <bool T>
 LIBC_INLINE void Port<T>::send_n(const void *src, uint64_t size) {
   // TODO: We could send the first bytes in this call and potentially save an
   // extra send operation.
-  send([=](Buffer *buffer) { buffer->data[0] = size; });
+  // TODO: We may need a way for the CPU to send different strings per thread.
+  send([=](Buffer *buffer) {
+    reinterpret_cast<uint64_t *>(buffer->data)[0] = size;
+  });
   const uint8_t *ptr = reinterpret_cast<const uint8_t *>(src);
   for (uint64_t idx = 0; idx < size; idx += sizeof(Buffer::data)) {
     send([=](Buffer *buffer) {
@@ -283,6 +340,7 @@ LIBC_INLINE void Port<T>::send_n(const void *src, uint64_t size) {
       inline_memcpy(buffer->data, ptr + idx, len);
     });
   }
+  gpu::sync_lane(process.buffer[index].header.mask);
 }
 
 /// Receives an arbitrarily sized data buffer across the shared channel in
@@ -291,15 +349,42 @@ LIBC_INLINE void Port<T>::send_n(const void *src, uint64_t size) {
 template <bool T>
 template <typename A>
 LIBC_INLINE void Port<T>::recv_n(A alloc) {
-  uint64_t size = 0;
-  recv([&](Buffer *buffer) { size = buffer->data[0]; });
-  uint8_t *dst = reinterpret_cast<uint8_t *>(alloc(size));
-  for (uint64_t idx = 0; idx < size; idx += sizeof(Buffer::data)) {
-    recv([=](Buffer *buffer) {
-      uint64_t len =
-          size - idx > sizeof(Buffer::data) ? sizeof(Buffer::data) : size - idx;
-      inline_memcpy(dst + idx, buffer->data, len);
+  // The GPU handles thread private variables and masking implicitly through its
+  // execution model. If this is the CPU we need to manually handle the
+  // possibility that the sent data is of different length.
+  if constexpr (is_process_gpu()) {
+    uint64_t size = 0;
+    recv([&](Buffer *buffer) {
+      size = reinterpret_cast<uint64_t *>(buffer->data)[0];
+    });
+    uint8_t *dst = reinterpret_cast<uint8_t *>(alloc(size), gpu::get_lane_id());
+    for (uint64_t idx = 0; idx < size; idx += sizeof(Buffer::data)) {
+      recv([=](Buffer *buffer) {
+        uint64_t len = size - idx > sizeof(Buffer::data) ? sizeof(Buffer::data)
+                                                         : size - idx;
+        inline_memcpy(dst + idx, buffer->data, len);
+      });
+    }
+    return;
+  } else {
+    uint64_t size[MAX_LANE_SIZE];
+    uint8_t *dst[MAX_LANE_SIZE];
+    uint64_t max = 0;
+    recv([&](Buffer *buffer, uint32_t id) {
+      size[id] = reinterpret_cast<uint64_t *>(buffer->data)[0];
+      dst[id] = reinterpret_cast<uint8_t *>(alloc(size[id], id));
+      max = size[id] > max ? size[id] : max;
     });
+    for (uint64_t idx = 0; idx < max; idx += sizeof(Buffer::data)) {
+      recv([=](Buffer *buffer, uint32_t id) {
+        uint64_t len = size[id] - idx > sizeof(Buffer::data)
+                           ? sizeof(Buffer::data)
+                           : size[id] - idx;
+        if (idx < size[id])
+          inline_memcpy(dst[id] + idx, buffer->data, len);
+      });
+    }
+    return;
   }
 }
 
@@ -307,7 +392,10 @@ LIBC_INLINE void Port<T>::recv_n(A alloc) {
 /// port if we find an index that is in a valid sending state. That is, there
 /// are send operations pending that haven't been serviced on this port. Each
 /// port instance uses an associated \p opcode to tell the server what to do.
-LIBC_INLINE cpp::optional<Client::Port> Client::try_open(uint16_t opcode) {
+/// Opening a port is only valid if the `opcode` is the sam accross every
+/// participating thread.
+[[clang::convergent]] LIBC_INLINE cpp::optional<Client::Port>
+Client::try_open(uint16_t opcode) {
   constexpr uint64_t index = 0;
   const uint64_t lane_mask = gpu::get_lane_mask();
 
@@ -323,13 +411,16 @@ LIBC_INLINE cpp::optional<Client::Port> Client::try_open(uint16_t opcode) {
 
   // Once we acquire the index we need to check if we are in a valid sending
   // state.
-
   if (buffer_unavailable(in, out)) {
     unlock(lane_mask, index);
     return cpp::nullopt;
   }
 
-  buffer->opcode = opcode;
+  if (is_first_lane(lane_mask)) {
+    buffer[index].header.opcode = opcode;
+    buffer[index].header.mask = lane_mask;
+  }
+  gpu::sync_lane(lane_mask);
   return Port(*this, lane_mask, index, out);
 }
 
@@ -343,7 +434,8 @@ LIBC_INLINE Client::Port Client::open(uint16_t opcode) {
 
 /// Attempts to open a port to use as the server. The server can only open a
 /// port if it has a pending receive operation
-LIBC_INLINE cpp::optional<Server::Port> Server::try_open() {
+[[clang::convergent]] LIBC_INLINE cpp::optional<Server::Port>
+Server::try_open() {
   constexpr uint64_t index = 0;
   const uint64_t lane_mask = gpu::get_lane_mask();
 
index 349a531..224723e 100644 (file)
@@ -16,6 +16,9 @@
 namespace __llvm_libc {
 namespace rpc {
 
+/// Maximum amount of data a single lane can use.
+constexpr uint64_t MAX_LANE_SIZE = 64;
+
 /// Suspend the thread briefly to assist the thread scheduler during busy loops.
 LIBC_INLINE void sleep_briefly() {
 #if defined(LIBC_TARGET_ARCH_IS_NVPTX) && __CUDA_ARCH__ >= 700
@@ -37,6 +40,15 @@ LIBC_INLINE bool is_first_lane(uint64_t lane_mask) {
   return gpu::get_lane_id() == get_first_lane_id(lane_mask);
 }
 
+/// Conditional to indicate if this process is running on the GPU.
+LIBC_INLINE constexpr bool is_process_gpu() {
+#if defined(LIBC_TARGET_ARCH_IS_GPU)
+  return true;
+#else
+  return false;
+#endif
+}
+
 } // namespace rpc
 } // namespace __llvm_libc
 
index ab83ea5..b28ad79 100644 (file)
@@ -52,7 +52,7 @@ void initialize(int argc, char **argv, char **env, void *in, void *out,
   if (gpu::get_thread_id() == 0 && gpu::get_block_id() == 0) {
     // We need to set up the RPC client first in case any of the constructors
     // require it.
-    rpc::client.reset(&lock, in, out, buffer);
+    rpc::client.reset(gpu::get_lane_size(), &lock, in, out, buffer);
 
     // We want the fini array callbacks to be run after other atexit
     // callbacks are run. So, we register them before running the init
index fe09666..9ed7559 100644 (file)
@@ -57,7 +57,7 @@ void initialize(int argc, char **argv, char **env, void *in, void *out,
   if (gpu::get_thread_id() == 0 && gpu::get_block_id() == 0) {
     // We need to set up the RPC client first in case any of the constructors
     // require it.
-    rpc::client.reset(&lock, in, out, buffer);
+    rpc::client.reset(gpu::get_lane_size(), &lock, in, out, buffer);
 
     // We want the fini array callbacks to be run after other atexit
     // callbacks are run. So, we register them before running the init
index 754f36d..d2028cc 100644 (file)
@@ -22,8 +22,12 @@ add_integration_test(
     libc.src.__support.RPC.rpc_client
     libc.src.__support.GPU.utils
   LOADER_ARGS
-    --blocks 16
-    --threads 1
+    --blocks-x 2
+    --blocks-y 2
+    --blocks-z 2
+    --threads-x 4
+    --threads-y 4
+    --threads-z 4
 )
 
 add_integration_test(
index daf7bf7..9dc2214 100644 (file)
@@ -13,7 +13,8 @@
 using namespace __llvm_libc;
 
 static void test_add_simple() {
-  uint32_t num_additions = 1000 + 10 * gpu::get_block_id_x();
+  uint32_t num_additions =
+      10 + 10 * gpu::get_thread_id() + 10 * gpu::get_block_id();
   uint64_t cnt = 0;
   for (uint32_t i = 0; i < num_additions; ++i) {
     rpc::Client::Port port = rpc::client.open(rpc::TEST_INCREMENT);
@@ -29,8 +30,20 @@ static void test_add_simple() {
   ASSERT_TRUE(cnt == num_additions && "Incorrect sum");
 }
 
+// Test to ensure that the RPC mechanism doesn't hang on divergence.
+static void test_noop(uint8_t data) {
+  rpc::Client::Port port = rpc::client.open(rpc::NOOP);
+  port.send([=](rpc::Buffer *buffer) { buffer->data[0] = data; });
+  port.close();
+}
+
 TEST_MAIN(int argc, char **argv, char **envp) {
   test_add_simple();
 
+  if (gpu::get_thread_id() % 2)
+    test_noop(1);
+  else
+    test_noop(2);
+
   return 0;
 }
index 9c6413e..feaa8e0 100644 (file)
@@ -29,6 +29,11 @@ struct LaunchParameters {
 int load(int argc, char **argv, char **evnp, void *image, size_t size,
          const LaunchParameters &params);
 
+/// Return \p V aligned "upwards" according to \p Align.
+template <typename V, typename A> inline V align_up(V val, A align) {
+  return ((val + V(align) - 1) / V(align)) * V(align);
+}
+
 /// Copy the system's argument vector to GPU memory allocated using \p alloc.
 template <typename Allocator>
 void *copy_argument_vector(int argc, char **argv, Allocator alloc) {
index cd04335..6ffb329 100644 (file)
@@ -30,15 +30,19 @@ void handle_server() {
 
   switch (port->get_opcode()) {
   case __llvm_libc::rpc::Opcode::PRINT_TO_STDERR: {
-    uint64_t str_size;
-    char *str = nullptr;
-    port->recv_n([&](uint64_t size) {
-      str_size = size;
-      str = new char[size];
-      return str;
+    uint64_t str_size[__llvm_libc::rpc::MAX_LANE_SIZE] = {0};
+    char *strs[__llvm_libc::rpc::MAX_LANE_SIZE] = {nullptr};
+    port->recv_n([&](uint64_t size, uint32_t id) {
+      str_size[id] = size;
+      strs[id] = new char[size];
+      return strs[id];
     });
-    fwrite(str, str_size, 1, stderr);
-    delete[] str;
+    for (uint64_t i = 0; i < __llvm_libc::rpc::MAX_LANE_SIZE; ++i) {
+      if (strs[i]) {
+        fwrite(strs[i], str_size[i], 1, stderr);
+        delete[] strs[i];
+      }
+    }
     break;
   }
   case __llvm_libc::rpc::Opcode::EXIT: {
@@ -54,8 +58,7 @@ void handle_server() {
     break;
   }
   default:
-    port->recv([](__llvm_libc::rpc::Buffer *) { /* no-op */ });
-    return;
+    port->recv([](__llvm_libc::rpc::Buffer *buffer) {});
   }
   port->close();
 }
index af5f008..f9a7b75 100644 (file)
@@ -287,6 +287,10 @@ int load(int argc, char **argv, char **envp, void *image, size_t size,
   hsa_amd_memory_fill(dev_ret, 0, sizeof(int));
 
   // Allocate finegrained memory for the RPC server and client to share.
+  uint32_t wavefront_size = 0;
+  if (hsa_status_t err = hsa_agent_get_info(
+          dev_agent, HSA_AGENT_INFO_WAVEFRONT_SIZE, &wavefront_size))
+    handle_error(err);
   void *server_inbox;
   void *server_outbox;
   void *buffer;
@@ -299,7 +303,10 @@ int load(int argc, char **argv, char **envp, void *image, size_t size,
           /*flags=*/0, &server_outbox))
     handle_error(err);
   if (hsa_status_t err = hsa_amd_memory_pool_allocate(
-          finegrained_pool, sizeof(__llvm_libc::rpc::Buffer),
+          finegrained_pool,
+          align_up(sizeof(__llvm_libc::rpc::Header) +
+                       (wavefront_size * sizeof(__llvm_libc::rpc::Buffer)),
+                   alignof(__llvm_libc::rpc::Packet)),
           /*flags=*/0, &buffer))
     handle_error(err);
   hsa_amd_agents_allow_access(1, &dev_agent, nullptr, server_inbox);
@@ -351,7 +358,7 @@ int load(int argc, char **argv, char **envp, void *image, size_t size,
     handle_error(err);
 
   // Initialize the RPC server's buffer for host-device communication.
-  server.reset(&lock, server_inbox, server_outbox, buffer);
+  server.reset(wavefront_size, &lock, server_inbox, server_outbox, buffer);
 
   // Initialize the packet header and set the doorbell signal to begin execution
   // by the HSA runtime.
index baf8baa..77e6967 100644 (file)
@@ -232,9 +232,13 @@ int load(int argc, char **argv, char **envp, void *image, size_t size,
   if (CUresult err = cuMemsetD32(dev_ret, 0, 1))
     handle_error(err);
 
+  uint32_t warp_size = 32;
   void *server_inbox = allocator(sizeof(__llvm_libc::cpp::Atomic<int>));
   void *server_outbox = allocator(sizeof(__llvm_libc::cpp::Atomic<int>));
-  void *buffer = allocator(sizeof(__llvm_libc::rpc::Buffer));
+  void *buffer =
+      allocator(align_up(sizeof(__llvm_libc::rpc::Header) +
+                             (warp_size * sizeof(__llvm_libc::rpc::Buffer)),
+                         alignof(__llvm_libc::rpc::Packet)));
   if (!server_inbox || !server_outbox || !buffer)
     handle_error("Failed to allocate memory the RPC client / server.");
 
@@ -254,7 +258,7 @@ int load(int argc, char **argv, char **envp, void *image, size_t size,
                          CU_LAUNCH_PARAM_END};
 
   // Initialize the RPC server's buffer for host-device communication.
-  server.reset(&lock, server_inbox, server_outbox, buffer);
+  server.reset(warp_size, &lock, server_inbox, server_outbox, buffer);
 
   // Call the kernel with the given arguments.
   if (CUresult err = cuLaunchKernel(