From 29d3da3b86cc4e8d5025602e0d7a290609f44f45 Mon Sep 17 00:00:00 2001 From: Joseph Huber Date: Fri, 19 May 2023 14:58:32 -0500 Subject: [PATCH] [libc] Fix the `send_n` and `recv_n` utilities under divergent lanes We provide the `send_n` and `recv_n` utilities as a generic way to stream data between both sides of the process. This was previously tested and performed as expected when using a string of constant size. However, when the size was allowed to diverge between the threads in the warp or wavefront this could deadlock. This did not occur on NVPTX because of the use of the explicit warp sync. However, on AMD one of the work items in the wavefront could continue executing and hit the next `recv` call before the other threads, then we would deadlock as we violated the RPC invariants. This patch replaces the for loop with a thread ballot. This will cause every thread in the warp or wavefront to continue executing the loop until all of them can exit. This acts as a more explicit wavefront sync. Reviewed By: JonChesterfield Differential Revision: https://reviews.llvm.org/D150992 --- libc/src/__support/GPU/amdgpu/utils.h | 1 + libc/src/__support/GPU/nvptx/utils.h | 1 + libc/src/__support/RPC/rpc.h | 13 ++++-- libc/test/integration/startup/gpu/CMakeLists.txt | 3 +- .../integration/startup/gpu/rpc_stream_test.cpp | 54 ++++++++++++++++++++-- 5 files changed, 61 insertions(+), 11 deletions(-) diff --git a/libc/src/__support/GPU/amdgpu/utils.h b/libc/src/__support/GPU/amdgpu/utils.h index 87cd645..665f4583 100644 --- a/libc/src/__support/GPU/amdgpu/utils.h +++ b/libc/src/__support/GPU/amdgpu/utils.h @@ -122,6 +122,7 @@ LIBC_INLINE uint32_t get_lane_size() { return LANE_SIZE; } return __builtin_amdgcn_readfirstlane(x); } +/// Returns a bitmask of threads in the current lane for which \p x is true. [[clang::convergent]] LIBC_INLINE uint64_t ballot(uint64_t lane_mask, bool x) { // the lane_mask & gives the nvptx semantics when lane_mask is a subset of // the active threads diff --git a/libc/src/__support/GPU/nvptx/utils.h b/libc/src/__support/GPU/nvptx/utils.h index 7636757..5460c3d 100644 --- a/libc/src/__support/GPU/nvptx/utils.h +++ b/libc/src/__support/GPU/nvptx/utils.h @@ -118,6 +118,7 @@ LIBC_INLINE uint32_t get_lane_size() { return LANE_SIZE; } #endif } +/// Returns a bitmask of threads in the current lane for which \p x is true. [[clang::convergent]] LIBC_INLINE uint64_t ballot(uint64_t lane_mask, bool x) { #if __CUDA_ARCH__ >= 600 return __nvvm_vote_ballot_sync(lane_mask, x); diff --git a/libc/src/__support/RPC/rpc.h b/libc/src/__support/RPC/rpc.h index 8be4116..bc8c05b 100644 --- a/libc/src/__support/RPC/rpc.h +++ b/libc/src/__support/RPC/rpc.h @@ -423,14 +423,15 @@ template LIBC_INLINE void Port::send_n(const void *const *src, uint64_t *size) { // TODO: We could send the first bytes in this call and potentially save an // extra send operation. - // TODO: We may need a way for the CPU to send different strings per thread. uint64_t num_sends = 0; send([&](Buffer *buffer, uint32_t id) { reinterpret_cast(buffer->data)[0] = lane_value(size, id); num_sends = is_process_gpu() ? lane_value(size, id) : max(lane_value(size, id), num_sends); }); - for (uint64_t idx = 0; idx < num_sends; idx += sizeof(Buffer::data)) { + uint64_t idx = 0; + uint64_t mask = process.get_packet(index).header.mask; + while (gpu::ballot(mask, idx < num_sends)) { send([=](Buffer *buffer, uint32_t id) { const uint64_t len = lane_value(size, id) - idx > sizeof(Buffer::data) ? sizeof(Buffer::data) @@ -440,8 +441,8 @@ LIBC_INLINE void Port::send_n(const void *const *src, uint64_t *size) { buffer->data, reinterpret_cast(lane_value(src, id)) + idx, len); }); + idx += sizeof(Buffer::data); } - gpu::sync_lane(process.get_packet(index).header.mask); } /// Helper routine to simplify the interface when sending from the GPU using @@ -468,7 +469,9 @@ LIBC_INLINE void Port::recv_n(void **dst, uint64_t *size, A &&alloc) { num_recvs = is_process_gpu() ? lane_value(size, id) : max(lane_value(size, id), num_recvs); }); - for (uint64_t idx = 0; idx < num_recvs; idx += sizeof(Buffer::data)) { + uint64_t idx = 0; + uint64_t mask = process.get_packet(index).header.mask; + while (gpu::ballot(mask, idx < num_recvs)) { recv([=](Buffer *buffer, uint32_t id) { uint64_t len = lane_value(size, id) - idx > sizeof(Buffer::data) ? sizeof(Buffer::data) @@ -477,8 +480,8 @@ LIBC_INLINE void Port::recv_n(void **dst, uint64_t *size, A &&alloc) { inline_memcpy(reinterpret_cast(lane_value(dst, id)) + idx, buffer->data, len); }); + idx += sizeof(Buffer::data); } - return; } /// Attempts to open a port to use as the client. The client can only open a diff --git a/libc/test/integration/startup/gpu/CMakeLists.txt b/libc/test/integration/startup/gpu/CMakeLists.txt index 02018f9..7555986 100644 --- a/libc/test/integration/startup/gpu/CMakeLists.txt +++ b/libc/test/integration/startup/gpu/CMakeLists.txt @@ -50,5 +50,6 @@ add_integration_test( SRCS rpc_stream_test.cpp LOADER_ARGS - --threads-x 32 + --threads 32 + --blocks 8 ) diff --git a/libc/test/integration/startup/gpu/rpc_stream_test.cpp b/libc/test/integration/startup/gpu/rpc_stream_test.cpp index 3d495b4..442ff41 100644 --- a/libc/test/integration/startup/gpu/rpc_stream_test.cpp +++ b/libc/test/integration/startup/gpu/rpc_stream_test.cpp @@ -20,11 +20,12 @@ extern "C" void free(void *); using namespace __llvm_libc; static void test_stream() { - const char str[] = "ABCDEFGHIJKLMNOPQRSTUVWXYabcdefghijklmnopqrstuvwxy" - "ABCDEFGHIJKLMNOPQRSTUVWXYabcdefghijklmnopqrstuvwxy" - "ABCDEFGHIJKLMNOPQRSTUVWXYabcdefghijklmnopqrstuvwxy" - "ABCDEFGHIJKLMNOPQRSTUVWXYabcdefghijklmnopqrstuvwxy" - "ABCDEFGHIJKLMNOPQRSTUVWXYabcdefghijklmnopqrstuvwxy"; + static const char str[] = + "ABCDEFGHIJKLMNOPQRSTUVWXYabcdefghijklmnopqrstuvwxy" + "ABCDEFGHIJKLMNOPQRSTUVWXYabcdefghijklmnopqrstuvwxy" + "ABCDEFGHIJKLMNOPQRSTUVWXYabcdefghijklmnopqrstuvwxy" + "ABCDEFGHIJKLMNOPQRSTUVWXYabcdefghijklmnopqrstuvwxy" + "ABCDEFGHIJKLMNOPQRSTUVWXYabcdefghijklmnopqrstuvwxy"; uint64_t send_size = sizeof(str); void *send_ptr = malloc(send_size); void *recv_ptr; @@ -44,8 +45,51 @@ static void test_stream() { free(recv_ptr); } +static void test_divergent() { + static const uint8_t data[] = { + 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, + 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, + 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, + 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, + 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, + 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, + 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, + 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, + 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, + 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, + 150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, + 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, 177, 178, 179, + 180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191, 192, 193, 194, + 195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 207, 208, 209, + 210, 211, 212, 213, 214, 215, 216, 217, 218, 219, 220, 221, 222, 223, 224, + 225, 226, 227, 228, 229, 230, 231, 232, 233, 234, 235, 236, 237, 238, 239, + 240, 241, 242, 243, 244, 245, 246, 247, 248, 249, 250, 251, 252, 253, 254, + 255, + }; + + uint8_t buffer[128] = {0}; + uint64_t offset = + (gpu::get_thread_id() + gpu::get_num_threads() * gpu::get_block_id()) % + 128; + void *recv_ptr; + uint64_t recv_size; + inline_memcpy(buffer, &data[offset], offset); + ASSERT_TRUE(inline_memcmp(buffer, &data[offset], offset) == 0 && + "Data mismatch"); + rpc::Client::Port port = rpc::client.open(); + port.send_n(buffer, offset); + inline_memset(buffer, offset, 0); + port.recv_n(&recv_ptr, &recv_size, [&](uint64_t) { return buffer; }); + port.close(); + + ASSERT_TRUE(inline_memcmp(recv_ptr, &data[offset], recv_size) == 0 && + "Data mismatch"); + ASSERT_TRUE(recv_size == offset && "Data size mismatch"); +} + TEST_MAIN(int argc, char **argv, char **envp) { test_stream(); + test_divergent(); return 0; } -- 2.7.4