[orc-rt] Replace wrapper fn void *CallCtx arg with uint64_t CallId. (#167452)

This argument serves as an opaque id (outside the ControllerAccess
object) for a call to a wrapper function. I expect that most
ControllerAccess implementations will want to use this argument as a
sequence number (plain integer), for which uint64_t will be a better fit
than void*. For ControllerAccess implementations that want to use a
pointer, uint64_t should be sufficiently large.
This commit is contained in:
Lang Hames
2025-11-11 18:11:41 +11:00
committed by GitHub
parent f2aad35caf
commit 4c4b1a905e
7 changed files with 60 additions and 54 deletions

View File

@@ -54,7 +54,7 @@ typedef struct {
* Asynchronous return function for an orc-rt wrapper function.
*/
typedef void (*orc_rt_WrapperFunctionReturn)(
orc_rt_SessionRef Session, void *CallCtx,
orc_rt_SessionRef Session, uint64_t CallId,
orc_rt_WrapperFunctionBuffer ResultBytes);
/**
@@ -62,10 +62,11 @@ typedef void (*orc_rt_WrapperFunctionReturn)(
*
* ArgBytes contains the serialized arguments for the wrapper function.
* Session holds a reference to the session object.
* CallCtx holds a pointer to the context object for this particular call.
* CallId holds a pointer to the context object for this particular call.
* Return holds a pointer to the return function.
*/
typedef void (*orc_rt_WrapperFunction)(orc_rt_SessionRef Session, void *CallCtx,
typedef void (*orc_rt_WrapperFunction)(orc_rt_SessionRef Session,
uint64_t CallId,
orc_rt_WrapperFunctionReturn Return,
orc_rt_WrapperFunctionBuffer ArgBytes);

View File

@@ -124,10 +124,10 @@ template <typename SPSSig> struct SPSWrapperFunction {
}
template <typename Handler>
static void handle(orc_rt_SessionRef Session, void *CallCtx,
static void handle(orc_rt_SessionRef Session, uint64_t CallId,
orc_rt_WrapperFunctionReturn Return,
WrapperFunctionBuffer ArgBytes, Handler &&H) {
WrapperFunction::handle(Session, CallCtx, Return, std::move(ArgBytes),
WrapperFunction::handle(Session, CallId, Return, std::move(ArgBytes),
WrapperFunctionSPSSerializer<SPSSig>(),
std::forward<Handler>(H));
}

View File

@@ -114,21 +114,21 @@ private:
} // namespace orc_rt
ORC_RT_SPS_INTERFACE void orc_rt_SimpleNativeMemoryMap_reserve_sps_wrapper(
orc_rt_SessionRef Session, void *CallCtx,
orc_rt_SessionRef Session, uint64_t CallId,
orc_rt_WrapperFunctionReturn Return, orc_rt_WrapperFunctionBuffer ArgBytes);
ORC_RT_SPS_INTERFACE void
orc_rt_SimpleNativeMemoryMap_releaseMultiple_sps_wrapper(
orc_rt_SessionRef Session, void *CallCtx,
orc_rt_SessionRef Session, uint64_t CallId,
orc_rt_WrapperFunctionReturn Return, orc_rt_WrapperFunctionBuffer ArgBytes);
ORC_RT_SPS_INTERFACE void orc_rt_SimpleNativeMemoryMap_initialize_sps_wrapper(
orc_rt_SessionRef Session, void *CallCtx,
orc_rt_SessionRef Session, uint64_t CallId,
orc_rt_WrapperFunctionReturn Return, orc_rt_WrapperFunctionBuffer ArgBytes);
ORC_RT_SPS_INTERFACE void
orc_rt_SimpleNativeMemoryMap_deinitializeMultiple_sps_wrapper(
orc_rt_SessionRef Session, void *CallCtx,
orc_rt_SessionRef Session, uint64_t CallId,
orc_rt_WrapperFunctionReturn Return, orc_rt_WrapperFunctionBuffer ArgBytes);
#endif // ORC_RT_SIMPLENATIVEMEMORYMAP_H

View File

@@ -137,14 +137,14 @@ using WFHandlerTraits = CallableTraitsHelper<WFHandlerTraitsImpl, C>;
template <typename Serializer> class StructuredYieldBase {
public:
StructuredYieldBase(orc_rt_SessionRef Session, void *CallCtx,
StructuredYieldBase(orc_rt_SessionRef Session, uint64_t CallId,
orc_rt_WrapperFunctionReturn Return, Serializer &&S)
: Session(Session), CallCtx(CallCtx), Return(Return),
: Session(Session), CallId(CallId), Return(Return),
S(std::forward<Serializer>(S)) {}
protected:
orc_rt_SessionRef Session;
void *CallCtx;
uint64_t CallId;
orc_rt_WrapperFunctionReturn Return;
std::decay_t<Serializer> S;
};
@@ -158,9 +158,9 @@ public:
using StructuredYieldBase<Serializer>::StructuredYieldBase;
void operator()(RetT &&R) {
if (auto ResultBytes = this->S.result().serialize(std::forward<RetT>(R)))
this->Return(this->Session, this->CallCtx, ResultBytes->release());
this->Return(this->Session, this->CallId, ResultBytes->release());
else
this->Return(this->Session, this->CallCtx,
this->Return(this->Session, this->CallId,
WrapperFunctionBuffer::createOutOfBandError(
"Could not serialize wrapper function result data")
.release());
@@ -173,7 +173,7 @@ class StructuredYield<std::tuple<>, Serializer>
public:
using StructuredYieldBase<Serializer>::StructuredYieldBase;
void operator()() {
this->Return(this->Session, this->CallCtx,
this->Return(this->Session, this->CallId,
WrapperFunctionBuffer().release());
}
};
@@ -251,12 +251,12 @@ struct WrapperFunction {
///
///
/// static void adder_add_async_sps_wrapper(
/// orc_rt_SessionRef Session, void *CallCtx,
/// orc_rt_SessionRef Session, uint64_t CallId,
/// orc_rt_WrapperFunctionReturn Return,
/// orc_rt_WrapperFunctionBuffer ArgBytes) {
/// using SPSSig = SPSString(SPSExecutorAddr, int32_t, bool);
/// SPSWrapperFunction<SPSSig>::handle(
/// Session, CallCtx, Return, ArgBytes,
/// Session, CallId, Return, ArgBytes,
/// WrapperFunction::handleWithAsyncMethod(&MyClass::myMethod));
/// }
/// @endcode
@@ -313,12 +313,12 @@ struct WrapperFunction {
///
///
/// static void adder_add_sync_sps_wrapper(
/// orc_rt_SessionRef Session, void *CallCtx,
/// orc_rt_SessionRef Session, uint64_t CallId,
/// orc_rt_WrapperFunctionReturn Return,
/// orc_rt_WrapperFunctionBuffer ArgBytes) {
/// using SPSSig = SPSString(SPSExecutorAddr, int32_t, bool);
/// SPSWrapperFunction<SPSSig>::handle(
/// Session, CallCtx, Return, ArgBytes,
/// Session, CallId, Return, ArgBytes,
/// WrapperFunction::handleWithSyncMethod(&Adder::addSync));
/// }
/// @endcode
@@ -368,7 +368,7 @@ struct WrapperFunction {
/// This utility deserializes and serializes arguments and return values
/// (using the given Serializer), and calls the given handler.
template <typename Serializer, typename Handler>
static void handle(orc_rt_SessionRef Session, void *CallCtx,
static void handle(orc_rt_SessionRef Session, uint64_t CallId,
orc_rt_WrapperFunctionReturn Return,
WrapperFunctionBuffer ArgBytes, Serializer &&S,
Handler &&H) {
@@ -380,16 +380,16 @@ struct WrapperFunction {
typedef typename CallableArgInfo<Yield>::args_tuple_type RetTupleType;
if (ArgBytes.getOutOfBandError())
return Return(Session, CallCtx, ArgBytes.release());
return Return(Session, CallId, ArgBytes.release());
if (auto Args = S.arguments().template deserialize<ArgTuple>(ArgBytes))
std::apply(HandlerTraits::forwardArgsAsRequested(bind_front(
std::forward<Handler>(H),
detail::StructuredYield<RetTupleType, Serializer>(
Session, CallCtx, Return, std::move(S)))),
Session, CallId, Return, std::move(S)))),
*Args);
else
Return(Session, CallCtx,
Return(Session, CallId,
WrapperFunctionBuffer::createOutOfBandError(
"Could not deserialize wrapper function arg data")
.release());

View File

@@ -367,45 +367,45 @@ Error SimpleNativeMemoryMap::recordDeallocActions(
}
ORC_RT_SPS_INTERFACE void orc_rt_SimpleNativeMemoryMap_reserve_sps_wrapper(
orc_rt_SessionRef Session, void *CallCtx,
orc_rt_SessionRef Session, uint64_t CallId,
orc_rt_WrapperFunctionReturn Return,
orc_rt_WrapperFunctionBuffer ArgBytes) {
using Sig = SPSExpected<SPSExecutorAddr>(SPSExecutorAddr, SPSSize);
SPSWrapperFunction<Sig>::handle(
Session, CallCtx, Return, ArgBytes,
Session, CallId, Return, ArgBytes,
WrapperFunction::handleWithAsyncMethod(&SimpleNativeMemoryMap::reserve));
}
ORC_RT_SPS_INTERFACE void
orc_rt_SimpleNativeMemoryMap_releaseMultiple_sps_wrapper(
orc_rt_SessionRef Session, void *CallCtx,
orc_rt_SessionRef Session, uint64_t CallId,
orc_rt_WrapperFunctionReturn Return,
orc_rt_WrapperFunctionBuffer ArgBytes) {
using Sig = SPSError(SPSExecutorAddr, SPSSequence<SPSExecutorAddr>);
SPSWrapperFunction<Sig>::handle(Session, CallCtx, Return, ArgBytes,
SPSWrapperFunction<Sig>::handle(Session, CallId, Return, ArgBytes,
WrapperFunction::handleWithAsyncMethod(
&SimpleNativeMemoryMap::releaseMultiple));
}
ORC_RT_SPS_INTERFACE void orc_rt_SimpleNativeMemoryMap_initialize_sps_wrapper(
orc_rt_SessionRef Session, void *CallCtx,
orc_rt_SessionRef Session, uint64_t CallId,
orc_rt_WrapperFunctionReturn Return,
orc_rt_WrapperFunctionBuffer ArgBytes) {
using Sig = SPSExpected<SPSExecutorAddr>(
SPSExecutorAddr, SPSSimpleNativeMemoryMapInitializeRequest);
SPSWrapperFunction<Sig>::handle(Session, CallCtx, Return, ArgBytes,
SPSWrapperFunction<Sig>::handle(Session, CallId, Return, ArgBytes,
WrapperFunction::handleWithAsyncMethod(
&SimpleNativeMemoryMap::initialize));
}
ORC_RT_SPS_INTERFACE void
orc_rt_SimpleNativeMemoryMap_deinitializeMultiple_sps_wrapper(
orc_rt_SessionRef Session, void *CallCtx,
orc_rt_SessionRef Session, uint64_t CallId,
orc_rt_WrapperFunctionReturn Return,
orc_rt_WrapperFunctionBuffer ArgBytes) {
using Sig = SPSError(SPSExecutorAddr, SPSSequence<SPSExecutorAddr>);
SPSWrapperFunction<Sig>::handle(
Session, CallCtx, Return, ArgBytes,
Session, CallId, Return, ArgBytes,
WrapperFunction::handleWithAsyncMethod(
&SimpleNativeMemoryMap::deinitializeMultiple));
}

View File

@@ -22,10 +22,11 @@ private:
virtual ~DirectResultSender() {}
virtual void send(orc_rt_SessionRef Session,
orc_rt::WrapperFunctionBuffer ResultBytes) = 0;
static void send(orc_rt_SessionRef Session, void *CallCtx,
static void send(orc_rt_SessionRef Session, uint64_t CallId,
orc_rt_WrapperFunctionBuffer ResultBytes) {
std::unique_ptr<DirectResultSender>(
reinterpret_cast<DirectResultSender *>(CallCtx))
reinterpret_cast<DirectResultSender *>(
static_cast<uintptr_t>(CallId)))
->send(Session, ResultBytes);
}
};
@@ -59,7 +60,8 @@ public:
orc_rt::WrapperFunctionBuffer ArgBytes) {
auto DR =
makeDirectResultSender(std::forward<HandleResultFn>(HandleResult));
Fn(Session, reinterpret_cast<void *>(DR.release()),
Fn(Session,
static_cast<uint64_t>(reinterpret_cast<uintptr_t>(DR.release())),
DirectResultSender::send, ArgBytes.release());
}

View File

@@ -22,11 +22,11 @@
using namespace orc_rt;
static void void_noop_sps_wrapper(orc_rt_SessionRef Session, void *CallCtx,
static void void_noop_sps_wrapper(orc_rt_SessionRef Session, uint64_t CallId,
orc_rt_WrapperFunctionReturn Return,
orc_rt_WrapperFunctionBuffer ArgBytes) {
SPSWrapperFunction<void()>::handle(
Session, CallCtx, Return, ArgBytes,
Session, CallId, Return, ArgBytes,
[](move_only_function<void()> Return) { Return(); });
}
@@ -40,11 +40,12 @@ TEST(SPSWrapperFunctionUtilsTest, VoidNoop) {
EXPECT_TRUE(Ran);
}
static void add_via_lambda_sps_wrapper(orc_rt_SessionRef Session, void *CallCtx,
static void add_via_lambda_sps_wrapper(orc_rt_SessionRef Session,
uint64_t CallId,
orc_rt_WrapperFunctionReturn Return,
orc_rt_WrapperFunctionBuffer ArgBytes) {
SPSWrapperFunction<int32_t(int32_t, int32_t)>::handle(
Session, CallCtx, Return, ArgBytes,
Session, CallId, Return, ArgBytes,
[](move_only_function<void(int32_t)> Return, int32_t X, int32_t Y) {
Return(X + Y);
});
@@ -64,11 +65,11 @@ static void add_via_function(move_only_function<void(int32_t)> Return,
}
static void
add_via_function_sps_wrapper(orc_rt_SessionRef Session, void *CallCtx,
add_via_function_sps_wrapper(orc_rt_SessionRef Session, uint64_t CallId,
orc_rt_WrapperFunctionReturn Return,
orc_rt_WrapperFunctionBuffer ArgBytes) {
SPSWrapperFunction<int32_t(int32_t, int32_t)>::handle(
Session, CallCtx, Return, ArgBytes, add_via_function);
Session, CallId, Return, ArgBytes, add_via_function);
}
TEST(SPSWrapperFunctionUtilsTest, BinaryOpViaFunction) {
@@ -80,11 +81,11 @@ TEST(SPSWrapperFunctionUtilsTest, BinaryOpViaFunction) {
}
static void
add_via_function_pointer_sps_wrapper(orc_rt_SessionRef Session, void *CallCtx,
add_via_function_pointer_sps_wrapper(orc_rt_SessionRef Session, uint64_t CallId,
orc_rt_WrapperFunctionReturn Return,
orc_rt_WrapperFunctionBuffer ArgBytes) {
SPSWrapperFunction<int32_t(int32_t, int32_t)>::handle(
Session, CallCtx, Return, ArgBytes, &add_via_function);
Session, CallId, Return, ArgBytes, &add_via_function);
}
TEST(SPSWrapperFunctionUtilsTest, BinaryOpViaFunctionPointer) {
@@ -96,11 +97,12 @@ TEST(SPSWrapperFunctionUtilsTest, BinaryOpViaFunctionPointer) {
}
static void
round_trip_string_via_span_sps_wrapper(orc_rt_SessionRef Session, void *CallCtx,
round_trip_string_via_span_sps_wrapper(orc_rt_SessionRef Session,
uint64_t CallId,
orc_rt_WrapperFunctionReturn Return,
orc_rt_WrapperFunctionBuffer ArgBytes) {
SPSWrapperFunction<SPSString(SPSString)>::handle(
Session, CallCtx, Return, ArgBytes,
Session, CallId, Return, ArgBytes,
[](move_only_function<void(std::string)> Return, span<const char> S) {
Return({S.data(), S.size()});
});
@@ -119,11 +121,11 @@ TEST(SPSWrapperFunctionUtilsTest, RoundTripStringViaSpan) {
}
static void improbable_feat_sps_wrapper(orc_rt_SessionRef Session,
void *CallCtx,
uint64_t CallId,
orc_rt_WrapperFunctionReturn Return,
orc_rt_WrapperFunctionBuffer ArgBytes) {
SPSWrapperFunction<SPSError(bool)>::handle(
Session, CallCtx, Return, ArgBytes,
Session, CallId, Return, ArgBytes,
[](move_only_function<void(Error)> Return, bool LuckyHat) {
if (LuckyHat)
Return(Error::success());
@@ -155,11 +157,11 @@ TEST(SPSWrapperFunctionUtilsTest, TransparentConversionErrorFailureCase) {
EXPECT_EQ(ErrMsg, "crushed by boulder");
}
static void halve_number_sps_wrapper(orc_rt_SessionRef Session, void *CallCtx,
static void halve_number_sps_wrapper(orc_rt_SessionRef Session, uint64_t CallId,
orc_rt_WrapperFunctionReturn Return,
orc_rt_WrapperFunctionBuffer ArgBytes) {
SPSWrapperFunction<SPSExpected<int32_t>(int32_t)>::handle(
Session, CallCtx, Return, ArgBytes,
Session, CallId, Return, ArgBytes,
[](move_only_function<void(Expected<int32_t>)> Return, int N) {
if (N % 2 == 0)
Return(N >> 1);
@@ -208,12 +210,12 @@ public:
static void
handle_with_reference_types_sps_wrapper(orc_rt_SessionRef Session,
void *CallCtx,
uint64_t CallId,
orc_rt_WrapperFunctionReturn Return,
orc_rt_WrapperFunctionBuffer ArgBytes) {
SPSWrapperFunction<void(
SPSOpCounter<0>, SPSOpCounter<1>, SPSOpCounter<2>,
SPSOpCounter<3>)>::handle(Session, CallCtx, Return, ArgBytes,
SPSOpCounter<3>)>::handle(Session, CallId, Return, ArgBytes,
[](move_only_function<void()> Return,
OpCounter<0>, OpCounter<1> &,
const OpCounter<2> &,
@@ -281,11 +283,11 @@ public:
} // anonymous namespace
static void adder_add_async_sps_wrapper(orc_rt_SessionRef Session,
void *CallCtx,
uint64_t CallId,
orc_rt_WrapperFunctionReturn Return,
orc_rt_WrapperFunctionBuffer ArgBytes) {
SPSWrapperFunction<int32_t(SPSExecutorAddr, int32_t, int32_t)>::handle(
Session, CallCtx, Return, ArgBytes,
Session, CallId, Return, ArgBytes,
WrapperFunction::handleWithAsyncMethod(&Adder::addAsync));
}
@@ -300,11 +302,12 @@ TEST(SPSWrapperFunctionUtilsTest, HandleWtihAsyncMethod) {
EXPECT_EQ(Result, 42);
}
static void adder_add_sync_sps_wrapper(orc_rt_SessionRef Session, void *CallCtx,
static void adder_add_sync_sps_wrapper(orc_rt_SessionRef Session,
uint64_t CallId,
orc_rt_WrapperFunctionReturn Return,
orc_rt_WrapperFunctionBuffer ArgBytes) {
SPSWrapperFunction<int32_t(SPSExecutorAddr, int32_t, int32_t)>::handle(
Session, CallCtx, Return, ArgBytes,
Session, CallId, Return, ArgBytes,
WrapperFunction::handleWithSyncMethod(&Adder::addSync));
}