Imported Upstream version 1.36.0
[platform/upstream/grpc.git] / include / grpcpp / impl / codegen / server_callback_handlers.h
index 7752864..76e655a 100644 (file)
 
 #include <grpcpp/impl/codegen/message_allocator.h>
 #include <grpcpp/impl/codegen/rpc_service_method.h>
-#include <grpcpp/impl/codegen/server_callback_impl.h>
-#include <grpcpp/impl/codegen/server_context_impl.h>
+#include <grpcpp/impl/codegen/server_callback.h>
+#include <grpcpp/impl/codegen/server_context.h>
 #include <grpcpp/impl/codegen/status.h>
 
-namespace grpc_impl {
+namespace grpc {
 namespace internal {
 
 template <class RequestType, class ResponseType>
 class CallbackUnaryHandler : public ::grpc::internal::MethodHandler {
  public:
   explicit CallbackUnaryHandler(
-      std::function<ServerUnaryReactor*(::grpc_impl::CallbackServerContext*,
+      std::function<ServerUnaryReactor*(::grpc::CallbackServerContext*,
                                         const RequestType*, ResponseType*)>
           get_reactor)
       : get_reactor_(std::move(get_reactor)) {}
@@ -52,9 +52,8 @@ class CallbackUnaryHandler : public ::grpc::internal::MethodHandler {
     auto* call = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
         param.call->call(), sizeof(ServerCallbackUnaryImpl)))
         ServerCallbackUnaryImpl(
-            static_cast<::grpc_impl::CallbackServerContext*>(
-                param.server_context),
-            param.call, allocator_state, std::move(param.call_requester));
+            static_cast<::grpc::CallbackServerContext*>(param.server_context),
+            param.call, allocator_state, param.call_requester);
     param.server_context->BeginCompletionOp(
         param.call, [call](bool) { call->MaybeDone(); }, call);
 
@@ -62,8 +61,7 @@ class CallbackUnaryHandler : public ::grpc::internal::MethodHandler {
     if (param.status.ok()) {
       reactor = ::grpc::internal::CatchingReactorGetter<ServerUnaryReactor>(
           get_reactor_,
-          static_cast<::grpc_impl::CallbackServerContext*>(
-              param.server_context),
+          static_cast<::grpc::CallbackServerContext*>(param.server_context),
           call->request(), call->response());
     }
 
@@ -108,7 +106,7 @@ class CallbackUnaryHandler : public ::grpc::internal::MethodHandler {
   }
 
  private:
-  std::function<ServerUnaryReactor*(::grpc_impl::CallbackServerContext*,
+  std::function<ServerUnaryReactor*(::grpc::CallbackServerContext*,
                                     const RequestType*, ResponseType*)>
       get_reactor_;
   ::grpc::experimental::MessageAllocator<RequestType, ResponseType>*
@@ -117,9 +115,19 @@ class CallbackUnaryHandler : public ::grpc::internal::MethodHandler {
   class ServerCallbackUnaryImpl : public ServerCallbackUnary {
    public:
     void Finish(::grpc::Status s) override {
+      // A callback that only contains a call to MaybeDone can be run as an
+      // inline callback regardless of whether or not OnDone is inlineable
+      // because if the actual OnDone callback needs to be scheduled, MaybeDone
+      // is responsible for dispatching to an executor thread if needed. Thus,
+      // when setting up the finish_tag_, we can set its own callback to
+      // inlineable.
       finish_tag_.Set(
-          call_.call(), [this](bool) { MaybeDone(); }, &finish_ops_,
-          reactor_.load(std::memory_order_relaxed)->InternalInlineable());
+          call_.call(),
+          [this](bool) {
+            this->MaybeDone(
+                reactor_.load(std::memory_order_relaxed)->InternalInlineable());
+          },
+          &finish_ops_, /*can_inline=*/true);
       finish_ops_.set_core_cq_tag(&finish_tag_);
 
       if (!ctx_->sent_initial_metadata_) {
@@ -144,13 +152,20 @@ class CallbackUnaryHandler : public ::grpc::internal::MethodHandler {
     void SendInitialMetadata() override {
       GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
       this->Ref();
-      meta_tag_.Set(call_.call(),
-                    [this](bool ok) {
-                      reactor_.load(std::memory_order_relaxed)
-                          ->OnSendInitialMetadataDone(ok);
-                      MaybeDone();
-                    },
-                    &meta_ops_, false);
+      // The callback for this function should not be marked inline because it
+      // is directly invoking a user-controlled reaction
+      // (OnSendInitialMetadataDone). Thus it must be dispatched to an executor
+      // thread. However, any OnDone needed after that can be inlined because it
+      // is already running on an executor thread.
+      meta_tag_.Set(
+          call_.call(),
+          [this](bool ok) {
+            ServerUnaryReactor* reactor =
+                reactor_.load(std::memory_order_relaxed);
+            reactor->OnSendInitialMetadataDone(ok);
+            this->MaybeDone(/*inlineable_ondone=*/true);
+          },
+          &meta_ops_, /*can_inline=*/false);
       meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
                                     ctx_->initial_metadata_flags());
       if (ctx_->compression_level_set()) {
@@ -165,7 +180,7 @@ class CallbackUnaryHandler : public ::grpc::internal::MethodHandler {
     friend class CallbackUnaryHandler<RequestType, ResponseType>;
 
     ServerCallbackUnaryImpl(
-        ::grpc_impl::CallbackServerContext* ctx, ::grpc::internal::Call* call,
+        ::grpc::CallbackServerContext* ctx, ::grpc::internal::Call* call,
         ::grpc::experimental::MessageHolder<RequestType, ResponseType>*
             allocator_state,
         std::function<void()> call_requester)
@@ -184,22 +199,23 @@ class CallbackUnaryHandler : public ::grpc::internal::MethodHandler {
       reactor_.store(reactor, std::memory_order_relaxed);
       this->BindReactor(reactor);
       this->MaybeCallOnCancel(reactor);
-      this->MaybeDone();
+      this->MaybeDone(reactor->InternalInlineable());
     }
 
     const RequestType* request() { return allocator_state_->request(); }
     ResponseType* response() { return allocator_state_->response(); }
 
-    void MaybeDone() override {
-      if (GPR_UNLIKELY(this->Unref() == 1)) {
-        reactor_.load(std::memory_order_relaxed)->OnDone();
-        grpc_call* call = call_.call();
-        auto call_requester = std::move(call_requester_);
-        allocator_state_->Release();
-        this->~ServerCallbackUnaryImpl();  // explicitly call destructor
-        ::grpc::g_core_codegen_interface->grpc_call_unref(call);
-        call_requester();
+    void CallOnDone() override {
+      reactor_.load(std::memory_order_relaxed)->OnDone();
+      grpc_call* call = call_.call();
+      auto call_requester = std::move(call_requester_);
+      allocator_state_->Release();
+      if (ctx_->context_allocator() != nullptr) {
+        ctx_->context_allocator()->Release(ctx_);
       }
+      this->~ServerCallbackUnaryImpl();  // explicitly call destructor
+      ::grpc::g_core_codegen_interface->grpc_call_unref(call);
+      call_requester();
     }
 
     ServerReactor* reactor() override {
@@ -215,7 +231,7 @@ class CallbackUnaryHandler : public ::grpc::internal::MethodHandler {
         finish_ops_;
     ::grpc::internal::CallbackWithSuccessTag finish_tag_;
 
-    ::grpc_impl::CallbackServerContext* const ctx_;
+    ::grpc::CallbackServerContext* const ctx_;
     ::grpc::internal::Call call_;
     ::grpc::experimental::MessageHolder<RequestType, ResponseType>* const
         allocator_state_;
@@ -242,7 +258,7 @@ class CallbackClientStreamingHandler : public ::grpc::internal::MethodHandler {
  public:
   explicit CallbackClientStreamingHandler(
       std::function<ServerReadReactor<RequestType>*(
-          ::grpc_impl::CallbackServerContext*, ResponseType*)>
+          ::grpc::CallbackServerContext*, ResponseType*)>
           get_reactor)
       : get_reactor_(std::move(get_reactor)) {}
   void RunHandler(const HandlerParameter& param) final {
@@ -252,19 +268,22 @@ class CallbackClientStreamingHandler : public ::grpc::internal::MethodHandler {
     auto* reader = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
         param.call->call(), sizeof(ServerCallbackReaderImpl)))
         ServerCallbackReaderImpl(
-            static_cast<::grpc_impl::CallbackServerContext*>(
-                param.server_context),
-            param.call, std::move(param.call_requester));
+            static_cast<::grpc::CallbackServerContext*>(param.server_context),
+            param.call, param.call_requester);
+    // Inlineable OnDone can be false in the CompletionOp callback because there
+    // is no read reactor that has an inlineable OnDone; this only applies to
+    // the DefaultReactor (which is unary).
     param.server_context->BeginCompletionOp(
-        param.call, [reader](bool) { reader->MaybeDone(); }, reader);
+        param.call,
+        [reader](bool) { reader->MaybeDone(/*inlineable_ondone=*/false); },
+        reader);
 
     ServerReadReactor<RequestType>* reactor = nullptr;
     if (param.status.ok()) {
       reactor = ::grpc::internal::CatchingReactorGetter<
           ServerReadReactor<RequestType>>(
           get_reactor_,
-          static_cast<::grpc_impl::CallbackServerContext*>(
-              param.server_context),
+          static_cast<::grpc::CallbackServerContext*>(param.server_context),
           reader->response());
     }
 
@@ -280,15 +299,25 @@ class CallbackClientStreamingHandler : public ::grpc::internal::MethodHandler {
   }
 
  private:
-  std::function<ServerReadReactor<RequestType>*(
-      ::grpc_impl::CallbackServerContext*, ResponseType*)>
+  std::function<ServerReadReactor<RequestType>*(::grpc::CallbackServerContext*,
+                                                ResponseType*)>
       get_reactor_;
 
   class ServerCallbackReaderImpl : public ServerCallbackReader<RequestType> {
    public:
     void Finish(::grpc::Status s) override {
-      finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); }, &finish_ops_,
-                      false);
+      // A finish tag with only MaybeDone can have its callback inlined
+      // regardless even if OnDone is not inlineable because this callback just
+      // checks a ref and then decides whether or not to dispatch OnDone.
+      finish_tag_.Set(
+          call_.call(),
+          [this](bool) {
+            // Inlineable OnDone can be false here because there is
+            // no read reactor that has an inlineable OnDone; this
+            // only applies to the DefaultReactor (which is unary).
+            this->MaybeDone(/*inlineable_ondone=*/false);
+          },
+          &finish_ops_, /*can_inline=*/true);
       if (!ctx_->sent_initial_metadata_) {
         finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
                                         ctx_->initial_metadata_flags());
@@ -311,13 +340,18 @@ class CallbackClientStreamingHandler : public ::grpc::internal::MethodHandler {
     void SendInitialMetadata() override {
       GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
       this->Ref();
-      meta_tag_.Set(call_.call(),
-                    [this](bool ok) {
-                      reactor_.load(std::memory_order_relaxed)
-                          ->OnSendInitialMetadataDone(ok);
-                      MaybeDone();
-                    },
-                    &meta_ops_, false);
+      // The callback for this function should not be inlined because it invokes
+      // a user-controlled reaction, but any resulting OnDone can be inlined in
+      // the executor to which this callback is dispatched.
+      meta_tag_.Set(
+          call_.call(),
+          [this](bool ok) {
+            ServerReadReactor<RequestType>* reactor =
+                reactor_.load(std::memory_order_relaxed);
+            reactor->OnSendInitialMetadataDone(ok);
+            this->MaybeDone(/*inlineable_ondone=*/true);
+          },
+          &meta_ops_, /*can_inline=*/false);
       meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
                                     ctx_->initial_metadata_flags());
       if (ctx_->compression_level_set()) {
@@ -337,38 +371,46 @@ class CallbackClientStreamingHandler : public ::grpc::internal::MethodHandler {
    private:
     friend class CallbackClientStreamingHandler<RequestType, ResponseType>;
 
-    ServerCallbackReaderImpl(::grpc_impl::CallbackServerContext* ctx,
+    ServerCallbackReaderImpl(::grpc::CallbackServerContext* ctx,
                              ::grpc::internal::Call* call,
                              std::function<void()> call_requester)
         : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
 
     void SetupReactor(ServerReadReactor<RequestType>* reactor) {
       reactor_.store(reactor, std::memory_order_relaxed);
-      read_tag_.Set(call_.call(),
-                    [this](bool ok) {
-                      reactor_.load(std::memory_order_relaxed)->OnReadDone(ok);
-                      MaybeDone();
-                    },
-                    &read_ops_, false);
+      // The callback for this function should not be inlined because it invokes
+      // a user-controlled reaction, but any resulting OnDone can be inlined in
+      // the executor to which this callback is dispatched.
+      read_tag_.Set(
+          call_.call(),
+          [this, reactor](bool ok) {
+            reactor->OnReadDone(ok);
+            this->MaybeDone(/*inlineable_ondone=*/true);
+          },
+          &read_ops_, /*can_inline=*/false);
       read_ops_.set_core_cq_tag(&read_tag_);
       this->BindReactor(reactor);
       this->MaybeCallOnCancel(reactor);
-      this->MaybeDone();
+      // Inlineable OnDone can be false here because there is no read
+      // reactor that has an inlineable OnDone; this only applies to the
+      // DefaultReactor (which is unary).
+      this->MaybeDone(/*inlineable_ondone=*/false);
     }
 
     ~ServerCallbackReaderImpl() {}
 
     ResponseType* response() { return &resp_; }
 
-    void MaybeDone() override {
-      if (GPR_UNLIKELY(this->Unref() == 1)) {
-        reactor_.load(std::memory_order_relaxed)->OnDone();
-        grpc_call* call = call_.call();
-        auto call_requester = std::move(call_requester_);
-        this->~ServerCallbackReaderImpl();  // explicitly call destructor
-        ::grpc::g_core_codegen_interface->grpc_call_unref(call);
-        call_requester();
+    void CallOnDone() override {
+      reactor_.load(std::memory_order_relaxed)->OnDone();
+      grpc_call* call = call_.call();
+      auto call_requester = std::move(call_requester_);
+      if (ctx_->context_allocator() != nullptr) {
+        ctx_->context_allocator()->Release(ctx_);
       }
+      this->~ServerCallbackReaderImpl();  // explicitly call destructor
+      ::grpc::g_core_codegen_interface->grpc_call_unref(call);
+      call_requester();
     }
 
     ServerReactor* reactor() override {
@@ -388,7 +430,7 @@ class CallbackClientStreamingHandler : public ::grpc::internal::MethodHandler {
         read_ops_;
     ::grpc::internal::CallbackWithSuccessTag read_tag_;
 
-    ::grpc_impl::CallbackServerContext* const ctx_;
+    ::grpc::CallbackServerContext* const ctx_;
     ::grpc::internal::Call call_;
     ResponseType resp_;
     std::function<void()> call_requester_;
@@ -405,7 +447,7 @@ class CallbackServerStreamingHandler : public ::grpc::internal::MethodHandler {
  public:
   explicit CallbackServerStreamingHandler(
       std::function<ServerWriteReactor<ResponseType>*(
-          ::grpc_impl::CallbackServerContext*, const RequestType*)>
+          ::grpc::CallbackServerContext*, const RequestType*)>
           get_reactor)
       : get_reactor_(std::move(get_reactor)) {}
   void RunHandler(const HandlerParameter& param) final {
@@ -415,20 +457,23 @@ class CallbackServerStreamingHandler : public ::grpc::internal::MethodHandler {
     auto* writer = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
         param.call->call(), sizeof(ServerCallbackWriterImpl)))
         ServerCallbackWriterImpl(
-            static_cast<::grpc_impl::CallbackServerContext*>(
-                param.server_context),
+            static_cast<::grpc::CallbackServerContext*>(param.server_context),
             param.call, static_cast<RequestType*>(param.request),
-            std::move(param.call_requester));
+            param.call_requester);
+    // Inlineable OnDone can be false in the CompletionOp callback because there
+    // is no write reactor that has an inlineable OnDone; this only applies to
+    // the DefaultReactor (which is unary).
     param.server_context->BeginCompletionOp(
-        param.call, [writer](bool) { writer->MaybeDone(); }, writer);
+        param.call,
+        [writer](bool) { writer->MaybeDone(/*inlineable_ondone=*/false); },
+        writer);
 
     ServerWriteReactor<ResponseType>* reactor = nullptr;
     if (param.status.ok()) {
       reactor = ::grpc::internal::CatchingReactorGetter<
           ServerWriteReactor<ResponseType>>(
           get_reactor_,
-          static_cast<::grpc_impl::CallbackServerContext*>(
-              param.server_context),
+          static_cast<::grpc::CallbackServerContext*>(param.server_context),
           writer->request());
     }
     if (reactor == nullptr) {
@@ -461,14 +506,24 @@ class CallbackServerStreamingHandler : public ::grpc::internal::MethodHandler {
 
  private:
   std::function<ServerWriteReactor<ResponseType>*(
-      ::grpc_impl::CallbackServerContext*, const RequestType*)>
+      ::grpc::CallbackServerContext*, const RequestType*)>
       get_reactor_;
 
   class ServerCallbackWriterImpl : public ServerCallbackWriter<ResponseType> {
    public:
     void Finish(::grpc::Status s) override {
-      finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); }, &finish_ops_,
-                      false);
+      // A finish tag with only MaybeDone can have its callback inlined
+      // regardless even if OnDone is not inlineable because this callback just
+      // checks a ref and then decides whether or not to dispatch OnDone.
+      finish_tag_.Set(
+          call_.call(),
+          [this](bool) {
+            // Inlineable OnDone can be false here because there is
+            // no write reactor that has an inlineable OnDone; this
+            // only applies to the DefaultReactor (which is unary).
+            this->MaybeDone(/*inlineable_ondone=*/false);
+          },
+          &finish_ops_, /*can_inline=*/true);
       finish_ops_.set_core_cq_tag(&finish_tag_);
 
       if (!ctx_->sent_initial_metadata_) {
@@ -486,13 +541,18 @@ class CallbackServerStreamingHandler : public ::grpc::internal::MethodHandler {
     void SendInitialMetadata() override {
       GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
       this->Ref();
-      meta_tag_.Set(call_.call(),
-                    [this](bool ok) {
-                      reactor_.load(std::memory_order_relaxed)
-                          ->OnSendInitialMetadataDone(ok);
-                      MaybeDone();
-                    },
-                    &meta_ops_, false);
+      // The callback for this function should not be inlined because it invokes
+      // a user-controlled reaction, but any resulting OnDone can be inlined in
+      // the executor to which this callback is dispatched.
+      meta_tag_.Set(
+          call_.call(),
+          [this](bool ok) {
+            ServerWriteReactor<ResponseType>* reactor =
+                reactor_.load(std::memory_order_relaxed);
+            reactor->OnSendInitialMetadataDone(ok);
+            this->MaybeDone(/*inlineable_ondone=*/true);
+          },
+          &meta_ops_, /*can_inline=*/false);
       meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
                                     ctx_->initial_metadata_flags());
       if (ctx_->compression_level_set()) {
@@ -525,18 +585,15 @@ class CallbackServerStreamingHandler : public ::grpc::internal::MethodHandler {
     void WriteAndFinish(const ResponseType* resp, ::grpc::WriteOptions options,
                         ::grpc::Status s) override {
       // This combines the write into the finish callback
-      // Don't send any message if the status is bad
-      if (s.ok()) {
-        // TODO(vjpai): don't assert
-        GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
-      }
+      // TODO(vjpai): don't assert
+      GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
       Finish(std::move(s));
     }
 
    private:
     friend class CallbackServerStreamingHandler<RequestType, ResponseType>;
 
-    ServerCallbackWriterImpl(::grpc_impl::CallbackServerContext* ctx,
+    ServerCallbackWriterImpl(::grpc::CallbackServerContext* ctx,
                              ::grpc::internal::Call* call,
                              const RequestType* req,
                              std::function<void()> call_requester)
@@ -547,31 +604,42 @@ class CallbackServerStreamingHandler : public ::grpc::internal::MethodHandler {
 
     void SetupReactor(ServerWriteReactor<ResponseType>* reactor) {
       reactor_.store(reactor, std::memory_order_relaxed);
+      // The callback for this function should not be inlined because it invokes
+      // a user-controlled reaction, but any resulting OnDone can be inlined in
+      // the executor to which this callback is dispatched.
       write_tag_.Set(
           call_.call(),
-          [this](bool ok) {
-            reactor_.load(std::memory_order_relaxed)->OnWriteDone(ok);
-            MaybeDone();
+          [this, reactor](bool ok) {
+            reactor->OnWriteDone(ok);
+            this->MaybeDone(/*inlineable_ondone=*/true);
           },
-          &write_ops_, false);
+          &write_ops_, /*can_inline=*/false);
       write_ops_.set_core_cq_tag(&write_tag_);
       this->BindReactor(reactor);
       this->MaybeCallOnCancel(reactor);
-      this->MaybeDone();
+      // Inlineable OnDone can be false here because there is no write
+      // reactor that has an inlineable OnDone; this only applies to the
+      // DefaultReactor (which is unary).
+      this->MaybeDone(/*inlineable_ondone=*/false);
+    }
+    ~ServerCallbackWriterImpl() {
+      if (req_ != nullptr) {
+        req_->~RequestType();
+      }
     }
-    ~ServerCallbackWriterImpl() { req_->~RequestType(); }
 
     const RequestType* request() { return req_; }
 
-    void MaybeDone() override {
-      if (GPR_UNLIKELY(this->Unref() == 1)) {
-        reactor_.load(std::memory_order_relaxed)->OnDone();
-        grpc_call* call = call_.call();
-        auto call_requester = std::move(call_requester_);
-        this->~ServerCallbackWriterImpl();  // explicitly call destructor
-        ::grpc::g_core_codegen_interface->grpc_call_unref(call);
-        call_requester();
+    void CallOnDone() override {
+      reactor_.load(std::memory_order_relaxed)->OnDone();
+      grpc_call* call = call_.call();
+      auto call_requester = std::move(call_requester_);
+      if (ctx_->context_allocator() != nullptr) {
+        ctx_->context_allocator()->Release(ctx_);
       }
+      this->~ServerCallbackWriterImpl();  // explicitly call destructor
+      ::grpc::g_core_codegen_interface->grpc_call_unref(call);
+      call_requester();
     }
 
     ServerReactor* reactor() override {
@@ -591,7 +659,7 @@ class CallbackServerStreamingHandler : public ::grpc::internal::MethodHandler {
         write_ops_;
     ::grpc::internal::CallbackWithSuccessTag write_tag_;
 
-    ::grpc_impl::CallbackServerContext* const ctx_;
+    ::grpc::CallbackServerContext* const ctx_;
     ::grpc::internal::Call call_;
     const RequestType* req_;
     std::function<void()> call_requester_;
@@ -608,7 +676,7 @@ class CallbackBidiHandler : public ::grpc::internal::MethodHandler {
  public:
   explicit CallbackBidiHandler(
       std::function<ServerBidiReactor<RequestType, ResponseType>*(
-          ::grpc_impl::CallbackServerContext*)>
+          ::grpc::CallbackServerContext*)>
           get_reactor)
       : get_reactor_(std::move(get_reactor)) {}
   void RunHandler(const HandlerParameter& param) final {
@@ -617,18 +685,22 @@ class CallbackBidiHandler : public ::grpc::internal::MethodHandler {
     auto* stream = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
         param.call->call(), sizeof(ServerCallbackReaderWriterImpl)))
         ServerCallbackReaderWriterImpl(
-            static_cast<::grpc_impl::CallbackServerContext*>(
-                param.server_context),
-            param.call, std::move(param.call_requester));
+            static_cast<::grpc::CallbackServerContext*>(param.server_context),
+            param.call, param.call_requester);
+    // Inlineable OnDone can be false in the CompletionOp callback because there
+    // is no bidi reactor that has an inlineable OnDone; this only applies to
+    // the DefaultReactor (which is unary).
     param.server_context->BeginCompletionOp(
-        param.call, [stream](bool) { stream->MaybeDone(); }, stream);
+        param.call,
+        [stream](bool) { stream->MaybeDone(/*inlineable_ondone=*/false); },
+        stream);
 
     ServerBidiReactor<RequestType, ResponseType>* reactor = nullptr;
     if (param.status.ok()) {
       reactor = ::grpc::internal::CatchingReactorGetter<
           ServerBidiReactor<RequestType, ResponseType>>(
-          get_reactor_, static_cast<::grpc_impl::CallbackServerContext*>(
-                            param.server_context));
+          get_reactor_,
+          static_cast<::grpc::CallbackServerContext*>(param.server_context));
     }
 
     if (reactor == nullptr) {
@@ -645,15 +717,25 @@ class CallbackBidiHandler : public ::grpc::internal::MethodHandler {
 
  private:
   std::function<ServerBidiReactor<RequestType, ResponseType>*(
-      ::grpc_impl::CallbackServerContext*)>
+      ::grpc::CallbackServerContext*)>
       get_reactor_;
 
   class ServerCallbackReaderWriterImpl
       : public ServerCallbackReaderWriter<RequestType, ResponseType> {
    public:
     void Finish(::grpc::Status s) override {
-      finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); }, &finish_ops_,
-                      false);
+      // A finish tag with only MaybeDone can have its callback inlined
+      // regardless even if OnDone is not inlineable because this callback just
+      // checks a ref and then decides whether or not to dispatch OnDone.
+      finish_tag_.Set(
+          call_.call(),
+          [this](bool) {
+            // Inlineable OnDone can be false here because there is
+            // no bidi reactor that has an inlineable OnDone; this
+            // only applies to the DefaultReactor (which is unary).
+            this->MaybeDone(/*inlineable_ondone=*/false);
+          },
+          &finish_ops_, /*can_inline=*/true);
       finish_ops_.set_core_cq_tag(&finish_tag_);
 
       if (!ctx_->sent_initial_metadata_) {
@@ -671,13 +753,18 @@ class CallbackBidiHandler : public ::grpc::internal::MethodHandler {
     void SendInitialMetadata() override {
       GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
       this->Ref();
-      meta_tag_.Set(call_.call(),
-                    [this](bool ok) {
-                      reactor_.load(std::memory_order_relaxed)
-                          ->OnSendInitialMetadataDone(ok);
-                      MaybeDone();
-                    },
-                    &meta_ops_, false);
+      // The callback for this function should not be inlined because it invokes
+      // a user-controlled reaction, but any resulting OnDone can be inlined in
+      // the executor to which this callback is dispatched.
+      meta_tag_.Set(
+          call_.call(),
+          [this](bool ok) {
+            ServerBidiReactor<RequestType, ResponseType>* reactor =
+                reactor_.load(std::memory_order_relaxed);
+            reactor->OnSendInitialMetadataDone(ok);
+            this->MaybeDone(/*inlineable_ondone=*/true);
+          },
+          &meta_ops_, /*can_inline=*/false);
       meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
                                     ctx_->initial_metadata_flags());
       if (ctx_->compression_level_set()) {
@@ -709,11 +796,8 @@ class CallbackBidiHandler : public ::grpc::internal::MethodHandler {
 
     void WriteAndFinish(const ResponseType* resp, ::grpc::WriteOptions options,
                         ::grpc::Status s) override {
-      // Don't send any message if the status is bad
-      if (s.ok()) {
-        // TODO(vjpai): don't assert
-        GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
-      }
+      // TODO(vjpai): don't assert
+      GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
       Finish(std::move(s));
     }
 
@@ -726,42 +810,50 @@ class CallbackBidiHandler : public ::grpc::internal::MethodHandler {
    private:
     friend class CallbackBidiHandler<RequestType, ResponseType>;
 
-    ServerCallbackReaderWriterImpl(::grpc_impl::CallbackServerContext* ctx,
+    ServerCallbackReaderWriterImpl(::grpc::CallbackServerContext* ctx,
                                    ::grpc::internal::Call* call,
                                    std::function<void()> call_requester)
         : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
 
     void SetupReactor(ServerBidiReactor<RequestType, ResponseType>* reactor) {
       reactor_.store(reactor, std::memory_order_relaxed);
+      // The callbacks for these functions should not be inlined because they
+      // invoke user-controlled reactions, but any resulting OnDones can be
+      // inlined in the executor to which a callback is dispatched.
       write_tag_.Set(
           call_.call(),
-          [this](bool ok) {
-            reactor_.load(std::memory_order_relaxed)->OnWriteDone(ok);
-            MaybeDone();
+          [this, reactor](bool ok) {
+            reactor->OnWriteDone(ok);
+            this->MaybeDone(/*inlineable_ondone=*/true);
           },
-          &write_ops_, false);
+          &write_ops_, /*can_inline=*/false);
       write_ops_.set_core_cq_tag(&write_tag_);
-      read_tag_.Set(call_.call(),
-                    [this](bool ok) {
-                      reactor_.load(std::memory_order_relaxed)->OnReadDone(ok);
-                      MaybeDone();
-                    },
-                    &read_ops_, false);
+      read_tag_.Set(
+          call_.call(),
+          [this, reactor](bool ok) {
+            reactor->OnReadDone(ok);
+            this->MaybeDone(/*inlineable_ondone=*/true);
+          },
+          &read_ops_, /*can_inline=*/false);
       read_ops_.set_core_cq_tag(&read_tag_);
       this->BindReactor(reactor);
       this->MaybeCallOnCancel(reactor);
-      this->MaybeDone();
-    }
-
-    void MaybeDone() override {
-      if (GPR_UNLIKELY(this->Unref() == 1)) {
-        reactor_.load(std::memory_order_relaxed)->OnDone();
-        grpc_call* call = call_.call();
-        auto call_requester = std::move(call_requester_);
-        this->~ServerCallbackReaderWriterImpl();  // explicitly call destructor
-        ::grpc::g_core_codegen_interface->grpc_call_unref(call);
-        call_requester();
+      // Inlineable OnDone can be false here because there is no bidi
+      // reactor that has an inlineable OnDone; this only applies to the
+      // DefaultReactor (which is unary).
+      this->MaybeDone(/*inlineable_ondone=*/false);
+    }
+
+    void CallOnDone() override {
+      reactor_.load(std::memory_order_relaxed)->OnDone();
+      grpc_call* call = call_.call();
+      auto call_requester = std::move(call_requester_);
+      if (ctx_->context_allocator() != nullptr) {
+        ctx_->context_allocator()->Release(ctx_);
       }
+      this->~ServerCallbackReaderWriterImpl();  // explicitly call destructor
+      ::grpc::g_core_codegen_interface->grpc_call_unref(call);
+      call_requester();
     }
 
     ServerReactor* reactor() override {
@@ -785,7 +877,7 @@ class CallbackBidiHandler : public ::grpc::internal::MethodHandler {
         read_ops_;
     ::grpc::internal::CallbackWithSuccessTag read_tag_;
 
-    ::grpc_impl::CallbackServerContext* const ctx_;
+    ::grpc::CallbackServerContext* const ctx_;
     ::grpc::internal::Call call_;
     std::function<void()> call_requester_;
     // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
@@ -797,6 +889,6 @@ class CallbackBidiHandler : public ::grpc::internal::MethodHandler {
 };
 
 }  // namespace internal
-}  // namespace grpc_impl
+}  // namespace grpc
 
 #endif  // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H