Imported Upstream version 1.21.0
[platform/upstream/grpc.git] / src / cpp / server / server_context.cc
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <grpcpp/server_context.h>
20 #include <grpcpp/support/server_callback.h>
21
22 #include <algorithm>
23 #include <mutex>
24 #include <utility>
25
26 #include <grpc/compression.h>
27 #include <grpc/grpc.h>
28 #include <grpc/load_reporting.h>
29 #include <grpc/support/alloc.h>
30 #include <grpc/support/log.h>
31 #include <grpcpp/completion_queue.h>
32 #include <grpcpp/impl/call.h>
33 #include <grpcpp/support/time.h>
34
35 #include "src/core/lib/gprpp/ref_counted.h"
36 #include "src/core/lib/gprpp/sync.h"
37 #include "src/core/lib/surface/call.h"
38
39 namespace grpc {
40
41 // CompletionOp
42
43 class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
44  public:
45   // initial refs: one in the server context, one in the cq
46   // must ref the call before calling constructor and after deleting this
47   CompletionOp(internal::Call* call, internal::ServerReactor* reactor)
48       : call_(*call),
49         reactor_(reactor),
50         has_tag_(false),
51         tag_(nullptr),
52         core_cq_tag_(this),
53         refs_(2),
54         finalized_(false),
55         cancelled_(0),
56         done_intercepting_(false) {}
57
58   // CompletionOp isn't copyable or movable
59   CompletionOp(const CompletionOp&) = delete;
60   CompletionOp& operator=(const CompletionOp&) = delete;
61   CompletionOp(CompletionOp&&) = delete;
62   CompletionOp& operator=(CompletionOp&&) = delete;
63
64   ~CompletionOp() {
65     if (call_.server_rpc_info()) {
66       call_.server_rpc_info()->Unref();
67     }
68   }
69
70   void FillOps(internal::Call* call) override;
71
72   // This should always be arena allocated in the call, so override delete.
73   // But this class is not trivially destructible, so must actually call delete
74   // before allowing the arena to be freed
75   static void operator delete(void* ptr, std::size_t size) {
76     assert(size == sizeof(CompletionOp));
77   }
78
79   // This operator should never be called as the memory should be freed as part
80   // of the arena destruction. It only exists to provide a matching operator
81   // delete to the operator new so that some compilers will not complain (see
82   // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
83   // there are no tests catching the compiler warning.
84   static void operator delete(void*, void*) { assert(0); }
85
86   bool FinalizeResult(void** tag, bool* status) override;
87
88   bool CheckCancelled(CompletionQueue* cq) {
89     cq->TryPluck(this);
90     return CheckCancelledNoPluck();
91   }
92   bool CheckCancelledAsync() { return CheckCancelledNoPluck(); }
93
94   void set_tag(void* tag) {
95     has_tag_ = true;
96     tag_ = tag;
97   }
98
99   void SetCancelCallback(std::function<void()> callback) {
100     grpc_core::MutexLock lock(&mu_);
101
102     if (finalized_ && (cancelled_ != 0)) {
103       callback();
104       return;
105     }
106
107     cancel_callback_ = std::move(callback);
108   }
109
110   void ClearCancelCallback() {
111     grpc_core::MutexLock g(&mu_);
112     cancel_callback_ = nullptr;
113   }
114
115   void set_core_cq_tag(void* core_cq_tag) { core_cq_tag_ = core_cq_tag; }
116
117   void* core_cq_tag() override { return core_cq_tag_; }
118
119   void Unref();
120
121   // This will be called while interceptors are run if the RPC is a hijacked
122   // RPC. This should set hijacking state for each of the ops.
123   void SetHijackingState() override {
124     /* Servers don't allow hijacking */
125     GPR_CODEGEN_ASSERT(false);
126   }
127
128   /* Should be called after interceptors are done running */
129   void ContinueFillOpsAfterInterception() override {}
130
131   /* Should be called after interceptors are done running on the finalize result
132    * path */
133   void ContinueFinalizeResultAfterInterception() override {
134     done_intercepting_ = true;
135     if (!has_tag_) {
136       /* We don't have a tag to return. */
137       Unref();
138       return;
139     }
140     /* Start a dummy op so that we can return the tag */
141     GPR_CODEGEN_ASSERT(
142         GRPC_CALL_OK ==
143         grpc_call_start_batch(call_.call(), nullptr, 0, core_cq_tag_, nullptr));
144   }
145
146  private:
147   bool CheckCancelledNoPluck() {
148     grpc_core::MutexLock lock(&mu_);
149     return finalized_ ? (cancelled_ != 0) : false;
150   }
151
152   internal::Call call_;
153   internal::ServerReactor* const reactor_;
154   bool has_tag_;
155   void* tag_;
156   void* core_cq_tag_;
157   grpc_core::RefCount refs_;
158   grpc_core::Mutex mu_;
159   bool finalized_;
160   int cancelled_;  // This is an int (not bool) because it is passed to core
161   std::function<void()> cancel_callback_;
162   bool done_intercepting_;
163   internal::InterceptorBatchMethodsImpl interceptor_methods_;
164 };
165
166 void ServerContext::CompletionOp::Unref() {
167   if (refs_.Unref()) {
168     grpc_call* call = call_.call();
169     delete this;
170     grpc_call_unref(call);
171   }
172 }
173
174 void ServerContext::CompletionOp::FillOps(internal::Call* call) {
175   grpc_op ops;
176   ops.op = GRPC_OP_RECV_CLOSE_ON_SERVER;
177   ops.data.recv_close_on_server.cancelled = &cancelled_;
178   ops.flags = 0;
179   ops.reserved = nullptr;
180   interceptor_methods_.SetCall(&call_);
181   interceptor_methods_.SetReverse();
182   interceptor_methods_.SetCallOpSetInterface(this);
183   GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call->call(), &ops, 1,
184                                                    core_cq_tag_, nullptr));
185   /* No interceptors to run here */
186 }
187
188 bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) {
189   bool ret = false;
190   grpc_core::ReleasableMutexLock lock(&mu_);
191   if (done_intercepting_) {
192     /* We are done intercepting. */
193     if (has_tag_) {
194       *tag = tag_;
195       ret = true;
196     }
197     Unref();
198     return ret;
199   }
200   finalized_ = true;
201
202   // If for some reason the incoming status is false, mark that as a
203   // cancellation.
204   // TODO(vjpai): does this ever happen?
205   if (!*status) {
206     cancelled_ = 1;
207   }
208
209   // Decide whether to call the cancel callback before releasing the lock
210   bool call_cancel = (cancelled_ != 0);
211
212   // If it's a unary cancel callback, call it under the lock so that it doesn't
213   // race with ClearCancelCallback. Although we don't normally call callbacks
214   // under a lock, this is a special case since the user needs a guarantee that
215   // the callback won't issue or run after ClearCancelCallback has returned.
216   // This requirement imposes certain restrictions on the callback, documented
217   // in the API comments of SetCancelCallback.
218   if (cancel_callback_) {
219     cancel_callback_();
220   }
221
222   // Release the lock since we may call a callback and interceptors now.
223   lock.Unlock();
224
225   if (call_cancel && reactor_ != nullptr) {
226     reactor_->MaybeCallOnCancel();
227   }
228   /* Add interception point and run through interceptors */
229   interceptor_methods_.AddInterceptionHookPoint(
230       experimental::InterceptionHookPoints::POST_RECV_CLOSE);
231   if (interceptor_methods_.RunInterceptors()) {
232     /* No interceptors were run */
233     if (has_tag_) {
234       *tag = tag_;
235       ret = true;
236     }
237     Unref();
238     return ret;
239   }
240   /* There are interceptors to be run. Return false for now */
241   return false;
242 }
243
244 // ServerContext body
245
246 ServerContext::ServerContext() { Setup(gpr_inf_future(GPR_CLOCK_REALTIME)); }
247
248 ServerContext::ServerContext(gpr_timespec deadline, grpc_metadata_array* arr) {
249   Setup(deadline);
250   std::swap(*client_metadata_.arr(), *arr);
251 }
252
253 void ServerContext::Setup(gpr_timespec deadline) {
254   completion_op_ = nullptr;
255   has_notify_when_done_tag_ = false;
256   async_notify_when_done_tag_ = nullptr;
257   deadline_ = deadline;
258   call_ = nullptr;
259   cq_ = nullptr;
260   sent_initial_metadata_ = false;
261   compression_level_set_ = false;
262   has_pending_ops_ = false;
263   rpc_info_ = nullptr;
264 }
265
266 void ServerContext::BindDeadlineAndMetadata(gpr_timespec deadline,
267                                             grpc_metadata_array* arr) {
268   deadline_ = deadline;
269   std::swap(*client_metadata_.arr(), *arr);
270 }
271
272 ServerContext::~ServerContext() { Clear(); }
273
274 void ServerContext::Clear() {
275   auth_context_.reset();
276   initial_metadata_.clear();
277   trailing_metadata_.clear();
278   client_metadata_.Reset();
279   if (completion_op_) {
280     completion_op_->Unref();
281     completion_op_ = nullptr;
282     completion_tag_.Clear();
283   }
284   if (rpc_info_) {
285     rpc_info_->Unref();
286     rpc_info_ = nullptr;
287   }
288   if (call_) {
289     auto* call = call_;
290     call_ = nullptr;
291     grpc_call_unref(call);
292   }
293 }
294
295 void ServerContext::BeginCompletionOp(internal::Call* call,
296                                       std::function<void(bool)> callback,
297                                       internal::ServerReactor* reactor) {
298   GPR_ASSERT(!completion_op_);
299   if (rpc_info_) {
300     rpc_info_->Ref();
301   }
302   grpc_call_ref(call->call());
303   completion_op_ =
304       new (grpc_call_arena_alloc(call->call(), sizeof(CompletionOp)))
305           CompletionOp(call, reactor);
306   if (callback != nullptr) {
307     completion_tag_.Set(call->call(), std::move(callback), completion_op_);
308     completion_op_->set_core_cq_tag(&completion_tag_);
309     completion_op_->set_tag(completion_op_);
310   } else if (has_notify_when_done_tag_) {
311     completion_op_->set_tag(async_notify_when_done_tag_);
312   }
313   call->PerformOps(completion_op_);
314 }
315
316 internal::CompletionQueueTag* ServerContext::GetCompletionOpTag() {
317   return static_cast<internal::CompletionQueueTag*>(completion_op_);
318 }
319
320 void ServerContext::AddInitialMetadata(const grpc::string& key,
321                                        const grpc::string& value) {
322   initial_metadata_.insert(std::make_pair(key, value));
323 }
324
325 void ServerContext::AddTrailingMetadata(const grpc::string& key,
326                                         const grpc::string& value) {
327   trailing_metadata_.insert(std::make_pair(key, value));
328 }
329
330 void ServerContext::TryCancel() const {
331   internal::CancelInterceptorBatchMethods cancel_methods;
332   if (rpc_info_) {
333     for (size_t i = 0; i < rpc_info_->interceptors_.size(); i++) {
334       rpc_info_->RunInterceptor(&cancel_methods, i);
335     }
336   }
337   grpc_call_error err = grpc_call_cancel_with_status(
338       call_, GRPC_STATUS_CANCELLED, "Cancelled on the server side", nullptr);
339   if (err != GRPC_CALL_OK) {
340     gpr_log(GPR_ERROR, "TryCancel failed with: %d", err);
341   }
342 }
343
344 void ServerContext::SetCancelCallback(std::function<void()> callback) {
345   completion_op_->SetCancelCallback(std::move(callback));
346 }
347
348 void ServerContext::ClearCancelCallback() {
349   completion_op_->ClearCancelCallback();
350 }
351
352 bool ServerContext::IsCancelled() const {
353   if (completion_tag_) {
354     // When using callback API, this result is always valid.
355     return completion_op_->CheckCancelledAsync();
356   } else if (has_notify_when_done_tag_) {
357     // When using async API, the result is only valid
358     // if the tag has already been delivered at the completion queue
359     return completion_op_ && completion_op_->CheckCancelledAsync();
360   } else {
361     // when using sync API, the result is always valid
362     return completion_op_ && completion_op_->CheckCancelled(cq_);
363   }
364 }
365
366 void ServerContext::set_compression_algorithm(
367     grpc_compression_algorithm algorithm) {
368   compression_algorithm_ = algorithm;
369   const char* algorithm_name = nullptr;
370   if (!grpc_compression_algorithm_name(algorithm, &algorithm_name)) {
371     gpr_log(GPR_ERROR, "Name for compression algorithm '%d' unknown.",
372             algorithm);
373     abort();
374   }
375   GPR_ASSERT(algorithm_name != nullptr);
376   AddInitialMetadata(GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY, algorithm_name);
377 }
378
379 grpc::string ServerContext::peer() const {
380   grpc::string peer;
381   if (call_) {
382     char* c_peer = grpc_call_get_peer(call_);
383     peer = c_peer;
384     gpr_free(c_peer);
385   }
386   return peer;
387 }
388
389 const struct census_context* ServerContext::census_context() const {
390   return grpc_census_call_get_context(call_);
391 }
392
393 void ServerContext::SetLoadReportingCosts(
394     const std::vector<grpc::string>& cost_data) {
395   if (call_ == nullptr) return;
396   for (const auto& cost_datum : cost_data) {
397     AddTrailingMetadata(GRPC_LB_COST_MD_KEY, cost_datum);
398   }
399 }
400
401 }  // namespace grpc