73fd6a62c48b2acc21b0cad87213de085bcd00b7
[platform/upstream/grpc.git] / src / cpp / server / server_context.cc
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <grpcpp/server_context.h>
20 #include <grpcpp/support/server_callback.h>
21
22 #include <algorithm>
23 #include <mutex>
24 #include <utility>
25
26 #include <grpc/compression.h>
27 #include <grpc/grpc.h>
28 #include <grpc/load_reporting.h>
29 #include <grpc/support/alloc.h>
30 #include <grpc/support/log.h>
31 #include <grpcpp/completion_queue.h>
32 #include <grpcpp/impl/call.h>
33 #include <grpcpp/support/time.h>
34
35 #include "src/core/lib/gprpp/ref_counted.h"
36 #include "src/core/lib/surface/call.h"
37
38 namespace grpc {
39
40 // CompletionOp
41
42 class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
43  public:
44   // initial refs: one in the server context, one in the cq
45   // must ref the call before calling constructor and after deleting this
46   CompletionOp(internal::Call* call, internal::ServerReactor* reactor)
47       : call_(*call),
48         reactor_(reactor),
49         has_tag_(false),
50         tag_(nullptr),
51         core_cq_tag_(this),
52         refs_(2),
53         finalized_(false),
54         cancelled_(0),
55         done_intercepting_(false) {}
56
57   // CompletionOp isn't copyable or movable
58   CompletionOp(const CompletionOp&) = delete;
59   CompletionOp& operator=(const CompletionOp&) = delete;
60   CompletionOp(CompletionOp&&) = delete;
61   CompletionOp& operator=(CompletionOp&&) = delete;
62
63   ~CompletionOp() {
64     if (call_.server_rpc_info()) {
65       call_.server_rpc_info()->Unref();
66     }
67   }
68
69   void FillOps(internal::Call* call) override;
70
71   // This should always be arena allocated in the call, so override delete.
72   // But this class is not trivially destructible, so must actually call delete
73   // before allowing the arena to be freed
74   static void operator delete(void* ptr, std::size_t size) {
75     assert(size == sizeof(CompletionOp));
76   }
77
78   // This operator should never be called as the memory should be freed as part
79   // of the arena destruction. It only exists to provide a matching operator
80   // delete to the operator new so that some compilers will not complain (see
81   // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
82   // there are no tests catching the compiler warning.
83   static void operator delete(void*, void*) { assert(0); }
84
85   bool FinalizeResult(void** tag, bool* status) override;
86
87   bool CheckCancelled(CompletionQueue* cq) {
88     cq->TryPluck(this);
89     return CheckCancelledNoPluck();
90   }
91   bool CheckCancelledAsync() { return CheckCancelledNoPluck(); }
92
93   void set_tag(void* tag) {
94     has_tag_ = true;
95     tag_ = tag;
96   }
97
98   void SetCancelCallback(std::function<void()> callback) {
99     std::lock_guard<std::mutex> lock(mu_);
100
101     if (finalized_ && (cancelled_ != 0)) {
102       callback();
103       return;
104     }
105
106     cancel_callback_ = std::move(callback);
107   }
108
109   void ClearCancelCallback() {
110     std::lock_guard<std::mutex> g(mu_);
111     cancel_callback_ = nullptr;
112   }
113
114   void set_core_cq_tag(void* core_cq_tag) { core_cq_tag_ = core_cq_tag; }
115
116   void* core_cq_tag() override { return core_cq_tag_; }
117
118   void Unref();
119
120   // This will be called while interceptors are run if the RPC is a hijacked
121   // RPC. This should set hijacking state for each of the ops.
122   void SetHijackingState() override {
123     /* Servers don't allow hijacking */
124     GPR_CODEGEN_ASSERT(false);
125   }
126
127   /* Should be called after interceptors are done running */
128   void ContinueFillOpsAfterInterception() override {}
129
130   /* Should be called after interceptors are done running on the finalize result
131    * path */
132   void ContinueFinalizeResultAfterInterception() override {
133     done_intercepting_ = true;
134     if (!has_tag_) {
135       /* We don't have a tag to return. */
136       Unref();
137       return;
138     }
139     /* Start a dummy op so that we can return the tag */
140     GPR_CODEGEN_ASSERT(
141         GRPC_CALL_OK ==
142         grpc_call_start_batch(call_.call(), nullptr, 0, core_cq_tag_, nullptr));
143   }
144
145  private:
146   bool CheckCancelledNoPluck() {
147     std::lock_guard<std::mutex> g(mu_);
148     return finalized_ ? (cancelled_ != 0) : false;
149   }
150
151   internal::Call call_;
152   internal::ServerReactor* const reactor_;
153   bool has_tag_;
154   void* tag_;
155   void* core_cq_tag_;
156   grpc_core::RefCount refs_;
157   std::mutex mu_;
158   bool finalized_;
159   int cancelled_;  // This is an int (not bool) because it is passed to core
160   std::function<void()> cancel_callback_;
161   bool done_intercepting_;
162   internal::InterceptorBatchMethodsImpl interceptor_methods_;
163 };
164
165 void ServerContext::CompletionOp::Unref() {
166   if (refs_.Unref()) {
167     grpc_call* call = call_.call();
168     delete this;
169     grpc_call_unref(call);
170   }
171 }
172
173 void ServerContext::CompletionOp::FillOps(internal::Call* call) {
174   grpc_op ops;
175   ops.op = GRPC_OP_RECV_CLOSE_ON_SERVER;
176   ops.data.recv_close_on_server.cancelled = &cancelled_;
177   ops.flags = 0;
178   ops.reserved = nullptr;
179   interceptor_methods_.SetCall(&call_);
180   interceptor_methods_.SetReverse();
181   interceptor_methods_.SetCallOpSetInterface(this);
182   GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call->call(), &ops, 1,
183                                                    core_cq_tag_, nullptr));
184   /* No interceptors to run here */
185 }
186
187 bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) {
188   bool ret = false;
189   std::unique_lock<std::mutex> lock(mu_);
190   if (done_intercepting_) {
191     /* We are done intercepting. */
192     if (has_tag_) {
193       *tag = tag_;
194       ret = true;
195     }
196     Unref();
197     return ret;
198   }
199   finalized_ = true;
200
201   // If for some reason the incoming status is false, mark that as a
202   // cancellation.
203   // TODO(vjpai): does this ever happen?
204   if (!*status) {
205     cancelled_ = 1;
206   }
207
208   // Decide whether to call the cancel callback before releasing the lock
209   bool call_cancel = (cancelled_ != 0);
210
211   // If it's a unary cancel callback, call it under the lock so that it doesn't
212   // race with ClearCancelCallback
213   if (cancel_callback_) {
214     cancel_callback_();
215   }
216
217   // Release the lock since we are going to be calling a callback and
218   // interceptors now
219   lock.unlock();
220
221   if (call_cancel && reactor_ != nullptr) {
222     reactor_->OnCancel();
223   }
224
225   /* Add interception point and run through interceptors */
226   interceptor_methods_.AddInterceptionHookPoint(
227       experimental::InterceptionHookPoints::POST_RECV_CLOSE);
228   if (interceptor_methods_.RunInterceptors()) {
229     /* No interceptors were run */
230     if (has_tag_) {
231       *tag = tag_;
232       ret = true;
233     }
234     Unref();
235     return ret;
236   }
237   /* There are interceptors to be run. Return false for now */
238   return false;
239 }
240
241 // ServerContext body
242
243 ServerContext::ServerContext() { Setup(gpr_inf_future(GPR_CLOCK_REALTIME)); }
244
245 ServerContext::ServerContext(gpr_timespec deadline, grpc_metadata_array* arr) {
246   Setup(deadline);
247   std::swap(*client_metadata_.arr(), *arr);
248 }
249
250 void ServerContext::Setup(gpr_timespec deadline) {
251   completion_op_ = nullptr;
252   has_notify_when_done_tag_ = false;
253   async_notify_when_done_tag_ = nullptr;
254   deadline_ = deadline;
255   call_ = nullptr;
256   cq_ = nullptr;
257   sent_initial_metadata_ = false;
258   compression_level_set_ = false;
259   has_pending_ops_ = false;
260   rpc_info_ = nullptr;
261 }
262
263 void ServerContext::BindDeadlineAndMetadata(gpr_timespec deadline,
264                                             grpc_metadata_array* arr) {
265   deadline_ = deadline;
266   std::swap(*client_metadata_.arr(), *arr);
267 }
268
269 ServerContext::~ServerContext() { Clear(); }
270
271 void ServerContext::Clear() {
272   auth_context_.reset();
273   initial_metadata_.clear();
274   trailing_metadata_.clear();
275   client_metadata_.Reset();
276   if (completion_op_) {
277     completion_op_->Unref();
278     completion_op_ = nullptr;
279     completion_tag_.Clear();
280   }
281   if (rpc_info_) {
282     rpc_info_->Unref();
283     rpc_info_ = nullptr;
284   }
285   if (call_) {
286     auto* call = call_;
287     call_ = nullptr;
288     grpc_call_unref(call);
289   }
290 }
291
292 void ServerContext::BeginCompletionOp(internal::Call* call,
293                                       std::function<void(bool)> callback,
294                                       internal::ServerReactor* reactor) {
295   GPR_ASSERT(!completion_op_);
296   if (rpc_info_) {
297     rpc_info_->Ref();
298   }
299   grpc_call_ref(call->call());
300   completion_op_ =
301       new (grpc_call_arena_alloc(call->call(), sizeof(CompletionOp)))
302           CompletionOp(call, reactor);
303   if (callback != nullptr) {
304     completion_tag_.Set(call->call(), std::move(callback), completion_op_);
305     completion_op_->set_core_cq_tag(&completion_tag_);
306     completion_op_->set_tag(completion_op_);
307   } else if (has_notify_when_done_tag_) {
308     completion_op_->set_tag(async_notify_when_done_tag_);
309   }
310   call->PerformOps(completion_op_);
311 }
312
313 internal::CompletionQueueTag* ServerContext::GetCompletionOpTag() {
314   return static_cast<internal::CompletionQueueTag*>(completion_op_);
315 }
316
317 void ServerContext::AddInitialMetadata(const grpc::string& key,
318                                        const grpc::string& value) {
319   initial_metadata_.insert(std::make_pair(key, value));
320 }
321
322 void ServerContext::AddTrailingMetadata(const grpc::string& key,
323                                         const grpc::string& value) {
324   trailing_metadata_.insert(std::make_pair(key, value));
325 }
326
327 void ServerContext::TryCancel() const {
328   internal::CancelInterceptorBatchMethods cancel_methods;
329   if (rpc_info_) {
330     for (size_t i = 0; i < rpc_info_->interceptors_.size(); i++) {
331       rpc_info_->RunInterceptor(&cancel_methods, i);
332     }
333   }
334   grpc_call_error err = grpc_call_cancel_with_status(
335       call_, GRPC_STATUS_CANCELLED, "Cancelled on the server side", nullptr);
336   if (err != GRPC_CALL_OK) {
337     gpr_log(GPR_ERROR, "TryCancel failed with: %d", err);
338   }
339 }
340
341 void ServerContext::SetCancelCallback(std::function<void()> callback) {
342   completion_op_->SetCancelCallback(std::move(callback));
343 }
344
345 void ServerContext::ClearCancelCallback() {
346   completion_op_->ClearCancelCallback();
347 }
348
349 bool ServerContext::IsCancelled() const {
350   if (completion_tag_) {
351     // When using callback API, this result is always valid.
352     return completion_op_->CheckCancelledAsync();
353   } else if (has_notify_when_done_tag_) {
354     // When using async API, the result is only valid
355     // if the tag has already been delivered at the completion queue
356     return completion_op_ && completion_op_->CheckCancelledAsync();
357   } else {
358     // when using sync API, the result is always valid
359     return completion_op_ && completion_op_->CheckCancelled(cq_);
360   }
361 }
362
363 void ServerContext::set_compression_algorithm(
364     grpc_compression_algorithm algorithm) {
365   compression_algorithm_ = algorithm;
366   const char* algorithm_name = nullptr;
367   if (!grpc_compression_algorithm_name(algorithm, &algorithm_name)) {
368     gpr_log(GPR_ERROR, "Name for compression algorithm '%d' unknown.",
369             algorithm);
370     abort();
371   }
372   GPR_ASSERT(algorithm_name != nullptr);
373   AddInitialMetadata(GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY, algorithm_name);
374 }
375
376 grpc::string ServerContext::peer() const {
377   grpc::string peer;
378   if (call_) {
379     char* c_peer = grpc_call_get_peer(call_);
380     peer = c_peer;
381     gpr_free(c_peer);
382   }
383   return peer;
384 }
385
386 const struct census_context* ServerContext::census_context() const {
387   return grpc_census_call_get_context(call_);
388 }
389
390 void ServerContext::SetLoadReportingCosts(
391     const std::vector<grpc::string>& cost_data) {
392   if (call_ == nullptr) return;
393   for (const auto& cost_datum : cost_data) {
394     AddTrailingMetadata(GRPC_LB_COST_MD_KEY, cost_datum);
395   }
396 }
397
398 }  // namespace grpc