3 * Copyright 2015 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <grpcpp/server_context.h>
20 #include <grpcpp/support/server_callback.h>
26 #include <grpc/compression.h>
27 #include <grpc/grpc.h>
28 #include <grpc/load_reporting.h>
29 #include <grpc/support/alloc.h>
30 #include <grpc/support/log.h>
31 #include <grpcpp/completion_queue.h>
32 #include <grpcpp/impl/call.h>
33 #include <grpcpp/support/time.h>
35 #include "src/core/lib/gprpp/ref_counted.h"
36 #include "src/core/lib/surface/call.h"
42 class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
// CompletionOp is the op-set that watches for RPC close/cancellation on the
// server side (RECV_CLOSE_ON_SERVER; see FillOps below).
// NOTE(review): this listing elides many original lines; comments below
// describe only what is visible and hedge where context is missing.
44 // initial refs: one in the server context, one in the cq
45 // must ref the call before calling constructor and after deleting this
46 CompletionOp(internal::Call* call, internal::ServerReactor* reactor)
55 done_intercepting_(false) {}
57 // CompletionOp isn't copyable or movable
58 CompletionOp(const CompletionOp&) = delete;
59 CompletionOp& operator=(const CompletionOp&) = delete;
60 CompletionOp(CompletionOp&&) = delete;
61 CompletionOp& operator=(CompletionOp&&) = delete;
// NOTE(review): appears to be the body of the destructor — drops the ref
// held on the ServerRpcInfo, if any. Confirm against the full source.
64 if (call_.server_rpc_info()) {
65 call_.server_rpc_info()->Unref();
// Fills and starts the RECV_CLOSE_ON_SERVER batch (defined out of line).
69 void FillOps(internal::Call* call) override;
71 // This should always be arena allocated in the call, so override delete.
72 // But this class is not trivially destructible, so must actually call delete
73 // before allowing the arena to be freed
74 static void operator delete(void* ptr, std::size_t size) {
75 assert(size == sizeof(CompletionOp));
78 // This operator should never be called as the memory should be freed as part
79 // of the arena destruction. It only exists to provide a matching operator
80 // delete to the operator new so that some compilers will not complain (see
81 // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
82 // there are no tests catching the compiler warning.
83 static void operator delete(void*, void*) { assert(0); }
85 bool FinalizeResult(void** tag, bool* status) override;
// Sync-API variant. NOTE(review): elided lines presumably drain/pluck from
// `cq` before the lock-protected check — confirm.
87 bool CheckCancelled(CompletionQueue* cq) {
89 return CheckCancelledNoPluck();
// Callback/async variant: no plucking, just the lock-protected check.
91 bool CheckCancelledAsync() { return CheckCancelledNoPluck(); }
93 void set_tag(void* tag) {
// Installs the per-RPC cancel callback. NOTE(review): when the op has
// already finalized as cancelled, the elided branch presumably invokes
// `callback` immediately instead of storing it — confirm.
98 void SetCancelCallback(std::function<void()> callback) {
99 std::lock_guard<std::mutex> lock(mu_);
101 if (finalized_ && (cancelled_ != 0)) {
106 cancel_callback_ = std::move(callback);
109 void ClearCancelCallback() {
110 std::lock_guard<std::mutex> g(mu_);
111 cancel_callback_ = nullptr;
// Tag handed to the core completion queue for this op's batch.
114 void set_core_cq_tag(void* core_cq_tag) { core_cq_tag_ = core_cq_tag; }
116 void* core_cq_tag() override { return core_cq_tag_; }
120 // This will be called while interceptors are run if the RPC is a hijacked
121 // RPC. This should set hijacking state for each of the ops.
122 void SetHijackingState() override {
123 /* Servers don't allow hijacking */
124 GPR_CODEGEN_ASSERT(false);
127 /* Should be called after interceptors are done running */
128 void ContinueFillOpsAfterInterception() override {}
130 /* Should be called after interceptors are done running on the finalize result
132 void ContinueFinalizeResultAfterInterception() override {
133 done_intercepting_ = true;
135 /* We don't have a tag to return. */
139 /* Start a dummy op so that we can return the tag */
142 grpc_call_start_batch(call_.call(), nullptr, 0, core_cq_tag_, nullptr));
// Lock-protected cancellation check; only meaningful once finalized_.
146 bool CheckCancelledNoPluck() {
147 std::lock_guard<std::mutex> g(mu_);
148 return finalized_ ? (cancelled_ != 0) : false;
151 internal::Call call_;
152 internal::ServerReactor* const reactor_;
// One ref held by the ServerContext, one by the CQ (see class comment).
156 grpc_core::RefCount refs_;
159 int cancelled_; // This is an int (not bool) because it is passed to core
160 std::function<void()> cancel_callback_;
161 bool done_intercepting_;
162 internal::InterceptorBatchMethodsImpl interceptor_methods_;
// Drops a reference. The visible tail is the final-ref path: release the
// ref this op holds on the underlying core call. NOTE(review): elided
// lines presumably check refs_.Unref() and destroy the op first — confirm.
165 void ServerContext::CompletionOp::Unref() {
167 grpc_call* call = call_.call();
169 grpc_call_unref(call);
// Starts the single RECV_CLOSE_ON_SERVER batch for this RPC. When the
// batch completes, core writes the cancel status into cancelled_ and
// signals core_cq_tag_. Also primes interceptor state (reverse direction).
173 void ServerContext::CompletionOp::FillOps(internal::Call* call) {
175 ops.op = GRPC_OP_RECV_CLOSE_ON_SERVER;
176 ops.data.recv_close_on_server.cancelled = &cancelled_;
178 ops.reserved = nullptr;
179 interceptor_methods_.SetCall(&call_);
180 interceptor_methods_.SetReverse();
181 interceptor_methods_.SetCallOpSetInterface(this);
// Core must accept this batch; anything else is a programming error.
182 GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call->call(), &ops, 1,
183 core_cq_tag_, nullptr));
184 /* No interceptors to run here */
// Invoked when the RECV_CLOSE_ON_SERVER batch completes. Records the
// cancel result, runs the stored cancel callback and the reactor's
// OnCancel when cancelled, then routes through POST_RECV_CLOSE
// interceptors before exposing the tag.
// NOTE(review): this listing elides several lines of this function;
// comments hedge where the surrounding logic is not visible.
187 bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) {
189 std::unique_lock<std::mutex> lock(mu_);
190 if (done_intercepting_) {
191 /* We are done intercepting. */
201 // If for some reason the incoming status is false, mark that as a
203 // TODO(vjpai): does this ever happen?
208 // Decide whether to call the cancel callback before releasing the lock
209 bool call_cancel = (cancelled_ != 0);
211 // If it's a unary cancel callback, call it under the lock so that it doesn't
212 // race with ClearCancelCallback
213 if (cancel_callback_) {
217 // Release the lock since we are going to be calling a callback and
// Notify the reactor outside the lock; OnCancel may run user code.
221 if (call_cancel && reactor_ != nullptr) {
222 reactor_->OnCancel();
225 /* Add interception point and run through interceptors */
226 interceptor_methods_.AddInterceptionHookPoint(
227 experimental::InterceptionHookPoints::POST_RECV_CLOSE);
228 if (interceptor_methods_.RunInterceptors()) {
229 /* No interceptors were run */
237 /* There are interceptors to be run. Return false for now */
241 // ServerContext body
243 ServerContext::ServerContext() { Setup(gpr_inf_future(GPR_CLOCK_REALTIME)); }
// Construction for an incoming RPC: adopt the client's initial metadata by
// swapping it out of the caller-owned array (avoids copying metadata).
// NOTE(review): the elided line presumably forwards `deadline` to Setup()
// — confirm against the full source.
245 ServerContext::ServerContext(gpr_timespec deadline, grpc_metadata_array* arr) {
247 std::swap(*client_metadata_.arr(), *arr);
// Shared constructor body: reset per-RPC bookkeeping to its idle state and
// record the deadline. (Listing elides resets of some additional members.)
250 void ServerContext::Setup(gpr_timespec deadline) {
251 completion_op_ = nullptr;
252 has_notify_when_done_tag_ = false;
253 async_notify_when_done_tag_ = nullptr;
254 deadline_ = deadline;
257 sent_initial_metadata_ = false;
258 compression_level_set_ = false;
259 has_pending_ops_ = false;
// Late-binds the deadline and the client's initial metadata onto an
// already-constructed context; swap avoids copying the metadata array.
263 void ServerContext::BindDeadlineAndMetadata(gpr_timespec deadline,
264 grpc_metadata_array* arr) {
265 deadline_ = deadline;
266 std::swap(*client_metadata_.arr(), *arr);
269 ServerContext::~ServerContext() { Clear(); }
// Releases all per-RPC state so the context can be destroyed or reused.
271 void ServerContext::Clear() {
272 auth_context_.reset();
273 initial_metadata_.clear();
274 trailing_metadata_.clear();
275 client_metadata_.Reset();
276 if (completion_op_) {
// Drop the ref the server context holds on the completion op (the CQ may
// still hold its own ref; see CompletionOp's class comment).
277 completion_op_->Unref();
278 completion_op_ = nullptr;
279 completion_tag_.Clear();
// NOTE(review): out-of-context tail (surrounding lines elided) —
// presumably releases the context's ref on the core call taken in
// BeginCompletionOp. Confirm against the full source.
288 grpc_call_unref(call);
// Arms the cancellation-detection op for this RPC. Called at most once per
// context (asserted). The op is placement-new'd on the call's arena, and a
// ref is taken on the core call so it outlives the op.
292 void ServerContext::BeginCompletionOp(internal::Call* call,
293 std::function<void(bool)> callback,
294 internal::ServerReactor* reactor) {
295 GPR_ASSERT(!completion_op_);
299 grpc_call_ref(call->call());
301 new (grpc_call_arena_alloc(call->call(), sizeof(CompletionOp)))
302 CompletionOp(call, reactor);
// Callback API: route completion through the callback tag.
303 if (callback != nullptr) {
304 completion_tag_.Set(call->call(), std::move(callback), completion_op_);
305 completion_op_->set_core_cq_tag(&completion_tag_);
306 completion_op_->set_tag(completion_op_);
// Async API: surface the user's AsyncNotifyWhenDone tag instead.
307 } else if (has_notify_when_done_tag_) {
308 completion_op_->set_tag(async_notify_when_done_tag_);
310 call->PerformOps(completion_op_);
// Exposes the completion op through its CompletionQueueTag interface.
313 internal::CompletionQueueTag* ServerContext::GetCompletionOpTag() {
314 return static_cast<internal::CompletionQueueTag*>(completion_op_);
// Queues a key/value pair to be sent with the server's initial metadata.
// Multimap insert: repeated keys are allowed and all are sent.
317 void ServerContext::AddInitialMetadata(const grpc::string& key,
318 const grpc::string& value) {
319 initial_metadata_.insert(std::make_pair(key, value));
// Queues a key/value pair to be sent with the server's trailing metadata.
// Multimap insert: repeated keys are allowed and all are sent.
322 void ServerContext::AddTrailingMetadata(const grpc::string& key,
323 const grpc::string& value) {
324 trailing_metadata_.insert(std::make_pair(key, value));
// Best-effort server-side cancellation of the RPC: first runs any
// registered cancel interceptor hooks, then asks core to cancel the call
// with CANCELLED status.
327 void ServerContext::TryCancel() const {
328 internal::CancelInterceptorBatchMethods cancel_methods;
330 for (size_t i = 0; i < rpc_info_->interceptors_.size(); i++) {
331 rpc_info_->RunInterceptor(&cancel_methods, i);
334 grpc_call_error err = grpc_call_cancel_with_status(
335 call_, GRPC_STATUS_CANCELLED, "Cancelled on the server side", nullptr);
// Cancellation is advisory; only log if core rejected the request.
336 if (err != GRPC_CALL_OK) {
337 gpr_log(GPR_ERROR, "TryCancel failed with: %d", err);
// Forwards the cancel callback to the completion op, which owns the
// synchronization with cancellation delivery.
341 void ServerContext::SetCancelCallback(std::function<void()> callback) {
342 completion_op_->SetCancelCallback(std::move(callback));
// Removes any previously installed cancel callback (delegated to the op).
345 void ServerContext::ClearCancelCallback() {
346 completion_op_->ClearCancelCallback();
// Reports whether the RPC has been cancelled. How trustworthy the answer
// is depends on which API flavor drives this context (see branch comments).
349 bool ServerContext::IsCancelled() const {
350 if (completion_tag_) {
351 // When using callback API, this result is always valid.
352 return completion_op_->CheckCancelledAsync();
353 } else if (has_notify_when_done_tag_) {
354 // When using async API, the result is only valid
355 // if the tag has already been delivered at the completion queue
356 return completion_op_ && completion_op_->CheckCancelledAsync();
358 // when using sync API, the result is always valid
359 return completion_op_ && completion_op_->CheckCancelled(cq_);
// Records the chosen compression algorithm and advertises it to the peer
// via the compression-request initial-metadata key.
363 void ServerContext::set_compression_algorithm(
364 grpc_compression_algorithm algorithm) {
365 compression_algorithm_ = algorithm;
366 const char* algorithm_name = nullptr;
// An unknown algorithm id is a caller bug; logged here (the elided line
// presumably aborts — confirm) and the assert below is the backstop.
367 if (!grpc_compression_algorithm_name(algorithm, &algorithm_name)) {
368 gpr_log(GPR_ERROR, "Name for compression algorithm '%d' unknown.",
372 GPR_ASSERT(algorithm_name != nullptr);
373 AddInitialMetadata(GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY, algorithm_name);
// Returns the transport-level peer address of this call. NOTE(review):
// the elided tail presumably copies c_peer into the returned string and
// frees it with gpr_free — confirm against the full source.
376 grpc::string ServerContext::peer() const {
379 char* c_peer = grpc_call_get_peer(call_);
// Exposes core's census (tracing/stats) context attached to this call.
386 const struct census_context* ServerContext::census_context() const {
387 return grpc_census_call_get_context(call_);
390 void ServerContext::SetLoadReportingCosts(
391 const std::vector<grpc::string>& cost_data) {
392 if (call_ == nullptr) return;
393 for (const auto& cost_datum : cost_data) {
394 AddTrailingMetadata(GRPC_LB_COST_MD_KEY, cost_datum);