#include <grpcpp/impl/codegen/message_allocator.h>
#include <grpcpp/impl/codegen/rpc_service_method.h>
-#include <grpcpp/impl/codegen/server_callback_impl.h>
-#include <grpcpp/impl/codegen/server_context_impl.h>
+#include <grpcpp/impl/codegen/server_callback.h>
+#include <grpcpp/impl/codegen/server_context.h>
#include <grpcpp/impl/codegen/status.h>
-namespace grpc_impl {
+namespace grpc {
namespace internal {
template <class RequestType, class ResponseType>
class CallbackUnaryHandler : public ::grpc::internal::MethodHandler {
public:
explicit CallbackUnaryHandler(
- std::function<ServerUnaryReactor*(::grpc_impl::CallbackServerContext*,
+ std::function<ServerUnaryReactor*(::grpc::CallbackServerContext*,
const RequestType*, ResponseType*)>
get_reactor)
: get_reactor_(std::move(get_reactor)) {}
auto* call = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
param.call->call(), sizeof(ServerCallbackUnaryImpl)))
ServerCallbackUnaryImpl(
- static_cast<::grpc_impl::CallbackServerContext*>(
- param.server_context),
- param.call, allocator_state, std::move(param.call_requester));
+ static_cast<::grpc::CallbackServerContext*>(param.server_context),
+ param.call, allocator_state, param.call_requester);
param.server_context->BeginCompletionOp(
param.call, [call](bool) { call->MaybeDone(); }, call);
if (param.status.ok()) {
reactor = ::grpc::internal::CatchingReactorGetter<ServerUnaryReactor>(
get_reactor_,
- static_cast<::grpc_impl::CallbackServerContext*>(
- param.server_context),
+ static_cast<::grpc::CallbackServerContext*>(param.server_context),
call->request(), call->response());
}
}
private:
- std::function<ServerUnaryReactor*(::grpc_impl::CallbackServerContext*,
+ std::function<ServerUnaryReactor*(::grpc::CallbackServerContext*,
const RequestType*, ResponseType*)>
get_reactor_;
::grpc::experimental::MessageAllocator<RequestType, ResponseType>*
class ServerCallbackUnaryImpl : public ServerCallbackUnary {
public:
void Finish(::grpc::Status s) override {
+ // A callback that only contains a call to MaybeDone can be run as an
+ // inline callback regardless of whether or not OnDone is inlineable
+ // because if the actual OnDone callback needs to be scheduled, MaybeDone
+ // is responsible for dispatching to an executor thread if needed. Thus,
+ // when setting up the finish_tag_, we can set its own callback to
+ // inlineable.
finish_tag_.Set(
- call_.call(), [this](bool) { MaybeDone(); }, &finish_ops_,
- reactor_.load(std::memory_order_relaxed)->InternalInlineable());
+ call_.call(),
+ [this](bool) {
+ this->MaybeDone(
+ reactor_.load(std::memory_order_relaxed)->InternalInlineable());
+ },
+ &finish_ops_, /*can_inline=*/true);
finish_ops_.set_core_cq_tag(&finish_tag_);
if (!ctx_->sent_initial_metadata_) {
void SendInitialMetadata() override {
GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
this->Ref();
- meta_tag_.Set(call_.call(),
- [this](bool ok) {
- reactor_.load(std::memory_order_relaxed)
- ->OnSendInitialMetadataDone(ok);
- MaybeDone();
- },
- &meta_ops_, false);
+ // The callback for this function should not be marked inline because it
+ // is directly invoking a user-controlled reaction
+ // (OnSendInitialMetadataDone). Thus it must be dispatched to an executor
+ // thread. However, any OnDone needed after that can be inlined because it
+ // is already running on an executor thread.
+ meta_tag_.Set(
+ call_.call(),
+ [this](bool ok) {
+ ServerUnaryReactor* reactor =
+ reactor_.load(std::memory_order_relaxed);
+ reactor->OnSendInitialMetadataDone(ok);
+ this->MaybeDone(/*inlineable_ondone=*/true);
+ },
+ &meta_ops_, /*can_inline=*/false);
meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
if (ctx_->compression_level_set()) {
friend class CallbackUnaryHandler<RequestType, ResponseType>;
ServerCallbackUnaryImpl(
- ::grpc_impl::CallbackServerContext* ctx, ::grpc::internal::Call* call,
+ ::grpc::CallbackServerContext* ctx, ::grpc::internal::Call* call,
::grpc::experimental::MessageHolder<RequestType, ResponseType>*
allocator_state,
std::function<void()> call_requester)
reactor_.store(reactor, std::memory_order_relaxed);
this->BindReactor(reactor);
this->MaybeCallOnCancel(reactor);
- this->MaybeDone();
+ this->MaybeDone(reactor->InternalInlineable());
}
const RequestType* request() { return allocator_state_->request(); }
ResponseType* response() { return allocator_state_->response(); }
- void MaybeDone() override {
- if (GPR_UNLIKELY(this->Unref() == 1)) {
- reactor_.load(std::memory_order_relaxed)->OnDone();
- grpc_call* call = call_.call();
- auto call_requester = std::move(call_requester_);
- allocator_state_->Release();
- this->~ServerCallbackUnaryImpl(); // explicitly call destructor
- ::grpc::g_core_codegen_interface->grpc_call_unref(call);
- call_requester();
+ void CallOnDone() override {
+ reactor_.load(std::memory_order_relaxed)->OnDone();
+ grpc_call* call = call_.call();
+ auto call_requester = std::move(call_requester_);
+ allocator_state_->Release();
+ if (ctx_->context_allocator() != nullptr) {
+ ctx_->context_allocator()->Release(ctx_);
}
+ this->~ServerCallbackUnaryImpl(); // explicitly call destructor
+ ::grpc::g_core_codegen_interface->grpc_call_unref(call);
+ call_requester();
}
ServerReactor* reactor() override {
finish_ops_;
::grpc::internal::CallbackWithSuccessTag finish_tag_;
- ::grpc_impl::CallbackServerContext* const ctx_;
+ ::grpc::CallbackServerContext* const ctx_;
::grpc::internal::Call call_;
::grpc::experimental::MessageHolder<RequestType, ResponseType>* const
allocator_state_;
public:
explicit CallbackClientStreamingHandler(
std::function<ServerReadReactor<RequestType>*(
- ::grpc_impl::CallbackServerContext*, ResponseType*)>
+ ::grpc::CallbackServerContext*, ResponseType*)>
get_reactor)
: get_reactor_(std::move(get_reactor)) {}
void RunHandler(const HandlerParameter& param) final {
auto* reader = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
param.call->call(), sizeof(ServerCallbackReaderImpl)))
ServerCallbackReaderImpl(
- static_cast<::grpc_impl::CallbackServerContext*>(
- param.server_context),
- param.call, std::move(param.call_requester));
+ static_cast<::grpc::CallbackServerContext*>(param.server_context),
+ param.call, param.call_requester);
+ // Inlineable OnDone can be false in the CompletionOp callback because there
+ // is no read reactor that has an inlineable OnDone; this only applies to
+ // the DefaultReactor (which is unary).
param.server_context->BeginCompletionOp(
- param.call, [reader](bool) { reader->MaybeDone(); }, reader);
+ param.call,
+ [reader](bool) { reader->MaybeDone(/*inlineable_ondone=*/false); },
+ reader);
ServerReadReactor<RequestType>* reactor = nullptr;
if (param.status.ok()) {
reactor = ::grpc::internal::CatchingReactorGetter<
ServerReadReactor<RequestType>>(
get_reactor_,
- static_cast<::grpc_impl::CallbackServerContext*>(
- param.server_context),
+ static_cast<::grpc::CallbackServerContext*>(param.server_context),
reader->response());
}
}
private:
- std::function<ServerReadReactor<RequestType>*(
- ::grpc_impl::CallbackServerContext*, ResponseType*)>
+ std::function<ServerReadReactor<RequestType>*(::grpc::CallbackServerContext*,
+ ResponseType*)>
get_reactor_;
class ServerCallbackReaderImpl : public ServerCallbackReader<RequestType> {
public:
void Finish(::grpc::Status s) override {
- finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); }, &finish_ops_,
- false);
+ // A finish tag with only MaybeDone can have its callback inlined
+ // regardless even if OnDone is not inlineable because this callback just
+ // checks a ref and then decides whether or not to dispatch OnDone.
+ finish_tag_.Set(
+ call_.call(),
+ [this](bool) {
+ // Inlineable OnDone can be false here because there is
+ // no read reactor that has an inlineable OnDone; this
+ // only applies to the DefaultReactor (which is unary).
+ this->MaybeDone(/*inlineable_ondone=*/false);
+ },
+ &finish_ops_, /*can_inline=*/true);
if (!ctx_->sent_initial_metadata_) {
finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
void SendInitialMetadata() override {
GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
this->Ref();
- meta_tag_.Set(call_.call(),
- [this](bool ok) {
- reactor_.load(std::memory_order_relaxed)
- ->OnSendInitialMetadataDone(ok);
- MaybeDone();
- },
- &meta_ops_, false);
+ // The callback for this function should not be inlined because it invokes
+ // a user-controlled reaction, but any resulting OnDone can be inlined in
+ // the executor to which this callback is dispatched.
+ meta_tag_.Set(
+ call_.call(),
+ [this](bool ok) {
+ ServerReadReactor<RequestType>* reactor =
+ reactor_.load(std::memory_order_relaxed);
+ reactor->OnSendInitialMetadataDone(ok);
+ this->MaybeDone(/*inlineable_ondone=*/true);
+ },
+ &meta_ops_, /*can_inline=*/false);
meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
if (ctx_->compression_level_set()) {
private:
friend class CallbackClientStreamingHandler<RequestType, ResponseType>;
- ServerCallbackReaderImpl(::grpc_impl::CallbackServerContext* ctx,
+ ServerCallbackReaderImpl(::grpc::CallbackServerContext* ctx,
::grpc::internal::Call* call,
std::function<void()> call_requester)
: ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
void SetupReactor(ServerReadReactor<RequestType>* reactor) {
reactor_.store(reactor, std::memory_order_relaxed);
- read_tag_.Set(call_.call(),
- [this](bool ok) {
- reactor_.load(std::memory_order_relaxed)->OnReadDone(ok);
- MaybeDone();
- },
- &read_ops_, false);
+ // The callback for this function should not be inlined because it invokes
+ // a user-controlled reaction, but any resulting OnDone can be inlined in
+ // the executor to which this callback is dispatched.
+ read_tag_.Set(
+ call_.call(),
+ [this, reactor](bool ok) {
+ reactor->OnReadDone(ok);
+ this->MaybeDone(/*inlineable_ondone=*/true);
+ },
+ &read_ops_, /*can_inline=*/false);
read_ops_.set_core_cq_tag(&read_tag_);
this->BindReactor(reactor);
this->MaybeCallOnCancel(reactor);
- this->MaybeDone();
+ // Inlineable OnDone can be false here because there is no read
+ // reactor that has an inlineable OnDone; this only applies to the
+ // DefaultReactor (which is unary).
+ this->MaybeDone(/*inlineable_ondone=*/false);
}
~ServerCallbackReaderImpl() {}
ResponseType* response() { return &resp_; }
- void MaybeDone() override {
- if (GPR_UNLIKELY(this->Unref() == 1)) {
- reactor_.load(std::memory_order_relaxed)->OnDone();
- grpc_call* call = call_.call();
- auto call_requester = std::move(call_requester_);
- this->~ServerCallbackReaderImpl(); // explicitly call destructor
- ::grpc::g_core_codegen_interface->grpc_call_unref(call);
- call_requester();
+ void CallOnDone() override {
+ reactor_.load(std::memory_order_relaxed)->OnDone();
+ grpc_call* call = call_.call();
+ auto call_requester = std::move(call_requester_);
+ if (ctx_->context_allocator() != nullptr) {
+ ctx_->context_allocator()->Release(ctx_);
}
+ this->~ServerCallbackReaderImpl(); // explicitly call destructor
+ ::grpc::g_core_codegen_interface->grpc_call_unref(call);
+ call_requester();
}
ServerReactor* reactor() override {
read_ops_;
::grpc::internal::CallbackWithSuccessTag read_tag_;
- ::grpc_impl::CallbackServerContext* const ctx_;
+ ::grpc::CallbackServerContext* const ctx_;
::grpc::internal::Call call_;
ResponseType resp_;
std::function<void()> call_requester_;
public:
explicit CallbackServerStreamingHandler(
std::function<ServerWriteReactor<ResponseType>*(
- ::grpc_impl::CallbackServerContext*, const RequestType*)>
+ ::grpc::CallbackServerContext*, const RequestType*)>
get_reactor)
: get_reactor_(std::move(get_reactor)) {}
void RunHandler(const HandlerParameter& param) final {
auto* writer = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
param.call->call(), sizeof(ServerCallbackWriterImpl)))
ServerCallbackWriterImpl(
- static_cast<::grpc_impl::CallbackServerContext*>(
- param.server_context),
+ static_cast<::grpc::CallbackServerContext*>(param.server_context),
param.call, static_cast<RequestType*>(param.request),
- std::move(param.call_requester));
+ param.call_requester);
+ // Inlineable OnDone can be false in the CompletionOp callback because there
+ // is no write reactor that has an inlineable OnDone; this only applies to
+ // the DefaultReactor (which is unary).
param.server_context->BeginCompletionOp(
- param.call, [writer](bool) { writer->MaybeDone(); }, writer);
+ param.call,
+ [writer](bool) { writer->MaybeDone(/*inlineable_ondone=*/false); },
+ writer);
ServerWriteReactor<ResponseType>* reactor = nullptr;
if (param.status.ok()) {
reactor = ::grpc::internal::CatchingReactorGetter<
ServerWriteReactor<ResponseType>>(
get_reactor_,
- static_cast<::grpc_impl::CallbackServerContext*>(
- param.server_context),
+ static_cast<::grpc::CallbackServerContext*>(param.server_context),
writer->request());
}
if (reactor == nullptr) {
private:
std::function<ServerWriteReactor<ResponseType>*(
- ::grpc_impl::CallbackServerContext*, const RequestType*)>
+ ::grpc::CallbackServerContext*, const RequestType*)>
get_reactor_;
class ServerCallbackWriterImpl : public ServerCallbackWriter<ResponseType> {
public:
void Finish(::grpc::Status s) override {
- finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); }, &finish_ops_,
- false);
+ // A finish tag with only MaybeDone can have its callback inlined
+ // regardless even if OnDone is not inlineable because this callback just
+ // checks a ref and then decides whether or not to dispatch OnDone.
+ finish_tag_.Set(
+ call_.call(),
+ [this](bool) {
+ // Inlineable OnDone can be false here because there is
+ // no write reactor that has an inlineable OnDone; this
+ // only applies to the DefaultReactor (which is unary).
+ this->MaybeDone(/*inlineable_ondone=*/false);
+ },
+ &finish_ops_, /*can_inline=*/true);
finish_ops_.set_core_cq_tag(&finish_tag_);
if (!ctx_->sent_initial_metadata_) {
void SendInitialMetadata() override {
GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
this->Ref();
- meta_tag_.Set(call_.call(),
- [this](bool ok) {
- reactor_.load(std::memory_order_relaxed)
- ->OnSendInitialMetadataDone(ok);
- MaybeDone();
- },
- &meta_ops_, false);
+ // The callback for this function should not be inlined because it invokes
+ // a user-controlled reaction, but any resulting OnDone can be inlined in
+ // the executor to which this callback is dispatched.
+ meta_tag_.Set(
+ call_.call(),
+ [this](bool ok) {
+ ServerWriteReactor<ResponseType>* reactor =
+ reactor_.load(std::memory_order_relaxed);
+ reactor->OnSendInitialMetadataDone(ok);
+ this->MaybeDone(/*inlineable_ondone=*/true);
+ },
+ &meta_ops_, /*can_inline=*/false);
meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
if (ctx_->compression_level_set()) {
void WriteAndFinish(const ResponseType* resp, ::grpc::WriteOptions options,
::grpc::Status s) override {
// This combines the write into the finish callback
- // Don't send any message if the status is bad
- if (s.ok()) {
- // TODO(vjpai): don't assert
- GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
- }
+ // TODO(vjpai): don't assert
+ GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
Finish(std::move(s));
}
private:
friend class CallbackServerStreamingHandler<RequestType, ResponseType>;
- ServerCallbackWriterImpl(::grpc_impl::CallbackServerContext* ctx,
+ ServerCallbackWriterImpl(::grpc::CallbackServerContext* ctx,
::grpc::internal::Call* call,
const RequestType* req,
std::function<void()> call_requester)
void SetupReactor(ServerWriteReactor<ResponseType>* reactor) {
reactor_.store(reactor, std::memory_order_relaxed);
+ // The callback for this function should not be inlined because it invokes
+ // a user-controlled reaction, but any resulting OnDone can be inlined in
+ // the executor to which this callback is dispatched.
write_tag_.Set(
call_.call(),
- [this](bool ok) {
- reactor_.load(std::memory_order_relaxed)->OnWriteDone(ok);
- MaybeDone();
+ [this, reactor](bool ok) {
+ reactor->OnWriteDone(ok);
+ this->MaybeDone(/*inlineable_ondone=*/true);
},
- &write_ops_, false);
+ &write_ops_, /*can_inline=*/false);
write_ops_.set_core_cq_tag(&write_tag_);
this->BindReactor(reactor);
this->MaybeCallOnCancel(reactor);
- this->MaybeDone();
+ // Inlineable OnDone can be false here because there is no write
+ // reactor that has an inlineable OnDone; this only applies to the
+ // DefaultReactor (which is unary).
+ this->MaybeDone(/*inlineable_ondone=*/false);
+ }
+ ~ServerCallbackWriterImpl() {
+ if (req_ != nullptr) {
+ req_->~RequestType();
+ }
}
- ~ServerCallbackWriterImpl() { req_->~RequestType(); }
const RequestType* request() { return req_; }
- void MaybeDone() override {
- if (GPR_UNLIKELY(this->Unref() == 1)) {
- reactor_.load(std::memory_order_relaxed)->OnDone();
- grpc_call* call = call_.call();
- auto call_requester = std::move(call_requester_);
- this->~ServerCallbackWriterImpl(); // explicitly call destructor
- ::grpc::g_core_codegen_interface->grpc_call_unref(call);
- call_requester();
+ void CallOnDone() override {
+ reactor_.load(std::memory_order_relaxed)->OnDone();
+ grpc_call* call = call_.call();
+ auto call_requester = std::move(call_requester_);
+ if (ctx_->context_allocator() != nullptr) {
+ ctx_->context_allocator()->Release(ctx_);
}
+ this->~ServerCallbackWriterImpl(); // explicitly call destructor
+ ::grpc::g_core_codegen_interface->grpc_call_unref(call);
+ call_requester();
}
ServerReactor* reactor() override {
write_ops_;
::grpc::internal::CallbackWithSuccessTag write_tag_;
- ::grpc_impl::CallbackServerContext* const ctx_;
+ ::grpc::CallbackServerContext* const ctx_;
::grpc::internal::Call call_;
const RequestType* req_;
std::function<void()> call_requester_;
public:
explicit CallbackBidiHandler(
std::function<ServerBidiReactor<RequestType, ResponseType>*(
- ::grpc_impl::CallbackServerContext*)>
+ ::grpc::CallbackServerContext*)>
get_reactor)
: get_reactor_(std::move(get_reactor)) {}
void RunHandler(const HandlerParameter& param) final {
auto* stream = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
param.call->call(), sizeof(ServerCallbackReaderWriterImpl)))
ServerCallbackReaderWriterImpl(
- static_cast<::grpc_impl::CallbackServerContext*>(
- param.server_context),
- param.call, std::move(param.call_requester));
+ static_cast<::grpc::CallbackServerContext*>(param.server_context),
+ param.call, param.call_requester);
+ // Inlineable OnDone can be false in the CompletionOp callback because there
+ // is no bidi reactor that has an inlineable OnDone; this only applies to
+ // the DefaultReactor (which is unary).
param.server_context->BeginCompletionOp(
- param.call, [stream](bool) { stream->MaybeDone(); }, stream);
+ param.call,
+ [stream](bool) { stream->MaybeDone(/*inlineable_ondone=*/false); },
+ stream);
ServerBidiReactor<RequestType, ResponseType>* reactor = nullptr;
if (param.status.ok()) {
reactor = ::grpc::internal::CatchingReactorGetter<
ServerBidiReactor<RequestType, ResponseType>>(
- get_reactor_, static_cast<::grpc_impl::CallbackServerContext*>(
- param.server_context));
+ get_reactor_,
+ static_cast<::grpc::CallbackServerContext*>(param.server_context));
}
if (reactor == nullptr) {
private:
std::function<ServerBidiReactor<RequestType, ResponseType>*(
- ::grpc_impl::CallbackServerContext*)>
+ ::grpc::CallbackServerContext*)>
get_reactor_;
class ServerCallbackReaderWriterImpl
: public ServerCallbackReaderWriter<RequestType, ResponseType> {
public:
void Finish(::grpc::Status s) override {
- finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); }, &finish_ops_,
- false);
+ // A finish tag with only MaybeDone can have its callback inlined
+ // regardless even if OnDone is not inlineable because this callback just
+ // checks a ref and then decides whether or not to dispatch OnDone.
+ finish_tag_.Set(
+ call_.call(),
+ [this](bool) {
+ // Inlineable OnDone can be false here because there is
+ // no bidi reactor that has an inlineable OnDone; this
+ // only applies to the DefaultReactor (which is unary).
+ this->MaybeDone(/*inlineable_ondone=*/false);
+ },
+ &finish_ops_, /*can_inline=*/true);
finish_ops_.set_core_cq_tag(&finish_tag_);
if (!ctx_->sent_initial_metadata_) {
void SendInitialMetadata() override {
GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
this->Ref();
- meta_tag_.Set(call_.call(),
- [this](bool ok) {
- reactor_.load(std::memory_order_relaxed)
- ->OnSendInitialMetadataDone(ok);
- MaybeDone();
- },
- &meta_ops_, false);
+ // The callback for this function should not be inlined because it invokes
+ // a user-controlled reaction, but any resulting OnDone can be inlined in
+ // the executor to which this callback is dispatched.
+ meta_tag_.Set(
+ call_.call(),
+ [this](bool ok) {
+ ServerBidiReactor<RequestType, ResponseType>* reactor =
+ reactor_.load(std::memory_order_relaxed);
+ reactor->OnSendInitialMetadataDone(ok);
+ this->MaybeDone(/*inlineable_ondone=*/true);
+ },
+ &meta_ops_, /*can_inline=*/false);
meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
if (ctx_->compression_level_set()) {
void WriteAndFinish(const ResponseType* resp, ::grpc::WriteOptions options,
::grpc::Status s) override {
- // Don't send any message if the status is bad
- if (s.ok()) {
- // TODO(vjpai): don't assert
- GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
- }
+ // TODO(vjpai): don't assert
+ GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
Finish(std::move(s));
}
private:
friend class CallbackBidiHandler<RequestType, ResponseType>;
- ServerCallbackReaderWriterImpl(::grpc_impl::CallbackServerContext* ctx,
+ ServerCallbackReaderWriterImpl(::grpc::CallbackServerContext* ctx,
::grpc::internal::Call* call,
std::function<void()> call_requester)
: ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
void SetupReactor(ServerBidiReactor<RequestType, ResponseType>* reactor) {
reactor_.store(reactor, std::memory_order_relaxed);
+ // The callbacks for these functions should not be inlined because they
+ // invoke user-controlled reactions, but any resulting OnDones can be
+ // inlined in the executor to which a callback is dispatched.
write_tag_.Set(
call_.call(),
- [this](bool ok) {
- reactor_.load(std::memory_order_relaxed)->OnWriteDone(ok);
- MaybeDone();
+ [this, reactor](bool ok) {
+ reactor->OnWriteDone(ok);
+ this->MaybeDone(/*inlineable_ondone=*/true);
},
- &write_ops_, false);
+ &write_ops_, /*can_inline=*/false);
write_ops_.set_core_cq_tag(&write_tag_);
- read_tag_.Set(call_.call(),
- [this](bool ok) {
- reactor_.load(std::memory_order_relaxed)->OnReadDone(ok);
- MaybeDone();
- },
- &read_ops_, false);
+ read_tag_.Set(
+ call_.call(),
+ [this, reactor](bool ok) {
+ reactor->OnReadDone(ok);
+ this->MaybeDone(/*inlineable_ondone=*/true);
+ },
+ &read_ops_, /*can_inline=*/false);
read_ops_.set_core_cq_tag(&read_tag_);
this->BindReactor(reactor);
this->MaybeCallOnCancel(reactor);
- this->MaybeDone();
- }
-
- void MaybeDone() override {
- if (GPR_UNLIKELY(this->Unref() == 1)) {
- reactor_.load(std::memory_order_relaxed)->OnDone();
- grpc_call* call = call_.call();
- auto call_requester = std::move(call_requester_);
- this->~ServerCallbackReaderWriterImpl(); // explicitly call destructor
- ::grpc::g_core_codegen_interface->grpc_call_unref(call);
- call_requester();
+ // Inlineable OnDone can be false here because there is no bidi
+ // reactor that has an inlineable OnDone; this only applies to the
+ // DefaultReactor (which is unary).
+ this->MaybeDone(/*inlineable_ondone=*/false);
+ }
+
+ void CallOnDone() override {
+ reactor_.load(std::memory_order_relaxed)->OnDone();
+ grpc_call* call = call_.call();
+ auto call_requester = std::move(call_requester_);
+ if (ctx_->context_allocator() != nullptr) {
+ ctx_->context_allocator()->Release(ctx_);
}
+ this->~ServerCallbackReaderWriterImpl(); // explicitly call destructor
+ ::grpc::g_core_codegen_interface->grpc_call_unref(call);
+ call_requester();
}
ServerReactor* reactor() override {
read_ops_;
::grpc::internal::CallbackWithSuccessTag read_tag_;
- ::grpc_impl::CallbackServerContext* const ctx_;
+ ::grpc::CallbackServerContext* const ctx_;
::grpc::internal::Call call_;
std::function<void()> call_requester_;
// The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
};
} // namespace internal
-} // namespace grpc_impl
+} // namespace grpc
#endif // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H