3 * Copyright 2015 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <grpcpp/server_context.h>
20 #include <grpcpp/support/server_callback.h>
26 #include <grpc/compression.h>
27 #include <grpc/grpc.h>
28 #include <grpc/load_reporting.h>
29 #include <grpc/support/alloc.h>
30 #include <grpc/support/log.h>
31 #include <grpcpp/completion_queue.h>
32 #include <grpcpp/impl/call.h>
33 #include <grpcpp/support/time.h>
35 #include "src/core/lib/gprpp/ref_counted.h"
36 #include "src/core/lib/gprpp/sync.h"
37 #include "src/core/lib/surface/call.h"
// Op set used by ServerContext to learn from core whether the call was
// cancelled. FillOps() requests GRPC_OP_RECV_CLOSE_ON_SERVER and core writes
// the outcome into cancelled_; FinalizeResult() runs when that batch
// completes. The class is final and implements internal::CallOpSetInterface.
// The accessors below take mu_ before touching cancelled_ or
// cancel_callback_.
43 class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
45 // initial refs: one in the server context, one in the cq
46 // must ref the call before calling constructor and after deleting this
47 CompletionOp(internal::Call* call, internal::ServerReactor* reactor)
56 done_intercepting_(false) {}
58 // CompletionOp isn't copyable or movable
59 CompletionOp(const CompletionOp&) = delete;
60 CompletionOp& operator=(const CompletionOp&) = delete;
61 CompletionOp(CompletionOp&&) = delete;
62 CompletionOp& operator=(CompletionOp&&) = delete;
65 if (call_.server_rpc_info()) {
66 call_.server_rpc_info()->Unref();
70 void FillOps(internal::Call* call) override;
72 // This should always be arena allocated in the call, so override delete.
73 // But this class is not trivially destructible, so must actually call delete
74 // before allowing the arena to be freed
75 static void operator delete(void* ptr, std::size_t size) {
76 assert(size == sizeof(CompletionOp));
79 // This operator should never be called as the memory should be freed as part
80 // of the arena destruction. It only exists to provide a matching operator
81 // delete to the operator new so that some compilers will not complain (see
82 // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
83 // there are no tests catching the compiler warning.
84 static void operator delete(void*, void*) { assert(0); }
86 bool FinalizeResult(void** tag, bool* status) override;
// Cancellation query that also receives the completion queue. The result
// itself comes from CheckCancelledNoPluck().
// NOTE(review): cq is presumably plucked for this op before the check, given
// the NoPluck suffix on the helper - confirm.
88 bool CheckCancelled(CompletionQueue* cq) {
90 return CheckCancelledNoPluck();
// Cancellation query that takes no completion queue; forwards directly to
// CheckCancelledNoPluck().
92 bool CheckCancelledAsync() { return CheckCancelledNoPluck(); }
94 void set_tag(void* tag) {
// Stores callback as the cancel callback while holding mu_. The
// finalized_ && cancelled_ test singles out a call that has already finished
// as cancelled.
// NOTE(review): in that case the callback is presumably handled immediately
// instead of being stored - confirm.
99 void SetCancelCallback(std::function<void()> callback) {
100 grpc_core::MutexLock lock(&mu_);
102 if (finalized_ && (cancelled_ != 0)) {
107 cancel_callback_ = std::move(callback);
// Drops any stored cancel callback. Takes mu_, the same mutex that
// FinalizeResult() holds while it inspects cancel_callback_.
110 void ClearCancelCallback() {
111 grpc_core::MutexLock g(&mu_);
112 cancel_callback_ = nullptr;
115 void set_core_cq_tag(void* core_cq_tag) { core_cq_tag_ = core_cq_tag; }
117 void* core_cq_tag() override { return core_cq_tag_; }
121 // This will be called while interceptors are run if the RPC is a hijacked
122 // RPC. This should set hijacking state for each of the ops.
123 void SetHijackingState() override {
124 /* Servers don't allow hijacking */
125 GPR_CODEGEN_ASSERT(false);
128 /* Should be called after interceptors are done running */
129 void ContinueFillOpsAfterInterception() override {}
131 /* Should be called after interceptors are done running on the finalize result
133 void ContinueFinalizeResultAfterInterception() override {
134 done_intercepting_ = true;
136 /* We don't have a tag to return. */
140 /* Start a dummy op so that we can return the tag */
143 grpc_call_start_batch(call_.call(), nullptr, 0, core_cq_tag_, nullptr));
// Reads the cancellation state under mu_. Until finalized_ is set the answer
// is always false; afterwards it is whether cancelled_ is non-zero.
147 bool CheckCancelledNoPluck() {
148 grpc_core::MutexLock lock(&mu_);
149 return finalized_ ? (cancelled_ != 0) : false;
152 internal::Call call_;
153 internal::ServerReactor* const reactor_;
157 grpc_core::RefCount refs_;
158 grpc_core::Mutex mu_;
160 int cancelled_; // This is an int (not bool) because it is passed to core
161 std::function<void()> cancel_callback_;
162 bool done_intercepting_;
163 internal::InterceptorBatchMethodsImpl interceptor_methods_;
// Reference-release entry point for the op; ServerContext::Clear() calls it
// when it lets go of completion_op_. The grpc_call handle is copied out of
// call_ into a local, and the unref goes through that local instead of
// through the member.
// NOTE(review): presumably the op may already have been destroyed by the time
// grpc_call_unref runs, which would make the member unreadable - confirm.
166 void ServerContext::CompletionOp::Unref() {
168 grpc_call* call = call_.call();
170 grpc_call_unref(call);
// Starts a one-op batch on the call passed in: GRPC_OP_RECV_CLOSE_ON_SERVER,
// with core writing its cancellation flag into cancelled_. The batch is
// tagged with core_cq_tag_, and a failure to start it trips GPR_ASSERT.
// Before starting the batch, interceptor_methods_ is pointed at call_ and at
// this op set and SetReverse() is applied; FinalizeResult() later uses it to
// run the POST_RECV_CLOSE hook point.
//
// call: wrapper whose call() handle the batch is started on.
174 void ServerContext::CompletionOp::FillOps(internal::Call* call) {
176 ops.op = GRPC_OP_RECV_CLOSE_ON_SERVER;
177 ops.data.recv_close_on_server.cancelled = &cancelled_;
179 ops.reserved = nullptr;
180 interceptor_methods_.SetCall(&call_);
181 interceptor_methods_.SetReverse();
182 interceptor_methods_.SetCallOpSetInterface(this);
183 GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call->call(), &ops, 1,
184 core_cq_tag_, nullptr));
185 /* No interceptors to run here */
// Finalizer for the RECV_CLOSE_ON_SERVER batch started by FillOps().
//
// Sequence visible here:
//   1. Take mu_ with a releasable lock.
//   2. If done_intercepting_ is already set (it is set by
//      ContinueFinalizeResultAfterInterception()), this is the pass after
//      interceptors have finished and is handled separately.
//   3. Snapshot whether the call was cancelled (cancelled_ != 0) while the
//      lock is still held, and deal with a stored cancel_callback_ under the
//      lock as explained in the comment below.
//   4. With the lock released, notify the reactor through MaybeCallOnCancel()
//      when the call was cancelled and a reactor is present.
//   5. Register the POST_RECV_CLOSE hook point and run interceptors.
//
// tag / status: out-parameters of the CallOpSetInterface finalize contract.
// Per the closing comment, false is returned while interceptors still have
// to run.
188 bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) {
190 grpc_core::ReleasableMutexLock lock(&mu_);
191 if (done_intercepting_) {
192 /* We are done intercepting. */
202 // If for some reason the incoming status is false, mark that as a
204 // TODO(vjpai): does this ever happen?
209 // Decide whether to call the cancel callback before releasing the lock
210 bool call_cancel = (cancelled_ != 0);
212 // If it's a unary cancel callback, call it under the lock so that it doesn't
213 // race with ClearCancelCallback. Although we don't normally call callbacks
214 // under a lock, this is a special case since the user needs a guarantee that
215 // the callback won't issue or run after ClearCancelCallback has returned.
216 // This requirement imposes certain restrictions on the callback, documented
217 // in the API comments of SetCancelCallback.
218 if (cancel_callback_) {
222 // Release the lock since we may call a callback and interceptors now.
225 if (call_cancel && reactor_ != nullptr) {
226 reactor_->MaybeCallOnCancel();
228 /* Add interception point and run through interceptors */
229 interceptor_methods_.AddInterceptionHookPoint(
230 experimental::InterceptionHookPoints::POST_RECV_CLOSE);
231 if (interceptor_methods_.RunInterceptors()) {
232 /* No interceptors were run */
240 /* There are interceptors to be run. Return false for now */
244 // ServerContext body
// Default constructor: initialises the context through Setup(), passing the
// infinite-future timestamp on the realtime clock as the deadline.
246 ServerContext::ServerContext() { Setup(gpr_inf_future(GPR_CLOCK_REALTIME)); }
// Constructs a context from a deadline and a metadata array. The contents of
// *arr are swapped into client_metadata_, so afterwards *arr holds whatever
// client_metadata_ held before the swap.
// NOTE(review): deadline is presumably forwarded to Setup() - confirm.
248 ServerContext::ServerContext(gpr_timespec deadline, grpc_metadata_array* arr) {
250 std::swap(*client_metadata_.arr(), *arr);
// Puts the context into its initial state: no completion op, no
// notify-when-done tag registered, the supplied deadline stored, and the
// sent-initial-metadata, compression-level-set and pending-ops flags cleared.
//
// deadline: value copied into deadline_.
253 void ServerContext::Setup(gpr_timespec deadline) {
254 completion_op_ = nullptr;
255 has_notify_when_done_tag_ = false;
256 async_notify_when_done_tag_ = nullptr;
257 deadline_ = deadline;
260 sent_initial_metadata_ = false;
261 compression_level_set_ = false;
262 has_pending_ops_ = false;
// Attaches per-call data to an already constructed context: stores deadline
// and swaps the contents of *arr into client_metadata_. Because this is a
// swap and not a copy, *arr receives the previous contents of
// client_metadata_.
266 void ServerContext::BindDeadlineAndMetadata(gpr_timespec deadline,
267 grpc_metadata_array* arr) {
268 deadline_ = deadline;
269 std::swap(*client_metadata_.arr(), *arr);
// Destructor: all teardown is delegated to Clear().
272 ServerContext::~ServerContext() { Clear(); }
// Releases per-call state held by the context: resets the auth context,
// empties the initial, trailing and client metadata, and, when a completion
// op exists, drops this context's reference to it, nulls the pointer and
// clears completion_tag_. The final grpc_call_unref releases a reference on
// the call handle held in the local named call.
274 void ServerContext::Clear() {
275 auth_context_.reset();
276 initial_metadata_.clear();
277 trailing_metadata_.clear();
278 client_metadata_.Reset();
279 if (completion_op_) {
280 completion_op_->Unref();
281 completion_op_ = nullptr;
282 completion_tag_.Clear();
291 grpc_call_unref(call);
// Creates the CompletionOp for a call and starts it.
//
// Asserts that no completion op exists yet, takes a ref on the core call,
// and placement-constructs the op in memory obtained from
// grpc_call_arena_alloc. The tag is then chosen:
//   - callback supplied: completion_tag_ wraps the callback and becomes the
//     core CQ tag, and the op is its own tag;
//   - otherwise, if a notify-when-done tag was registered, that tag is used.
// Finally the op is handed to call->PerformOps().
//
// call:     call the op is bound to and allocated from.
// callback: optional completion callback; nullptr selects the non-callback
//           branches.
// reactor:  passed through to the CompletionOp constructor.
295 void ServerContext::BeginCompletionOp(internal::Call* call,
296 std::function<void(bool)> callback,
297 internal::ServerReactor* reactor) {
298 GPR_ASSERT(!completion_op_);
302 grpc_call_ref(call->call());
304 new (grpc_call_arena_alloc(call->call(), sizeof(CompletionOp)))
305 CompletionOp(call, reactor);
306 if (callback != nullptr) {
307 completion_tag_.Set(call->call(), std::move(callback), completion_op_);
308 completion_op_->set_core_cq_tag(&completion_tag_);
309 completion_op_->set_tag(completion_op_);
310 } else if (has_notify_when_done_tag_) {
311 completion_op_->set_tag(async_notify_when_done_tag_);
313 call->PerformOps(completion_op_);
// Returns completion_op_ viewed as an internal::CompletionQueueTag pointer.
316 internal::CompletionQueueTag* ServerContext::GetCompletionOpTag() {
317 return static_cast<internal::CompletionQueueTag*>(completion_op_);
// Inserts the (key, value) pair into initial_metadata_. Neither argument is
// validated here.
320 void ServerContext::AddInitialMetadata(const grpc::string& key,
321 const grpc::string& value) {
322 initial_metadata_.insert(std::make_pair(key, value));
// Inserts the (key, value) pair into trailing_metadata_. Neither argument is
// validated here.
325 void ServerContext::AddTrailingMetadata(const grpc::string& key,
326 const grpc::string& value) {
327 trailing_metadata_.insert(std::make_pair(key, value));
// Server-side cancellation request. Runs each interceptor registered on
// rpc_info_ with a CancelInterceptorBatchMethods batch, then asks core to
// cancel the call with GRPC_STATUS_CANCELLED. A non-OK result from core is
// only logged; nothing is reported back to the caller.
330 void ServerContext::TryCancel() const {
331 internal::CancelInterceptorBatchMethods cancel_methods;
333 for (size_t i = 0; i < rpc_info_->interceptors_.size(); i++) {
334 rpc_info_->RunInterceptor(&cancel_methods, i);
337 grpc_call_error err = grpc_call_cancel_with_status(
338 call_, GRPC_STATUS_CANCELLED, "Cancelled on the server side", nullptr);
339 if (err != GRPC_CALL_OK) {
340 gpr_log(GPR_ERROR, "TryCancel failed with: %d", err);
// Forwards callback to the completion op. completion_op_ is dereferenced
// without a null check, so a completion op must already exist when this is
// called.
344 void ServerContext::SetCancelCallback(std::function<void()> callback) {
345 completion_op_->SetCancelCallback(std::move(callback));
// Asks the completion op to drop its cancel callback. completion_op_ is
// dereferenced without a null check, so a completion op must already exist
// when this is called.
348 void ServerContext::ClearCancelCallback() {
349 completion_op_->ClearCancelCallback();
// Reports whether the call has been cancelled. The query used depends on
// which API flavour the context is serving: callback (completion_tag_ is
// set), async (a notify-when-done tag is registered) or sync (neither).
// The async and sync branches return false when there is no completion op;
// the callback branch dereferences completion_op_ unconditionally.
352 bool ServerContext::IsCancelled() const {
353 if (completion_tag_) {
354 // When using callback API, this result is always valid.
355 return completion_op_->CheckCancelledAsync();
356 } else if (has_notify_when_done_tag_) {
357 // When using async API, the result is only valid
358 // if the tag has already been delivered at the completion queue
359 return completion_op_ && completion_op_->CheckCancelledAsync();
361 // when using sync API, the result is always valid
362 return completion_op_ && completion_op_->CheckCancelled(cq_);
// Records algorithm as the compression algorithm of this context and adds
// its textual name to the initial metadata under
// GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY. If core cannot name the
// algorithm an error is logged; the name is asserted to be non-null before it
// is used.
366 void ServerContext::set_compression_algorithm(
367 grpc_compression_algorithm algorithm) {
368 compression_algorithm_ = algorithm;
369 const char* algorithm_name = nullptr;
370 if (!grpc_compression_algorithm_name(algorithm, &algorithm_name)) {
371 gpr_log(GPR_ERROR, "Name for compression algorithm '%d' unknown.",
375 GPR_ASSERT(algorithm_name != nullptr);
376 AddInitialMetadata(GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY, algorithm_name);
// Returns the peer of the call as a string, taken from
// grpc_call_get_peer(call_).
// NOTE(review): the C string handed back by core is presumably copied into
// the result and then released by this function - confirm.
379 grpc::string ServerContext::peer() const {
382 char* c_peer = grpc_call_get_peer(call_);
// Returns the census context that core reports for call_ through
// grpc_census_call_get_context().
389 const struct census_context* ServerContext::census_context() const {
390 return grpc_census_call_get_context(call_);
393 void ServerContext::SetLoadReportingCosts(
394 const std::vector<grpc::string>& cost_data) {
395 if (call_ == nullptr) return;
396 for (const auto& cost_datum : cost_data) {
397 AddTrailingMetadata(GRPC_LB_COST_MD_KEY, cost_datum);