Imported Upstream version 1.28.0
[platform/upstream/grpc.git] / src / cpp / server / server_context.cc
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <grpcpp/impl/codegen/server_context_impl.h>
20
21 #include <algorithm>
22 #include <utility>
23
24 #include <grpc/compression.h>
25 #include <grpc/grpc.h>
26 #include <grpc/load_reporting.h>
27 #include <grpc/support/alloc.h>
28 #include <grpc/support/log.h>
29 #include <grpcpp/impl/call.h>
30 #include <grpcpp/impl/codegen/completion_queue_impl.h>
31 #include <grpcpp/support/server_callback.h>
32 #include <grpcpp/support/time.h>
33
34 #include "src/core/lib/gprpp/ref_counted.h"
35 #include "src/core/lib/gprpp/sync.h"
36 #include "src/core/lib/surface/call.h"
37
38 namespace grpc_impl {
39
40 // CompletionOp
41
42 class ServerContextBase::CompletionOp final
43     : public ::grpc::internal::CallOpSetInterface {
44  public:
45   // initial refs: one in the server context, one in the cq
46   // must ref the call before calling constructor and after deleting this
47   CompletionOp(::grpc::internal::Call* call,
48                ::grpc_impl::internal::ServerCallbackCall* callback_controller)
49       : call_(*call),
50         callback_controller_(callback_controller),
51         has_tag_(false),
52         tag_(nullptr),
53         core_cq_tag_(this),
54         refs_(2),
55         finalized_(false),
56         cancelled_(0),
57         done_intercepting_(false) {}
58
59   // CompletionOp isn't copyable or movable
60   CompletionOp(const CompletionOp&) = delete;
61   CompletionOp& operator=(const CompletionOp&) = delete;
62   CompletionOp(CompletionOp&&) = delete;
63   CompletionOp& operator=(CompletionOp&&) = delete;
64
65   ~CompletionOp() {
66     if (call_.server_rpc_info()) {
67       call_.server_rpc_info()->Unref();
68     }
69   }
70
71   void FillOps(::grpc::internal::Call* call) override;
72
73   // This should always be arena allocated in the call, so override delete.
74   // But this class is not trivially destructible, so must actually call delete
75   // before allowing the arena to be freed
76   static void operator delete(void* /*ptr*/, std::size_t size) {
77     // Use size to avoid unused-parameter warning since assert seems to be
78     // compiled out and treated as unused in some gcc optimized versions.
79     (void)size;
80     assert(size == sizeof(CompletionOp));
81   }
82
83   // This operator should never be called as the memory should be freed as part
84   // of the arena destruction. It only exists to provide a matching operator
85   // delete to the operator new so that some compilers will not complain (see
86   // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
87   // there are no tests catching the compiler warning.
88   static void operator delete(void*, void*) { assert(0); }
89
90   bool FinalizeResult(void** tag, bool* status) override;
91
92   bool CheckCancelled(CompletionQueue* cq) {
93     cq->TryPluck(this);
94     return CheckCancelledNoPluck();
95   }
96   bool CheckCancelledAsync() { return CheckCancelledNoPluck(); }
97
98   void set_tag(void* tag) {
99     has_tag_ = true;
100     tag_ = tag;
101   }
102
103   void set_core_cq_tag(void* core_cq_tag) { core_cq_tag_ = core_cq_tag; }
104
105   void* core_cq_tag() override { return core_cq_tag_; }
106
107   void Unref();
108
109   // This will be called while interceptors are run if the RPC is a hijacked
110   // RPC. This should set hijacking state for each of the ops.
111   void SetHijackingState() override {
112     /* Servers don't allow hijacking */
113     GPR_ASSERT(false);
114   }
115
116   /* Should be called after interceptors are done running */
117   void ContinueFillOpsAfterInterception() override {}
118
119   /* Should be called after interceptors are done running on the finalize result
120    * path */
121   void ContinueFinalizeResultAfterInterception() override {
122     done_intercepting_ = true;
123     if (!has_tag_) {
124       /* We don't have a tag to return. */
125       Unref();
126       return;
127     }
128     /* Start a dummy op so that we can return the tag */
129     GPR_ASSERT(grpc_call_start_batch(call_.call(), nullptr, 0, core_cq_tag_,
130                                      nullptr) == GRPC_CALL_OK);
131   }
132
133  private:
134   bool CheckCancelledNoPluck() {
135     grpc_core::MutexLock lock(&mu_);
136     return finalized_ ? (cancelled_ != 0) : false;
137   }
138
139   ::grpc::internal::Call call_;
140   ::grpc_impl::internal::ServerCallbackCall* const callback_controller_;
141   bool has_tag_;
142   void* tag_;
143   void* core_cq_tag_;
144   grpc_core::RefCount refs_;
145   grpc_core::Mutex mu_;
146   bool finalized_;
147   int cancelled_;  // This is an int (not bool) because it is passed to core
148   bool done_intercepting_;
149   ::grpc::internal::InterceptorBatchMethodsImpl interceptor_methods_;
150 };
151
152 void ServerContextBase::CompletionOp::Unref() {
153   if (refs_.Unref()) {
154     grpc_call* call = call_.call();
155     delete this;
156     grpc_call_unref(call);
157   }
158 }
159
160 void ServerContextBase::CompletionOp::FillOps(::grpc::internal::Call* call) {
161   grpc_op ops;
162   ops.op = GRPC_OP_RECV_CLOSE_ON_SERVER;
163   ops.data.recv_close_on_server.cancelled = &cancelled_;
164   ops.flags = 0;
165   ops.reserved = nullptr;
166   interceptor_methods_.SetCall(&call_);
167   interceptor_methods_.SetReverse();
168   interceptor_methods_.SetCallOpSetInterface(this);
169   // The following call_start_batch is internally-generated so no need for an
170   // explanatory log on failure.
171   GPR_ASSERT(grpc_call_start_batch(call->call(), &ops, 1, core_cq_tag_,
172                                    nullptr) == GRPC_CALL_OK);
173   /* No interceptors to run here */
174 }
175
176 bool ServerContextBase::CompletionOp::FinalizeResult(void** tag, bool* status) {
177   bool ret = false;
178   grpc_core::ReleasableMutexLock lock(&mu_);
179   if (done_intercepting_) {
180     /* We are done intercepting. */
181     if (has_tag_) {
182       *tag = tag_;
183       ret = true;
184     }
185     Unref();
186     return ret;
187   }
188   finalized_ = true;
189
190   // If for some reason the incoming status is false, mark that as a
191   // cancellation.
192   // TODO(vjpai): does this ever happen?
193   if (!*status) {
194     cancelled_ = 1;
195   }
196
197   // Decide whether to call the cancel callback before releasing the lock
198   bool call_cancel = (cancelled_ != 0);
199
200   // Release the lock since we may call a callback and interceptors now.
201   lock.Unlock();
202
203   if (call_cancel && callback_controller_ != nullptr) {
204     callback_controller_->MaybeCallOnCancel();
205   }
206   /* Add interception point and run through interceptors */
207   interceptor_methods_.AddInterceptionHookPoint(
208       ::grpc::experimental::InterceptionHookPoints::POST_RECV_CLOSE);
209   if (interceptor_methods_.RunInterceptors()) {
210     /* No interceptors were run */
211     if (has_tag_) {
212       *tag = tag_;
213       ret = true;
214     }
215     Unref();
216     return ret;
217   }
218   /* There are interceptors to be run. Return false for now */
219   return false;
220 }
221
222 // ServerContextBase body
223
224 ServerContextBase::ServerContextBase() {
225   Setup(gpr_inf_future(GPR_CLOCK_REALTIME));
226 }
227
228 ServerContextBase::ServerContextBase(gpr_timespec deadline,
229                                      grpc_metadata_array* arr) {
230   Setup(deadline);
231   std::swap(*client_metadata_.arr(), *arr);
232 }
233
234 void ServerContextBase::Setup(gpr_timespec deadline) {
235   completion_op_ = nullptr;
236   has_notify_when_done_tag_ = false;
237   async_notify_when_done_tag_ = nullptr;
238   deadline_ = deadline;
239   call_ = nullptr;
240   cq_ = nullptr;
241   sent_initial_metadata_ = false;
242   compression_level_set_ = false;
243   has_pending_ops_ = false;
244   rpc_info_ = nullptr;
245 }
246
247 void ServerContextBase::BindDeadlineAndMetadata(gpr_timespec deadline,
248                                                 grpc_metadata_array* arr) {
249   deadline_ = deadline;
250   std::swap(*client_metadata_.arr(), *arr);
251 }
252
253 ServerContextBase::~ServerContextBase() { Clear(); }
254
255 void ServerContextBase::Clear() {
256   auth_context_.reset();
257   initial_metadata_.clear();
258   trailing_metadata_.clear();
259   client_metadata_.Reset();
260   if (completion_op_) {
261     completion_op_->Unref();
262     completion_op_ = nullptr;
263     completion_tag_.Clear();
264   }
265   if (rpc_info_) {
266     rpc_info_->Unref();
267     rpc_info_ = nullptr;
268   }
269   if (call_) {
270     auto* call = call_;
271     call_ = nullptr;
272     grpc_call_unref(call);
273   }
274   if (default_reactor_used_.load(std::memory_order_relaxed)) {
275     reinterpret_cast<Reactor*>(&default_reactor_)->~Reactor();
276     default_reactor_used_.store(false, std::memory_order_relaxed);
277   }
278   test_unary_.reset();
279 }
280
281 void ServerContextBase::BeginCompletionOp(
282     ::grpc::internal::Call* call, std::function<void(bool)> callback,
283     ::grpc_impl::internal::ServerCallbackCall* callback_controller) {
284   GPR_ASSERT(!completion_op_);
285   if (rpc_info_) {
286     rpc_info_->Ref();
287   }
288   grpc_call_ref(call->call());
289   completion_op_ =
290       new (grpc_call_arena_alloc(call->call(), sizeof(CompletionOp)))
291           CompletionOp(call, callback_controller);
292   if (callback_controller != nullptr) {
293     completion_tag_.Set(call->call(), std::move(callback), completion_op_,
294                         true);
295     completion_op_->set_core_cq_tag(&completion_tag_);
296     completion_op_->set_tag(completion_op_);
297   } else if (has_notify_when_done_tag_) {
298     completion_op_->set_tag(async_notify_when_done_tag_);
299   }
300   call->PerformOps(completion_op_);
301 }
302
303 ::grpc::internal::CompletionQueueTag* ServerContextBase::GetCompletionOpTag() {
304   return static_cast<::grpc::internal::CompletionQueueTag*>(completion_op_);
305 }
306
307 void ServerContextBase::AddInitialMetadata(const grpc::string& key,
308                                            const grpc::string& value) {
309   initial_metadata_.insert(std::make_pair(key, value));
310 }
311
312 void ServerContextBase::AddTrailingMetadata(const grpc::string& key,
313                                             const grpc::string& value) {
314   trailing_metadata_.insert(std::make_pair(key, value));
315 }
316
317 void ServerContextBase::TryCancel() const {
318   ::grpc::internal::CancelInterceptorBatchMethods cancel_methods;
319   if (rpc_info_) {
320     for (size_t i = 0; i < rpc_info_->interceptors_.size(); i++) {
321       rpc_info_->RunInterceptor(&cancel_methods, i);
322     }
323   }
324   grpc_call_error err = grpc_call_cancel_with_status(
325       call_, GRPC_STATUS_CANCELLED, "Cancelled on the server side", nullptr);
326   if (err != GRPC_CALL_OK) {
327     gpr_log(GPR_ERROR, "TryCancel failed with: %d", err);
328   }
329 }
330
331 bool ServerContextBase::IsCancelled() const {
332   if (completion_tag_) {
333     // When using callback API, this result is always valid.
334     return completion_op_->CheckCancelledAsync();
335   } else if (has_notify_when_done_tag_) {
336     // When using async API, the result is only valid
337     // if the tag has already been delivered at the completion queue
338     return completion_op_ && completion_op_->CheckCancelledAsync();
339   } else {
340     // when using sync API, the result is always valid
341     return completion_op_ && completion_op_->CheckCancelled(cq_);
342   }
343 }
344
345 void ServerContextBase::set_compression_algorithm(
346     grpc_compression_algorithm algorithm) {
347   compression_algorithm_ = algorithm;
348   const char* algorithm_name = nullptr;
349   if (!grpc_compression_algorithm_name(algorithm, &algorithm_name)) {
350     gpr_log(GPR_ERROR, "Name for compression algorithm '%d' unknown.",
351             algorithm);
352     abort();
353   }
354   GPR_ASSERT(algorithm_name != nullptr);
355   AddInitialMetadata(GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY, algorithm_name);
356 }
357
358 grpc::string ServerContextBase::peer() const {
359   grpc::string peer;
360   if (call_) {
361     char* c_peer = grpc_call_get_peer(call_);
362     peer = c_peer;
363     gpr_free(c_peer);
364   }
365   return peer;
366 }
367
368 const struct census_context* ServerContextBase::census_context() const {
369   return grpc_census_call_get_context(call_);
370 }
371
372 void ServerContextBase::SetLoadReportingCosts(
373     const std::vector<grpc::string>& cost_data) {
374   if (call_ == nullptr) return;
375   for (const auto& cost_datum : cost_data) {
376     AddTrailingMetadata(GRPC_LB_COST_MD_KEY, cost_datum);
377   }
378 }
379
380 }  // namespace grpc_impl