Imported Upstream version 1.34.0
[platform/upstream/grpc.git] / src / cpp / server / server_context.cc
index 1b524bc..f3cb004 100644 (file)
  *
  */
 
-#include <grpcpp/server_context.h>
-#include <grpcpp/support/server_callback.h>
+#include <grpcpp/impl/codegen/server_context.h>
 
 #include <algorithm>
-#include <mutex>
 #include <utility>
 
 #include <grpc/compression.h>
 #include <grpc/load_reporting.h>
 #include <grpc/support/alloc.h>
 #include <grpc/support/log.h>
-#include <grpcpp/completion_queue.h>
 #include <grpcpp/impl/call.h>
+#include <grpcpp/impl/codegen/completion_queue.h>
+#include <grpcpp/support/server_callback.h>
 #include <grpcpp/support/time.h>
 
+#include "src/core/lib/gprpp/ref_counted.h"
+#include "src/core/lib/gprpp/sync.h"
 #include "src/core/lib/surface/call.h"
 
 namespace grpc {
 
 // CompletionOp
 
-class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
+class ServerContextBase::CompletionOp final
+    : public internal::CallOpSetInterface {
  public:
   // initial refs: one in the server context, one in the cq
   // must ref the call before calling constructor and after deleting this
-  CompletionOp(internal::Call* call, internal::ServerReactor* reactor)
+  CompletionOp(internal::Call* call,
+               ::grpc::internal::ServerCallbackCall* callback_controller)
       : call_(*call),
-        reactor_(reactor),
+        callback_controller_(callback_controller),
         has_tag_(false),
         tag_(nullptr),
         core_cq_tag_(this),
@@ -59,7 +62,7 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
   CompletionOp(CompletionOp&&) = delete;
   CompletionOp& operator=(CompletionOp&&) = delete;
 
-  ~CompletionOp() {
+  ~CompletionOp() override {
     if (call_.server_rpc_info()) {
       call_.server_rpc_info()->Unref();
     }
@@ -70,7 +73,10 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
   // This should always be arena allocated in the call, so override delete.
   // But this class is not trivially destructible, so must actually call delete
   // before allowing the arena to be freed
-  static void operator delete(void* ptr, std::size_t size) {
+  static void operator delete(void* /*ptr*/, std::size_t size) {
+    // Use size to avoid unused-parameter warning since assert seems to be
+    // compiled out and treated as unused in some gcc optimized versions.
+    (void)size;
     assert(size == sizeof(CompletionOp));
   }
 
@@ -104,7 +110,7 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
   // RPC. This should set hijacking state for each of the ops.
   void SetHijackingState() override {
     /* Servers don't allow hijacking */
-    GPR_CODEGEN_ASSERT(false);
+    GPR_ASSERT(false);
   }
 
   /* Should be called after interceptors are done running */
@@ -116,51 +122,42 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
     done_intercepting_ = true;
     if (!has_tag_) {
       /* We don't have a tag to return. */
-      std::unique_lock<std::mutex> lock(mu_);
-      if (--refs_ == 0) {
-        lock.unlock();
-        grpc_call* call = call_.call();
-        delete this;
-        grpc_call_unref(call);
-      }
+      Unref();
       return;
     }
     /* Start a dummy op so that we can return the tag */
-    GPR_CODEGEN_ASSERT(
-        GRPC_CALL_OK ==
-        grpc_call_start_batch(call_.call(), nullptr, 0, core_cq_tag_, nullptr));
+    GPR_ASSERT(grpc_call_start_batch(call_.call(), nullptr, 0, core_cq_tag_,
+                                     nullptr) == GRPC_CALL_OK);
   }
 
  private:
   bool CheckCancelledNoPluck() {
-    std::lock_guard<std::mutex> g(mu_);
+    grpc_core::MutexLock lock(&mu_);
     return finalized_ ? (cancelled_ != 0) : false;
   }
 
   internal::Call call_;
-  internal::ServerReactor* reactor_;
+  ::grpc::internal::ServerCallbackCall* const callback_controller_;
   bool has_tag_;
   void* tag_;
   void* core_cq_tag_;
-  std::mutex mu_;
-  int refs_;
+  grpc_core::RefCount refs_;
+  grpc_core::Mutex mu_;
   bool finalized_;
   int cancelled_;  // This is an int (not bool) because it is passed to core
   bool done_intercepting_;
   internal::InterceptorBatchMethodsImpl interceptor_methods_;
 };
 
-void ServerContext::CompletionOp::Unref() {
-  std::unique_lock<std::mutex> lock(mu_);
-  if (--refs_ == 0) {
-    lock.unlock();
+void ServerContextBase::CompletionOp::Unref() {
+  if (refs_.Unref()) {
     grpc_call* call = call_.call();
     delete this;
     grpc_call_unref(call);
   }
 }
 
-void ServerContext::CompletionOp::FillOps(internal::Call* call) {
+void ServerContextBase::CompletionOp::FillOps(internal::Call* call) {
   grpc_op ops;
   ops.op = GRPC_OP_RECV_CLOSE_ON_SERVER;
   ops.data.recv_close_on_server.cancelled = &cancelled_;
@@ -169,119 +166,100 @@ void ServerContext::CompletionOp::FillOps(internal::Call* call) {
   interceptor_methods_.SetCall(&call_);
   interceptor_methods_.SetReverse();
   interceptor_methods_.SetCallOpSetInterface(this);
-  GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call->call(), &ops, 1,
-                                                   core_cq_tag_, nullptr));
+  // The following call_start_batch is internally-generated so no need for an
+  // explanatory log on failure.
+  GPR_ASSERT(grpc_call_start_batch(call->call(), &ops, 1, core_cq_tag_,
+                                   nullptr) == GRPC_CALL_OK);
   /* No interceptors to run here */
 }
 
-bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) {
-  bool ret = false;
-  std::unique_lock<std::mutex> lock(mu_);
-  if (done_intercepting_) {
-    /* We are done intercepting. */
-    if (has_tag_) {
-      *tag = tag_;
-      ret = true;
+bool ServerContextBase::CompletionOp::FinalizeResult(void** tag, bool* status) {
+  // Decide whether to call the cancel callback within the lock
+  bool call_cancel;
+
+  {
+    grpc_core::MutexLock lock(&mu_);
+    if (done_intercepting_) {
+      // We are done intercepting.
+      bool has_tag = has_tag_;
+      if (has_tag) {
+        *tag = tag_;
+      }
+      Unref();
+      return has_tag;
     }
-    if (--refs_ == 0) {
-      lock.unlock();
-      grpc_call* call = call_.call();
-      delete this;
-      grpc_call_unref(call);
+    finalized_ = true;
+
+    // If for some reason the incoming status is false, mark that as a
+    // cancellation.
+    // TODO(vjpai): does this ever happen?
+    if (!*status) {
+      cancelled_ = 1;
     }
-    return ret;
-  }
-  finalized_ = true;
 
-  // If for some reason the incoming status is false, mark that as a
-  // cancellation.
-  // TODO(vjpai): does this ever happen?
-  if (!*status) {
-    cancelled_ = 1;
+    call_cancel = (cancelled_ != 0);
+    // Release the lock since we may call a callback and interceptors.
   }
 
-  if (cancelled_ && (reactor_ != nullptr)) {
-    reactor_->OnCancel();
+  if (call_cancel && callback_controller_ != nullptr) {
+    callback_controller_->MaybeCallOnCancel();
   }
-  /* Release the lock since we are going to be running through interceptors now
-   */
-  lock.unlock();
   /* Add interception point and run through interceptors */
   interceptor_methods_.AddInterceptionHookPoint(
       experimental::InterceptionHookPoints::POST_RECV_CLOSE);
   if (interceptor_methods_.RunInterceptors()) {
-    /* No interceptors were run */
-    if (has_tag_) {
+    // No interceptors were run
+    bool has_tag = has_tag_;
+    if (has_tag) {
       *tag = tag_;
-      ret = true;
-    }
-    lock.lock();
-    if (--refs_ == 0) {
-      lock.unlock();
-      grpc_call* call = call_.call();
-      delete this;
-      grpc_call_unref(call);
     }
-    return ret;
+    Unref();
+    return has_tag;
   }
-  /* There are interceptors to be run. Return false for now */
+  // There are interceptors to be run. Return false for now.
   return false;
 }
 
-// ServerContext body
+// ServerContextBase body
 
-ServerContext::ServerContext() { Setup(gpr_inf_future(GPR_CLOCK_REALTIME)); }
+ServerContextBase::ServerContextBase()
+    : deadline_(gpr_inf_future(GPR_CLOCK_REALTIME)) {}
 
-ServerContext::ServerContext(gpr_timespec deadline, grpc_metadata_array* arr) {
-  Setup(deadline);
+ServerContextBase::ServerContextBase(gpr_timespec deadline,
+                                     grpc_metadata_array* arr)
+    : deadline_(deadline) {
   std::swap(*client_metadata_.arr(), *arr);
 }
 
-void ServerContext::Setup(gpr_timespec deadline) {
-  completion_op_ = nullptr;
-  has_notify_when_done_tag_ = false;
-  async_notify_when_done_tag_ = nullptr;
-  deadline_ = deadline;
-  call_ = nullptr;
-  cq_ = nullptr;
-  sent_initial_metadata_ = false;
-  compression_level_set_ = false;
-  has_pending_ops_ = false;
-  rpc_info_ = nullptr;
-}
-
-void ServerContext::BindDeadlineAndMetadata(gpr_timespec deadline,
-                                            grpc_metadata_array* arr) {
+void ServerContextBase::BindDeadlineAndMetadata(gpr_timespec deadline,
+                                                grpc_metadata_array* arr) {
   deadline_ = deadline;
   std::swap(*client_metadata_.arr(), *arr);
 }
 
-ServerContext::~ServerContext() { Clear(); }
-
-void ServerContext::Clear() {
-  auth_context_.reset();
-  initial_metadata_.clear();
-  trailing_metadata_.clear();
-  client_metadata_.Reset();
+ServerContextBase::~ServerContextBase() {
   if (completion_op_) {
     completion_op_->Unref();
-    completion_op_ = nullptr;
-    completion_tag_.Clear();
   }
   if (rpc_info_) {
     rpc_info_->Unref();
-    rpc_info_ = nullptr;
   }
-  if (call_) {
-    auto* call = call_;
-    call_ = nullptr;
+  if (default_reactor_used_.load(std::memory_order_relaxed)) {
+    reinterpret_cast<Reactor*>(&default_reactor_)->~Reactor();
+  }
+}
+
+ServerContextBase::CallWrapper::~CallWrapper() {
+  if (call) {
+    // If the ServerContext is part of the call's arena, this could free the
+    // object itself.
     grpc_call_unref(call);
   }
 }
 
-void ServerContext::BeginCompletionOp(internal::Call* call,
-                                      std::function<void(bool)> callback,
-                                      internal::ServerReactor* reactor) {
+void ServerContextBase::BeginCompletionOp(
+    internal::Call* call, std::function<void(bool)> callback,
+    ::grpc::internal::ServerCallbackCall* callback_controller) {
   GPR_ASSERT(!completion_op_);
   if (rpc_info_) {
     rpc_info_->Ref();
@@ -289,9 +267,10 @@ void ServerContext::BeginCompletionOp(internal::Call* call,
   grpc_call_ref(call->call());
   completion_op_ =
       new (grpc_call_arena_alloc(call->call(), sizeof(CompletionOp)))
-          CompletionOp(call, reactor);
-  if (callback != nullptr) {
-    completion_tag_.Set(call->call(), std::move(callback), completion_op_);
+          CompletionOp(call, callback_controller);
+  if (callback_controller != nullptr) {
+    completion_tag_.Set(call->call(), std::move(callback), completion_op_,
+                        true);
     completion_op_->set_core_cq_tag(&completion_tag_);
     completion_op_->set_tag(completion_op_);
   } else if (has_notify_when_done_tag_) {
@@ -300,35 +279,36 @@ void ServerContext::BeginCompletionOp(internal::Call* call,
   call->PerformOps(completion_op_);
 }
 
-internal::CompletionQueueTag* ServerContext::GetCompletionOpTag() {
+internal::CompletionQueueTag* ServerContextBase::GetCompletionOpTag() {
   return static_cast<internal::CompletionQueueTag*>(completion_op_);
 }
 
-void ServerContext::AddInitialMetadata(const grpc::string& key,
-                                       const grpc::string& value) {
+void ServerContextBase::AddInitialMetadata(const std::string& key,
+                                           const std::string& value) {
   initial_metadata_.insert(std::make_pair(key, value));
 }
 
-void ServerContext::AddTrailingMetadata(const grpc::string& key,
-                                        const grpc::string& value) {
+void ServerContextBase::AddTrailingMetadata(const std::string& key,
+                                            const std::string& value) {
   trailing_metadata_.insert(std::make_pair(key, value));
 }
 
-void ServerContext::TryCancel() const {
+void ServerContextBase::TryCancel() const {
   internal::CancelInterceptorBatchMethods cancel_methods;
   if (rpc_info_) {
     for (size_t i = 0; i < rpc_info_->interceptors_.size(); i++) {
       rpc_info_->RunInterceptor(&cancel_methods, i);
     }
   }
-  grpc_call_error err = grpc_call_cancel_with_status(
-      call_, GRPC_STATUS_CANCELLED, "Cancelled on the server side", nullptr);
+  grpc_call_error err =
+      grpc_call_cancel_with_status(call_.call, GRPC_STATUS_CANCELLED,
+                                   "Cancelled on the server side", nullptr);
   if (err != GRPC_CALL_OK) {
     gpr_log(GPR_ERROR, "TryCancel failed with: %d", err);
   }
 }
 
-bool ServerContext::IsCancelled() const {
+bool ServerContextBase::IsCancelled() const {
   if (completion_tag_) {
     // When using callback API, this result is always valid.
     return completion_op_->CheckCancelledAsync();
@@ -342,7 +322,7 @@ bool ServerContext::IsCancelled() const {
   }
 }
 
-void ServerContext::set_compression_algorithm(
+void ServerContextBase::set_compression_algorithm(
     grpc_compression_algorithm algorithm) {
   compression_algorithm_ = algorithm;
   const char* algorithm_name = nullptr;
@@ -355,23 +335,24 @@ void ServerContext::set_compression_algorithm(
   AddInitialMetadata(GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY, algorithm_name);
 }
 
-grpc::string ServerContext::peer() const {
-  grpc::string peer;
-  if (call_) {
-    char* c_peer = grpc_call_get_peer(call_);
+std::string ServerContextBase::peer() const {
+  std::string peer;
+  if (call_.call) {
+    char* c_peer = grpc_call_get_peer(call_.call);
     peer = c_peer;
     gpr_free(c_peer);
   }
   return peer;
 }
 
-const struct census_context* ServerContext::census_context() const {
-  return grpc_census_call_get_context(call_);
+const struct census_context* ServerContextBase::census_context() const {
+  return call_.call == nullptr ? nullptr
+                               : grpc_census_call_get_context(call_.call);
 }
 
-void ServerContext::SetLoadReportingCosts(
-    const std::vector<grpc::string>& cost_data) {
-  if (call_ == nullptr) return;
+void ServerContextBase::SetLoadReportingCosts(
+    const std::vector<std::string>& cost_data) {
+  if (call_.call == nullptr) return;
   for (const auto& cost_datum : cost_data) {
     AddTrailingMetadata(GRPC_LB_COST_MD_KEY, cost_datum);
   }