[Offload] Make the RPC thread sleep briefly when idle (#168596)

Summary:
We start this thread if the RPC client symbol is detected in the loaded
binary. We should make this sleep if there's no work to avoid the thread
running at high priority when the (scarecely used) RPC call is actually
required. So, right now after 25 microseconds we will assume the server
is inactive and begin sleeping. This resets once we do find work.

AMD supports a more intelligent way to do this. HSA signals can wake a
sleeping thread from the kernel, and signals can be sent from the GPU
side. This would be nice to have and I'm planning on working with it in
the future to make this infrastructure more usable with existing AMD
workloads.
This commit is contained in:
Joseph Huber
2025-11-19 15:56:25 -06:00
committed by GitHub
parent 253ed52436
commit eea62159e8

View File

@@ -83,7 +83,8 @@ static rpc::Status handleOffloadOpcodes(plugin::GenericDeviceTy &Device,
return rpc::RPC_ERROR;
}
static rpc::Status runServer(plugin::GenericDeviceTy &Device, void *Buffer) {
static rpc::Status runServer(plugin::GenericDeviceTy &Device, void *Buffer,
bool &ClientInUse) {
uint64_t NumPorts =
std::min(Device.requestedRPCPortCount(), rpc::MAX_PORT_COUNT);
rpc::Server Server(NumPorts, Buffer);
@@ -92,6 +93,7 @@ static rpc::Status runServer(plugin::GenericDeviceTy &Device, void *Buffer) {
if (!Port)
return rpc::RPC_SUCCESS;
ClientInUse = true;
rpc::Status Status =
handleOffloadOpcodes(Device, *Port, Device.getWarpSize());
@@ -99,7 +101,6 @@ static rpc::Status runServer(plugin::GenericDeviceTy &Device, void *Buffer) {
if (Status == rpc::RPC_UNHANDLED_OPCODE)
Status = LIBC_NAMESPACE::shared::handle_libc_opcodes(*Port,
Device.getWarpSize());
Port->close();
return Status;
@@ -122,7 +123,11 @@ void RPCServerTy::ServerThread::shutDown() {
}
void RPCServerTy::ServerThread::run() {
static constexpr auto IdleTime = std::chrono::microseconds(25);
static constexpr auto IdleSleep = std::chrono::microseconds(250);
std::unique_lock<decltype(Mutex)> Lock(Mutex);
auto LastUse = std::chrono::steady_clock::now();
for (;;) {
CV.wait(Lock, [&]() {
return NumUsers.load(std::memory_order_acquire) > 0 ||
@@ -133,15 +138,25 @@ void RPCServerTy::ServerThread::run() {
return;
Lock.unlock();
bool ClientInUse = false;
while (NumUsers.load(std::memory_order_relaxed) > 0 &&
Running.load(std::memory_order_relaxed)) {
// Suspend this thread briefly if there is no current work.
auto Now = std::chrono::steady_clock::now();
if (!ClientInUse && Now - LastUse >= IdleTime)
std::this_thread::sleep_for(IdleSleep);
else if (ClientInUse)
LastUse = Now;
ClientInUse = false;
std::lock_guard<decltype(Mutex)> Lock(BufferMutex);
for (const auto &[Buffer, Device] : llvm::zip_equal(Buffers, Devices)) {
if (!Buffer || !Device)
continue;
// If running the server failed, print a message but keep running.
if (runServer(*Device, Buffer) != rpc::RPC_SUCCESS)
if (runServer(*Device, Buffer, ClientInUse) != rpc::RPC_SUCCESS)
FAILURE_MESSAGE("Unhandled or invalid RPC opcode!");
}
}