*
*/
-#include <grpcpp/server_context.h>
-#include <grpcpp/support/server_callback.h>
+#include <grpcpp/impl/codegen/server_context.h>
#include <algorithm>
-#include <mutex>
#include <utility>
#include <grpc/compression.h>
#include <grpc/load_reporting.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
-#include <grpcpp/completion_queue.h>
#include <grpcpp/impl/call.h>
+#include <grpcpp/impl/codegen/completion_queue.h>
+#include <grpcpp/support/server_callback.h>
#include <grpcpp/support/time.h>
+#include "src/core/lib/gprpp/ref_counted.h"
+#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/surface/call.h"
namespace grpc {
// CompletionOp
-class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
+class ServerContextBase::CompletionOp final
+ : public internal::CallOpSetInterface {
public:
// initial refs: one in the server context, one in the cq
// must ref the call before calling constructor and after deleting this
- CompletionOp(internal::Call* call, internal::ServerReactor* reactor)
+ CompletionOp(internal::Call* call,
+ ::grpc::internal::ServerCallbackCall* callback_controller)
: call_(*call),
- reactor_(reactor),
+ callback_controller_(callback_controller),
has_tag_(false),
tag_(nullptr),
core_cq_tag_(this),
CompletionOp(CompletionOp&&) = delete;
CompletionOp& operator=(CompletionOp&&) = delete;
- ~CompletionOp() {
+ ~CompletionOp() override {
if (call_.server_rpc_info()) {
call_.server_rpc_info()->Unref();
}
// This should always be arena allocated in the call, so override delete.
// But this class is not trivially destructible, so must actually call delete
// before allowing the arena to be freed
- static void operator delete(void* ptr, std::size_t size) {
+ static void operator delete(void* /*ptr*/, std::size_t size) {
+ // Use size to avoid unused-parameter warning since assert seems to be
+ // compiled out and treated as unused in some gcc optimized versions.
+ (void)size;
assert(size == sizeof(CompletionOp));
}
// RPC. This should set hijacking state for each of the ops.
void SetHijackingState() override {
/* Servers don't allow hijacking */
- GPR_CODEGEN_ASSERT(false);
+ GPR_ASSERT(false);
}
/* Should be called after interceptors are done running */
done_intercepting_ = true;
if (!has_tag_) {
/* We don't have a tag to return. */
- std::unique_lock<std::mutex> lock(mu_);
- if (--refs_ == 0) {
- lock.unlock();
- grpc_call* call = call_.call();
- delete this;
- grpc_call_unref(call);
- }
+ Unref();
return;
}
/* Start a dummy op so that we can return the tag */
- GPR_CODEGEN_ASSERT(
- GRPC_CALL_OK ==
- grpc_call_start_batch(call_.call(), nullptr, 0, core_cq_tag_, nullptr));
+ GPR_ASSERT(grpc_call_start_batch(call_.call(), nullptr, 0, core_cq_tag_,
+ nullptr) == GRPC_CALL_OK);
}
private:
bool CheckCancelledNoPluck() {
- std::lock_guard<std::mutex> g(mu_);
+ grpc_core::MutexLock lock(&mu_);
return finalized_ ? (cancelled_ != 0) : false;
}
internal::Call call_;
- internal::ServerReactor* reactor_;
+ ::grpc::internal::ServerCallbackCall* const callback_controller_;
bool has_tag_;
void* tag_;
void* core_cq_tag_;
- std::mutex mu_;
- int refs_;
+ grpc_core::RefCount refs_;
+ grpc_core::Mutex mu_;
bool finalized_;
int cancelled_; // This is an int (not bool) because it is passed to core
bool done_intercepting_;
internal::InterceptorBatchMethodsImpl interceptor_methods_;
};
-void ServerContext::CompletionOp::Unref() {
- std::unique_lock<std::mutex> lock(mu_);
- if (--refs_ == 0) {
- lock.unlock();
+void ServerContextBase::CompletionOp::Unref() {
+ if (refs_.Unref()) {
grpc_call* call = call_.call();
delete this;
grpc_call_unref(call);
}
}
-void ServerContext::CompletionOp::FillOps(internal::Call* call) {
+void ServerContextBase::CompletionOp::FillOps(internal::Call* call) {
grpc_op ops;
ops.op = GRPC_OP_RECV_CLOSE_ON_SERVER;
ops.data.recv_close_on_server.cancelled = &cancelled_;
interceptor_methods_.SetCall(&call_);
interceptor_methods_.SetReverse();
interceptor_methods_.SetCallOpSetInterface(this);
- GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call->call(), &ops, 1,
- core_cq_tag_, nullptr));
+ // The following call_start_batch is internally-generated so no need for an
+ // explanatory log on failure.
+ GPR_ASSERT(grpc_call_start_batch(call->call(), &ops, 1, core_cq_tag_,
+ nullptr) == GRPC_CALL_OK);
/* No interceptors to run here */
}
-bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) {
- bool ret = false;
- std::unique_lock<std::mutex> lock(mu_);
- if (done_intercepting_) {
- /* We are done intercepting. */
- if (has_tag_) {
- *tag = tag_;
- ret = true;
+bool ServerContextBase::CompletionOp::FinalizeResult(void** tag, bool* status) {
+ // Decide whether to call the cancel callback within the lock
+ bool call_cancel;
+
+ {
+ grpc_core::MutexLock lock(&mu_);
+ if (done_intercepting_) {
+ // We are done intercepting.
+ bool has_tag = has_tag_;
+ if (has_tag) {
+ *tag = tag_;
+ }
+ Unref();
+ return has_tag;
}
- if (--refs_ == 0) {
- lock.unlock();
- grpc_call* call = call_.call();
- delete this;
- grpc_call_unref(call);
+ finalized_ = true;
+
+ // If for some reason the incoming status is false, mark that as a
+ // cancellation.
+ // TODO(vjpai): does this ever happen?
+ if (!*status) {
+ cancelled_ = 1;
}
- return ret;
- }
- finalized_ = true;
- // If for some reason the incoming status is false, mark that as a
- // cancellation.
- // TODO(vjpai): does this ever happen?
- if (!*status) {
- cancelled_ = 1;
+ call_cancel = (cancelled_ != 0);
+ // Release the lock since we may call a callback and interceptors.
}
- if (cancelled_ && (reactor_ != nullptr)) {
- reactor_->OnCancel();
+ if (call_cancel && callback_controller_ != nullptr) {
+ callback_controller_->MaybeCallOnCancel();
}
- /* Release the lock since we are going to be running through interceptors now
- */
- lock.unlock();
/* Add interception point and run through interceptors */
interceptor_methods_.AddInterceptionHookPoint(
experimental::InterceptionHookPoints::POST_RECV_CLOSE);
if (interceptor_methods_.RunInterceptors()) {
- /* No interceptors were run */
- if (has_tag_) {
+ // No interceptors were run
+ bool has_tag = has_tag_;
+ if (has_tag) {
*tag = tag_;
- ret = true;
- }
- lock.lock();
- if (--refs_ == 0) {
- lock.unlock();
- grpc_call* call = call_.call();
- delete this;
- grpc_call_unref(call);
}
- return ret;
+ Unref();
+ return has_tag;
}
- /* There are interceptors to be run. Return false for now */
+ // There are interceptors to be run. Return false for now.
return false;
}
-// ServerContext body
+// ServerContextBase body
-ServerContext::ServerContext() { Setup(gpr_inf_future(GPR_CLOCK_REALTIME)); }
+ServerContextBase::ServerContextBase()
+ : deadline_(gpr_inf_future(GPR_CLOCK_REALTIME)) {}
-ServerContext::ServerContext(gpr_timespec deadline, grpc_metadata_array* arr) {
- Setup(deadline);
+ServerContextBase::ServerContextBase(gpr_timespec deadline,
+ grpc_metadata_array* arr)
+ : deadline_(deadline) {
std::swap(*client_metadata_.arr(), *arr);
}
-void ServerContext::Setup(gpr_timespec deadline) {
- completion_op_ = nullptr;
- has_notify_when_done_tag_ = false;
- async_notify_when_done_tag_ = nullptr;
- deadline_ = deadline;
- call_ = nullptr;
- cq_ = nullptr;
- sent_initial_metadata_ = false;
- compression_level_set_ = false;
- has_pending_ops_ = false;
- rpc_info_ = nullptr;
-}
-
-void ServerContext::BindDeadlineAndMetadata(gpr_timespec deadline,
- grpc_metadata_array* arr) {
+void ServerContextBase::BindDeadlineAndMetadata(gpr_timespec deadline,
+ grpc_metadata_array* arr) {
deadline_ = deadline;
std::swap(*client_metadata_.arr(), *arr);
}
-ServerContext::~ServerContext() { Clear(); }
-
-void ServerContext::Clear() {
- auth_context_.reset();
- initial_metadata_.clear();
- trailing_metadata_.clear();
- client_metadata_.Reset();
+ServerContextBase::~ServerContextBase() {
if (completion_op_) {
completion_op_->Unref();
- completion_op_ = nullptr;
- completion_tag_.Clear();
}
if (rpc_info_) {
rpc_info_->Unref();
- rpc_info_ = nullptr;
}
- if (call_) {
- auto* call = call_;
- call_ = nullptr;
+ if (default_reactor_used_.load(std::memory_order_relaxed)) {
+ reinterpret_cast<Reactor*>(&default_reactor_)->~Reactor();
+ }
+}
+
+ServerContextBase::CallWrapper::~CallWrapper() {
+ if (call) {
+ // If the ServerContext is part of the call's arena, this could free the
+ // object itself.
grpc_call_unref(call);
}
}
-void ServerContext::BeginCompletionOp(internal::Call* call,
- std::function<void(bool)> callback,
- internal::ServerReactor* reactor) {
+void ServerContextBase::BeginCompletionOp(
+ internal::Call* call, std::function<void(bool)> callback,
+ ::grpc::internal::ServerCallbackCall* callback_controller) {
GPR_ASSERT(!completion_op_);
if (rpc_info_) {
rpc_info_->Ref();
grpc_call_ref(call->call());
completion_op_ =
new (grpc_call_arena_alloc(call->call(), sizeof(CompletionOp)))
- CompletionOp(call, reactor);
- if (callback != nullptr) {
- completion_tag_.Set(call->call(), std::move(callback), completion_op_);
+ CompletionOp(call, callback_controller);
+ if (callback_controller != nullptr) {
+ completion_tag_.Set(call->call(), std::move(callback), completion_op_,
+ true);
completion_op_->set_core_cq_tag(&completion_tag_);
completion_op_->set_tag(completion_op_);
} else if (has_notify_when_done_tag_) {
call->PerformOps(completion_op_);
}
-internal::CompletionQueueTag* ServerContext::GetCompletionOpTag() {
+internal::CompletionQueueTag* ServerContextBase::GetCompletionOpTag() {
return static_cast<internal::CompletionQueueTag*>(completion_op_);
}
-void ServerContext::AddInitialMetadata(const grpc::string& key,
- const grpc::string& value) {
+void ServerContextBase::AddInitialMetadata(const std::string& key,
+ const std::string& value) {
initial_metadata_.insert(std::make_pair(key, value));
}
-void ServerContext::AddTrailingMetadata(const grpc::string& key,
- const grpc::string& value) {
+void ServerContextBase::AddTrailingMetadata(const std::string& key,
+ const std::string& value) {
trailing_metadata_.insert(std::make_pair(key, value));
}
-void ServerContext::TryCancel() const {
+void ServerContextBase::TryCancel() const {
internal::CancelInterceptorBatchMethods cancel_methods;
if (rpc_info_) {
for (size_t i = 0; i < rpc_info_->interceptors_.size(); i++) {
rpc_info_->RunInterceptor(&cancel_methods, i);
}
}
- grpc_call_error err = grpc_call_cancel_with_status(
- call_, GRPC_STATUS_CANCELLED, "Cancelled on the server side", nullptr);
+ grpc_call_error err =
+ grpc_call_cancel_with_status(call_.call, GRPC_STATUS_CANCELLED,
+ "Cancelled on the server side", nullptr);
if (err != GRPC_CALL_OK) {
gpr_log(GPR_ERROR, "TryCancel failed with: %d", err);
}
}
-bool ServerContext::IsCancelled() const {
+bool ServerContextBase::IsCancelled() const {
if (completion_tag_) {
// When using callback API, this result is always valid.
return completion_op_->CheckCancelledAsync();
}
}
-void ServerContext::set_compression_algorithm(
+void ServerContextBase::set_compression_algorithm(
grpc_compression_algorithm algorithm) {
compression_algorithm_ = algorithm;
const char* algorithm_name = nullptr;
AddInitialMetadata(GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY, algorithm_name);
}
-grpc::string ServerContext::peer() const {
- grpc::string peer;
- if (call_) {
- char* c_peer = grpc_call_get_peer(call_);
+std::string ServerContextBase::peer() const {
+ std::string peer;
+ if (call_.call) {
+ char* c_peer = grpc_call_get_peer(call_.call);
peer = c_peer;
gpr_free(c_peer);
}
return peer;
}
-const struct census_context* ServerContext::census_context() const {
- return grpc_census_call_get_context(call_);
+const struct census_context* ServerContextBase::census_context() const {
+ return call_.call == nullptr ? nullptr
+ : grpc_census_call_get_context(call_.call);
}
-void ServerContext::SetLoadReportingCosts(
- const std::vector<grpc::string>& cost_data) {
- if (call_ == nullptr) return;
+void ServerContextBase::SetLoadReportingCosts(
+ const std::vector<std::string>& cost_data) {
+ if (call_.call == nullptr) return;
for (const auto& cost_datum : cost_data) {
AddTrailingMetadata(GRPC_LB_COST_MD_KEY, cost_datum);
}