/*
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <grpcpp/server.h>

#include <cstdlib>
#include <sstream>
#include <type_traits>
#include <utility>

#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpcpp/completion_queue.h>
#include <grpcpp/generic/async_generic_service.h>
#include <grpcpp/impl/codegen/async_unary_call.h>
#include <grpcpp/impl/codegen/call.h>
#include <grpcpp/impl/codegen/completion_queue_tag.h>
#include <grpcpp/impl/codegen/server_interceptor.h>
#include <grpcpp/impl/grpc_library.h>
#include <grpcpp/impl/method_handler_impl.h>
#include <grpcpp/impl/rpc_service_method.h>
#include <grpcpp/impl/server_initializer.h>
#include <grpcpp/impl/service_type.h>
#include <grpcpp/security/server_credentials.h>
#include <grpcpp/server_context.h>
#include <grpcpp/support/time.h>

#include "src/core/ext/transport/inproc/inproc_transport.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/completion_queue.h"
#include "src/cpp/client/create_channel_internal.h"
#include "src/cpp/server/health/default_health_check_service.h"
#include "src/cpp/thread_manager/thread_manager.h"

namespace grpc {
namespace {

// The default value for the maximum number of threads that can be created in
// the sync server. This value of INT_MAX is chosen to match the default
// behavior if no ResourceQuota is set. To modify the maximum number of threads
// in a sync server, pass a custom ResourceQuota object (with the desired
// maximum number of threads set) to the server builder.
#define DEFAULT_MAX_SYNC_SERVER_THREADS INT_MAX

// How many callback requests of each method should be pre-registered at start
#define DEFAULT_CALLBACK_REQS_PER_METHOD 512

// The (soft) limit on the number of outstanding requests in the server
#define SOFT_MAXIMUM_CALLBACK_REQS_OUTSTANDING 30000

// If the number of unmatched requests for a method drops below this amount,
// try to allocate extra ones, unless doing so pushes the total number of
// callback requests above the soft maximum
#define SOFT_MINIMUM_SPARE_CALLBACK_REQS_PER_METHOD 128
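
// For example, with the defaults above: each callback method starts with 512
// pre-registered requests, and whenever the unmatched count for a method drops
// below 128, one replacement request is allocated per completed match, as long
// as the total outstanding across all methods stays under the soft maximum of
// 30000 (see CallbackRequest::CallbackCallTag::Run below).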

class DefaultGlobalCallbacks final : public Server::GlobalCallbacks {
 public:
  ~DefaultGlobalCallbacks() override {}
  void PreSynchronousRequest(ServerContext* context) override {}
  void PostSynchronousRequest(ServerContext* context) override {}
};

std::shared_ptr<Server::GlobalCallbacks> g_callbacks = nullptr;
gpr_once g_once_init_callbacks = GPR_ONCE_INIT;

void InitGlobalCallbacks() {
  if (!g_callbacks) {
    g_callbacks.reset(new DefaultGlobalCallbacks());
  }
}
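
// Applications can replace these no-op defaults via Server::SetGlobalCallbacks
// (defined later in this file); given the assertion there, it must be called
// before any Server is constructed. A minimal sketch, where MyCallbacks is a
// hypothetical subclass of Server::GlobalCallbacks:
//
//   grpc::Server::SetGlobalCallbacks(new MyCallbacks());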

class ShutdownTag : public internal::CompletionQueueTag {
 public:
  bool FinalizeResult(void** tag, bool* status) override { return false; }
};

class DummyTag : public internal::CompletionQueueTag {
 public:
  bool FinalizeResult(void** tag, bool* status) override { return true; }
};
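
// ShutdownTag is the tag handed to grpc_server_shutdown_and_notify in
// Server::ShutdownInternal below; returning false from FinalizeResult keeps it
// from being surfaced to a Next/AsyncNext caller. DummyTag is plucked by
// SyncRequest::CallData to confirm that its per-call completion queue has
// fully shut down.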

class UnimplementedAsyncRequestContext {
 protected:
  UnimplementedAsyncRequestContext() : generic_stream_(&server_context_) {}

  GenericServerContext server_context_;
  GenericServerAsyncReaderWriter generic_stream_;
};

}  // namespace

ServerInterface::BaseAsyncRequest::BaseAsyncRequest(
    ServerInterface* server, ServerContext* context,
    internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
    ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize)
    : server_(server),
      context_(context),
      stream_(stream),
      call_cq_(call_cq),
      notification_cq_(notification_cq),
      tag_(tag),
      delete_on_finalize_(delete_on_finalize),
      call_(nullptr),
      done_intercepting_(false) {
  /* Set up interception state partially for the receive ops. call_wrapper_ is
   * not filled at this point, but it will be filled before the interceptors are
   * run. */
  interceptor_methods_.SetCall(&call_wrapper_);
  interceptor_methods_.SetReverse();
  call_cq_->RegisterAvalanching();  // This op will trigger more ops
}

ServerInterface::BaseAsyncRequest::~BaseAsyncRequest() {
  call_cq_->CompleteAvalanching();
}

bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag,
                                                       bool* status) {
  if (done_intercepting_) {
    *tag = tag_;
    if (delete_on_finalize_) {
      delete this;
    }
    return true;
  }
  context_->set_call(call_);
  context_->cq_ = call_cq_;
  if (call_wrapper_.call() == nullptr) {
    // Fill it since it is empty.
    call_wrapper_ = internal::Call(
        call_, server_, call_cq_, server_->max_receive_message_size(), nullptr);
  }

  // Only the pointers inside the call are copied here.
  stream_->BindCall(&call_wrapper_);

  if (*status && call_ && call_wrapper_.server_rpc_info()) {
    done_intercepting_ = true;
    // Set interception point for RECV INITIAL METADATA
    interceptor_methods_.AddInterceptionHookPoint(
        experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA);
    interceptor_methods_.SetRecvInitialMetadata(&context_->client_metadata_);
    if (interceptor_methods_.RunInterceptors(
            [this]() { ContinueFinalizeResultAfterInterception(); })) {
      // There were no interceptors to run. Continue.
    } else {
      // There were interceptors to be run, so
      // ContinueFinalizeResultAfterInterception will be run when the
      // interceptors are done.
      return false;
    }
  }
  if (*status && call_) {
    context_->BeginCompletionOp(&call_wrapper_, nullptr, nullptr);
  }
  *tag = tag_;
  if (delete_on_finalize_) {
    delete this;
  }
  return true;
}

void ServerInterface::BaseAsyncRequest::
    ContinueFinalizeResultAfterInterception() {
  context_->BeginCompletionOp(&call_wrapper_, nullptr, nullptr);
  // Queue a tag that will be returned immediately
  grpc_core::ExecCtx exec_ctx;
  grpc_cq_begin_op(notification_cq_->cq(), this);
  grpc_cq_end_op(
      notification_cq_->cq(), this, GRPC_ERROR_NONE,
      [](void* arg, grpc_cq_completion* completion) { delete completion; },
      nullptr, new grpc_cq_completion());
}
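
// Note on the hand-queued completion above: when interceptors run, the first
// pass through FinalizeResult returns false, so the tag is not surfaced yet.
// Once the interceptors finish, the same BaseAsyncRequest is re-queued on the
// notification CQ; the second pass then takes the done_intercepting_ fast path
// and finally delivers the application's tag.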

ServerInterface::RegisteredAsyncRequest::RegisteredAsyncRequest(
    ServerInterface* server, ServerContext* context,
    internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
    ServerCompletionQueue* notification_cq, void* tag, const char* name,
    internal::RpcMethod::RpcType type)
    : BaseAsyncRequest(server, context, stream, call_cq, notification_cq, tag,
                       true),
      name_(name),
      type_(type) {}

void ServerInterface::RegisteredAsyncRequest::IssueRequest(
    void* registered_method, grpc_byte_buffer** payload,
    ServerCompletionQueue* notification_cq) {
  GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_registered_call(
                                 server_->server(), registered_method, &call_,
                                 &context_->deadline_,
                                 context_->client_metadata_.arr(), payload,
                                 call_cq_->cq(), notification_cq->cq(), this));
}

ServerInterface::GenericAsyncRequest::GenericAsyncRequest(
    ServerInterface* server, GenericServerContext* context,
    internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
    ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize)
    : BaseAsyncRequest(server, context, stream, call_cq, notification_cq, tag,
                       delete_on_finalize) {
  grpc_call_details_init(&call_details_);
  GPR_ASSERT(notification_cq);
  GPR_ASSERT(call_cq);
  GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(
                                 server->server(), &call_, &call_details_,
                                 context->client_metadata_.arr(), call_cq->cq(),
                                 notification_cq->cq(), this));
}

bool ServerInterface::GenericAsyncRequest::FinalizeResult(void** tag,
                                                          bool* status) {
  // If we are done intercepting, there is nothing more for us to do
  if (done_intercepting_) {
    return BaseAsyncRequest::FinalizeResult(tag, status);
  }
  // TODO(yangg) remove the copy here.
  if (*status) {
    static_cast<GenericServerContext*>(context_)->method_ =
        StringFromCopiedSlice(call_details_.method);
    static_cast<GenericServerContext*>(context_)->host_ =
        StringFromCopiedSlice(call_details_.host);
    context_->deadline_ = call_details_.deadline;
  }
  grpc_slice_unref(call_details_.method);
  grpc_slice_unref(call_details_.host);
  call_wrapper_ = internal::Call(
      call_, server_, call_cq_, server_->max_receive_message_size(),
      context_->set_server_rpc_info(
          static_cast<GenericServerContext*>(context_)->method_.c_str(),
          internal::RpcMethod::BIDI_STREAMING,
          *server_->interceptor_creators()));
  return BaseAsyncRequest::FinalizeResult(tag, status);
}

namespace {
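// ShutdownCallback is the functor handed to a callback-based completion queue
// at construction (see Server::CallbackCQ at the bottom of this file). The CQ
// invokes it once shutdown has fully completed, at which point the callback
// deletes both the CQ it has taken ownership of and itself.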
class ShutdownCallback : public grpc_experimental_completion_queue_functor {
 public:
  ShutdownCallback() { functor_run = &ShutdownCallback::Run; }
  // TakeCQ takes ownership of the cq into the shutdown callback
  // so that the shutdown callback will be responsible for destroying it
  void TakeCQ(CompletionQueue* cq) { cq_ = cq; }

  // The Run function will get invoked by the completion queue library
  // when the shutdown is actually complete
  static void Run(grpc_experimental_completion_queue_functor* cb, int) {
    auto* callback = static_cast<ShutdownCallback*>(cb);
    delete callback->cq_;
    delete callback;
  }

 private:
  CompletionQueue* cq_ = nullptr;
};
}  // namespace

}  // namespace grpc

namespace grpc_impl {

/// Use private inheritance rather than composition only to establish the
/// order of construction: the public base class should be constructed after
/// the elements belonging to the private base class, which is not possible
/// with true composition.
class Server::UnimplementedAsyncRequest final
    : private grpc::UnimplementedAsyncRequestContext,
      public GenericAsyncRequest {
 public:
  UnimplementedAsyncRequest(Server* server, grpc::ServerCompletionQueue* cq)
      : GenericAsyncRequest(server, &server_context_, &generic_stream_, cq, cq,
                            nullptr, false),
        server_(server),
        cq_(cq) {}

  bool FinalizeResult(void** tag, bool* status) override;

  grpc::ServerContext* context() { return &server_context_; }
  grpc::GenericServerAsyncReaderWriter* stream() { return &generic_stream_; }

 private:
  Server* const server_;
  grpc::ServerCompletionQueue* const cq_;
};

/// UnimplementedAsyncResponse should not post user-visible completions to the
/// C++ completion queue, but is generated as a CQ event by the core
class Server::UnimplementedAsyncResponse final
    : public grpc::internal::CallOpSet<
          grpc::internal::CallOpSendInitialMetadata,
          grpc::internal::CallOpServerSendStatus> {
 public:
  explicit UnimplementedAsyncResponse(UnimplementedAsyncRequest* request);
  ~UnimplementedAsyncResponse() { delete request_; }

  bool FinalizeResult(void** tag, bool* status) override {
    if (grpc::internal::CallOpSet<
            grpc::internal::CallOpSendInitialMetadata,
            grpc::internal::CallOpServerSendStatus>::FinalizeResult(tag,
                                                                    status)) {
      delete this;
    } else {
      // The tag was swallowed due to interception. We will see it again.
    }
    return false;
  }

 private:
  UnimplementedAsyncRequest* const request_;
};

class Server::SyncRequest final : public grpc::internal::CompletionQueueTag {
 public:
  SyncRequest(grpc::internal::RpcServiceMethod* method, void* method_tag)
      : method_(method),
        method_tag_(method_tag),
        in_flight_(false),
        has_request_payload_(method->method_type() ==
                                 grpc::internal::RpcMethod::NORMAL_RPC ||
                             method->method_type() ==
                                 grpc::internal::RpcMethod::SERVER_STREAMING),
        call_details_(nullptr),
        cq_(nullptr) {
    grpc_metadata_array_init(&request_metadata_);
  }

  ~SyncRequest() {
    if (call_details_) {
      delete call_details_;
    }
    grpc_metadata_array_destroy(&request_metadata_);
  }

  void SetupRequest() { cq_ = grpc_completion_queue_create_for_pluck(nullptr); }

  void TeardownRequest() {
    grpc_completion_queue_destroy(cq_);
    cq_ = nullptr;
  }

  void Request(grpc_server* server, grpc_completion_queue* notify_cq) {
    GPR_ASSERT(cq_ && !in_flight_);
    in_flight_ = true;
    if (method_tag_) {
      if (grpc_server_request_registered_call(
              server, method_tag_, &call_, &deadline_, &request_metadata_,
              has_request_payload_ ? &request_payload_ : nullptr, cq_,
              notify_cq, this) != GRPC_CALL_OK) {
        TeardownRequest();
        return;
      }
    } else {
      if (!call_details_) {
        call_details_ = new grpc_call_details;
        grpc_call_details_init(call_details_);
      }
      if (grpc_server_request_call(server, &call_, call_details_,
                                   &request_metadata_, cq_, notify_cq,
                                   this) != GRPC_CALL_OK) {
        TeardownRequest();
        return;
      }
    }
  }

  void PostShutdownCleanup() {
    if (call_) {
      grpc_call_unref(call_);
      call_ = nullptr;
    }
    if (cq_) {
      grpc_completion_queue_destroy(cq_);
      cq_ = nullptr;
    }
  }

  bool FinalizeResult(void** tag, bool* status) override {
    if (!*status) {
      grpc_completion_queue_destroy(cq_);
      cq_ = nullptr;
    }
    if (call_details_) {
      deadline_ = call_details_->deadline;
      grpc_call_details_destroy(call_details_);
      grpc_call_details_init(call_details_);
    }
    return true;
  }

  // The CallData class represents a call that is "active" as opposed
  // to just being requested. It wraps and takes ownership of the cq from
  // the call request
  class CallData final {
   public:
    explicit CallData(Server* server, SyncRequest* mrd)
        : cq_(mrd->cq_),
          ctx_(mrd->deadline_, &mrd->request_metadata_),
          has_request_payload_(mrd->has_request_payload_),
          request_payload_(has_request_payload_ ? mrd->request_payload_
                                                : nullptr),
          request_(nullptr),
          method_(mrd->method_),
          call_(
              mrd->call_, server, &cq_, server->max_receive_message_size(),
              ctx_.set_server_rpc_info(method_->name(), method_->method_type(),
                                       server->interceptor_creators_)),
          server_(server),
          global_callbacks_(nullptr),
          resources_(false) {
      ctx_.set_call(mrd->call_);
      ctx_.cq_ = &cq_;
      GPR_ASSERT(mrd->in_flight_);
      mrd->in_flight_ = false;
      mrd->request_metadata_.count = 0;
    }

    ~CallData() {
      if (has_request_payload_ && request_payload_) {
        grpc_byte_buffer_destroy(request_payload_);
      }
    }

    void Run(const std::shared_ptr<GlobalCallbacks>& global_callbacks,
             bool resources) {
      global_callbacks_ = global_callbacks;
      resources_ = resources;

      interceptor_methods_.SetCall(&call_);
      interceptor_methods_.SetReverse();
      // Set interception point for RECV INITIAL METADATA
      interceptor_methods_.AddInterceptionHookPoint(
          grpc::experimental::InterceptionHookPoints::
              POST_RECV_INITIAL_METADATA);
      interceptor_methods_.SetRecvInitialMetadata(&ctx_.client_metadata_);

      if (has_request_payload_) {
        // Set interception point for RECV MESSAGE
        auto* handler = resources_ ? method_->handler()
                                   : server_->resource_exhausted_handler_.get();
        request_ = handler->Deserialize(call_.call(), request_payload_,
                                        &request_status_, nullptr);

        request_payload_ = nullptr;
        interceptor_methods_.AddInterceptionHookPoint(
            grpc::experimental::InterceptionHookPoints::POST_RECV_MESSAGE);
        interceptor_methods_.SetRecvMessage(request_, nullptr);
      }

      if (interceptor_methods_.RunInterceptors(
              [this]() { ContinueRunAfterInterception(); })) {
        ContinueRunAfterInterception();
      } else {
        // There were interceptors to be run, so ContinueRunAfterInterception
        // will be run when interceptors are done.
      }
    }

    void ContinueRunAfterInterception() {
      {
        ctx_.BeginCompletionOp(&call_, nullptr, nullptr);
        global_callbacks_->PreSynchronousRequest(&ctx_);
        auto* handler = resources_ ? method_->handler()
                                   : server_->resource_exhausted_handler_.get();
        handler->RunHandler(grpc::internal::MethodHandler::HandlerParameter(
            &call_, &ctx_, request_, request_status_, nullptr, nullptr));
        request_ = nullptr;
        global_callbacks_->PostSynchronousRequest(&ctx_);

        cq_.Shutdown();

        grpc::internal::CompletionQueueTag* op_tag = ctx_.GetCompletionOpTag();
        cq_.TryPluck(op_tag, gpr_inf_future(GPR_CLOCK_REALTIME));

        /* Ensure the cq_ is shut down */
        grpc::DummyTag ignored_tag;
        GPR_ASSERT(cq_.Pluck(&ignored_tag) == false);
      }
      delete this;
    }

   private:
    grpc::CompletionQueue cq_;
    grpc::ServerContext ctx_;
    const bool has_request_payload_;
    grpc_byte_buffer* request_payload_;
    void* request_;
    grpc::Status request_status_;
    grpc::internal::RpcServiceMethod* const method_;
    grpc::internal::Call call_;
    Server* server_;
    std::shared_ptr<GlobalCallbacks> global_callbacks_;
    bool resources_;
    grpc::internal::InterceptorBatchMethodsImpl interceptor_methods_;
  };

 private:
  grpc::internal::RpcServiceMethod* const method_;
  void* const method_tag_;
  bool in_flight_;
  const bool has_request_payload_;
  grpc_call* call_;
  grpc_call_details* call_details_;
  gpr_timespec deadline_;
  grpc_metadata_array request_metadata_;
  grpc_byte_buffer* request_payload_;
  grpc_completion_queue* cq_;
};
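
// Rough lifecycle of a SyncRequest, as wired up below: SetupRequest() creates
// a private pluck-based completion queue, Request() asks the core server for
// an incoming call, and SyncRequestThreadManager::DoWork receives the
// resulting tag from the server CQ. On success it wraps the request in a
// CallData, which runs the handler and then shuts down and drains the private
// queue before deleting itself.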

class Server::CallbackRequestBase : public grpc::internal::CompletionQueueTag {
 public:
  virtual ~CallbackRequestBase() {}
  virtual bool Request() = 0;
};

template <class ServerContextType>
class Server::CallbackRequest final : public Server::CallbackRequestBase {
 public:
  static_assert(std::is_base_of<grpc::ServerContext, ServerContextType>::value,
                "ServerContextType must be derived from ServerContext");

  // The constructor needs to know the server for this callback request and its
  // index in the server's request count array to allow for proper dynamic
  // requesting of incoming RPCs. For codegen services, the values of method and
  // method_tag represent the defined characteristics of the method being
  // requested. For generic services, method and method_tag are nullptr since
  // these services don't have pre-defined methods or method registration tags.
  CallbackRequest(Server* server, size_t method_idx,
                  grpc::internal::RpcServiceMethod* method, void* method_tag)
      : server_(server),
        method_index_(method_idx),
        method_(method),
        method_tag_(method_tag),
        has_request_payload_(
            method_ != nullptr &&
            (method->method_type() == grpc::internal::RpcMethod::NORMAL_RPC ||
             method->method_type() ==
                 grpc::internal::RpcMethod::SERVER_STREAMING)),
        cq_(server->CallbackCQ()),
        tag_(this) {
    server_->callback_reqs_outstanding_++;
    Setup();
  }

  ~CallbackRequest() {
    Clear();

    // The counter of outstanding requests must be decremented
    // under a lock in case it causes the server shutdown.
    grpc::internal::MutexLock l(&server_->callback_reqs_mu_);
    if (--server_->callback_reqs_outstanding_ == 0) {
      server_->callback_reqs_done_cv_.Signal();
    }
  }

  bool Request() override {
    if (method_tag_) {
      if (GRPC_CALL_OK !=
          grpc_server_request_registered_call(
              server_->c_server(), method_tag_, &call_, &deadline_,
              &request_metadata_,
              has_request_payload_ ? &request_payload_ : nullptr, cq_->cq(),
              cq_->cq(), static_cast<void*>(&tag_))) {
        return false;
      }
    } else {
      if (!call_details_) {
        call_details_ = new grpc_call_details;
        grpc_call_details_init(call_details_);
      }
      if (grpc_server_request_call(server_->c_server(), &call_, call_details_,
                                   &request_metadata_, cq_->cq(), cq_->cq(),
                                   static_cast<void*>(&tag_)) != GRPC_CALL_OK) {
        return false;
      }
    }
    return true;
  }

  // Needs specialization to account for different processing of metadata
  // in generic API
  bool FinalizeResult(void** tag, bool* status) override;

 private:
  // method_name needs to be specialized between named method and generic
  const char* method_name() const;

  class CallbackCallTag : public grpc_experimental_completion_queue_functor {
   public:
    explicit CallbackCallTag(Server::CallbackRequest<ServerContextType>* req)
        : req_(req) {
      functor_run = &CallbackCallTag::StaticRun;
    }

    // force_run cannot be performed on a tag if operations using this tag
    // have been sent to PerformOpsOnCall. It is intended for error conditions
    // that are detected before the operations are internally processed.
    void force_run(bool ok) { Run(ok); }

   private:
    Server::CallbackRequest<ServerContextType>* req_;
    grpc::internal::Call* call_;

    static void StaticRun(grpc_experimental_completion_queue_functor* cb,
                          int ok) {
      static_cast<CallbackCallTag*>(cb)->Run(static_cast<bool>(ok));
    }
    void Run(bool ok) {
      void* ignored = req_;
      bool new_ok = ok;
      GPR_ASSERT(!req_->FinalizeResult(&ignored, &new_ok));
      GPR_ASSERT(ignored == req_);

      int count =
          static_cast<int>(gpr_atm_no_barrier_fetch_add(
              &req_->server_
                   ->callback_unmatched_reqs_count_[req_->method_index_],
              -1)) -
          1;
      if (!ok) {
        // The call has been shut down.
        // Delete its contents to free up the request.
        delete req_;
        return;
      }

      // If this was the last request in the list or it is below the soft
      // minimum and there are spare requests available, set up a new one.
      if (count == 0 || (count < SOFT_MINIMUM_SPARE_CALLBACK_REQS_PER_METHOD &&
                         req_->server_->callback_reqs_outstanding_ <
                             SOFT_MAXIMUM_CALLBACK_REQS_OUTSTANDING)) {
        auto* new_req = new CallbackRequest<ServerContextType>(
            req_->server_, req_->method_index_, req_->method_,
            req_->method_tag_);
        if (!new_req->Request()) {
          // The server must have just decided to shut down.
          gpr_atm_no_barrier_fetch_add(
              &new_req->server_
                   ->callback_unmatched_reqs_count_[new_req->method_index_],
              -1);
          delete new_req;
        }
      }

      // Bind the call, deadline, and metadata from what we got
      req_->ctx_.set_call(req_->call_);
      req_->ctx_.cq_ = req_->cq_;
      req_->ctx_.BindDeadlineAndMetadata(req_->deadline_,
                                         &req_->request_metadata_);
      req_->request_metadata_.count = 0;

      // Create a C++ Call to control the underlying core call
      call_ =
          new (grpc_call_arena_alloc(req_->call_, sizeof(grpc::internal::Call)))
              grpc::internal::Call(
                  req_->call_, req_->server_, req_->cq_,
                  req_->server_->max_receive_message_size(),
                  req_->ctx_.set_server_rpc_info(
                      req_->method_name(),
                      (req_->method_ != nullptr)
                          ? req_->method_->method_type()
                          : grpc::internal::RpcMethod::BIDI_STREAMING,
                      req_->server_->interceptor_creators_));

      req_->interceptor_methods_.SetCall(call_);
      req_->interceptor_methods_.SetReverse();
      // Set interception point for RECV INITIAL METADATA
      req_->interceptor_methods_.AddInterceptionHookPoint(
          grpc::experimental::InterceptionHookPoints::
              POST_RECV_INITIAL_METADATA);
      req_->interceptor_methods_.SetRecvInitialMetadata(
          &req_->ctx_.client_metadata_);

      if (req_->has_request_payload_) {
        // Set interception point for RECV MESSAGE
        req_->request_ = req_->method_->handler()->Deserialize(
            req_->call_, req_->request_payload_, &req_->request_status_,
            &req_->handler_data_);
        req_->request_payload_ = nullptr;
        req_->interceptor_methods_.AddInterceptionHookPoint(
            grpc::experimental::InterceptionHookPoints::POST_RECV_MESSAGE);
        req_->interceptor_methods_.SetRecvMessage(req_->request_, nullptr);
      }

      if (req_->interceptor_methods_.RunInterceptors(
              [this] { ContinueRunAfterInterception(); })) {
        ContinueRunAfterInterception();
      } else {
        // There were interceptors to be run, so ContinueRunAfterInterception
        // will be run when interceptors are done.
      }
    }
    void ContinueRunAfterInterception() {
      auto* handler = (req_->method_ != nullptr)
                          ? req_->method_->handler()
                          : req_->server_->generic_handler_.get();
      handler->RunHandler(grpc::internal::MethodHandler::HandlerParameter(
          call_, &req_->ctx_, req_->request_, req_->request_status_,
          req_->handler_data_, [this] {
            // Recycle this request if there aren't too many outstanding.
            // Note that we don't have to worry about a case where there
            // are no requests waiting to match for this method since that
            // is already taken care of when binding a request to a call.
            // TODO(vjpai): Also don't recycle this request if the dynamic
            //              load no longer justifies it. Consider measuring
            //              dynamic load and setting a target accordingly.
            if (req_->server_->callback_reqs_outstanding_ <
                SOFT_MAXIMUM_CALLBACK_REQS_OUTSTANDING) {
              req_->Clear();
              req_->Setup();
            } else {
              // There are already too many outstanding requests, so free
              // this one up.
              delete req_;
              return;
            }
            if (!req_->Request()) {
              // The server must have just decided to shut down.
              delete req_;
            }
          }));
    }
  };

  void Clear() {
    if (call_details_) {
      delete call_details_;
      call_details_ = nullptr;
    }
    grpc_metadata_array_destroy(&request_metadata_);
    if (has_request_payload_ && request_payload_) {
      grpc_byte_buffer_destroy(request_payload_);
    }
    ctx_.Clear();
    interceptor_methods_.ClearState();
  }

  void Setup() {
    gpr_atm_no_barrier_fetch_add(
        &server_->callback_unmatched_reqs_count_[method_index_], 1);
    grpc_metadata_array_init(&request_metadata_);
    ctx_.Setup(gpr_inf_future(GPR_CLOCK_REALTIME));
    request_payload_ = nullptr;
    request_ = nullptr;
    handler_data_ = nullptr;
    request_status_ = grpc::Status();
  }

  Server* const server_;
  const size_t method_index_;
  grpc::internal::RpcServiceMethod* const method_;
  void* const method_tag_;
  const bool has_request_payload_;
  grpc_byte_buffer* request_payload_;
  void* request_;
  void* handler_data_;
  grpc::Status request_status_;
  grpc_call_details* call_details_ = nullptr;
  grpc_call* call_;
  gpr_timespec deadline_;
  grpc_metadata_array request_metadata_;
  grpc::CompletionQueue* cq_;
  CallbackCallTag tag_;
  ServerContextType ctx_;
  grpc::internal::InterceptorBatchMethodsImpl interceptor_methods_;
};

template <>
bool Server::CallbackRequest<grpc::ServerContext>::FinalizeResult(
    void** tag, bool* status) {
  return false;
}

template <>
bool Server::CallbackRequest<grpc::GenericServerContext>::FinalizeResult(
    void** tag, bool* status) {
  if (*status) {
    // TODO(yangg) remove the copy here
    ctx_.method_ = grpc::StringFromCopiedSlice(call_details_->method);
    ctx_.host_ = grpc::StringFromCopiedSlice(call_details_->host);
  }
  grpc_slice_unref(call_details_->method);
  grpc_slice_unref(call_details_->host);
  return false;
}

template <>
const char* Server::CallbackRequest<grpc::ServerContext>::method_name() const {
  return method_->name();
}

template <>
const char* Server::CallbackRequest<grpc::GenericServerContext>::method_name()
    const {
  return ctx_.method().c_str();
}

// Implementation of ThreadManager. Each instance of SyncRequestThreadManager
// manages a pool of threads that poll for incoming Sync RPCs and call the
// appropriate RPC handlers
class Server::SyncRequestThreadManager : public grpc::ThreadManager {
 public:
  SyncRequestThreadManager(Server* server, grpc::CompletionQueue* server_cq,
                           std::shared_ptr<GlobalCallbacks> global_callbacks,
                           grpc_resource_quota* rq, int min_pollers,
                           int max_pollers, int cq_timeout_msec)
      : ThreadManager("SyncServer", rq, min_pollers, max_pollers),
        server_(server),
        server_cq_(server_cq),
        cq_timeout_msec_(cq_timeout_msec),
        global_callbacks_(std::move(global_callbacks)) {}

  WorkStatus PollForWork(void** tag, bool* ok) override {
    *tag = nullptr;
    // TODO(ctiller): workaround for GPR_TIMESPAN based deadlines not working
    // right now
    gpr_timespec deadline =
        gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
                     gpr_time_from_millis(cq_timeout_msec_, GPR_TIMESPAN));

    switch (server_cq_->AsyncNext(tag, ok, deadline)) {
      case grpc::CompletionQueue::TIMEOUT:
        return TIMEOUT;
      case grpc::CompletionQueue::SHUTDOWN:
        return SHUTDOWN;
      case grpc::CompletionQueue::GOT_EVENT:
        return WORK_FOUND;
    }

    GPR_UNREACHABLE_CODE(return TIMEOUT);
  }

  void DoWork(void* tag, bool ok, bool resources) override {
    SyncRequest* sync_req = static_cast<SyncRequest*>(tag);

    if (!sync_req) {
      // No tag. Nothing to work on. This is an unlikely scenario and possibly
      // a bug in the RPC Manager implementation.
      gpr_log(GPR_ERROR, "Sync server. DoWork() was called with NULL tag");
      return;
    }

    if (ok) {
      // CallData takes ownership of the completion queue and interceptors
      // inside sync_req
      auto* cd = new SyncRequest::CallData(server_, sync_req);
      // Prepare for the next request
      if (!IsShutdown()) {
        sync_req->SetupRequest();  // Create a new completion queue for sync_req
        sync_req->Request(server_->c_server(), server_cq_->cq());
      }

      GPR_TIMER_SCOPE("cd.Run()", 0);
      cd->Run(global_callbacks_, resources);
    }
    // TODO (sreek) If ok is false here (which it isn't in case of
    // grpc_request_registered_call), we should still re-queue the request
    // object
  }

  void AddSyncMethod(grpc::internal::RpcServiceMethod* method, void* tag) {
    sync_requests_.emplace_back(new SyncRequest(method, tag));
  }

  void AddUnknownSyncMethod() {
    if (!sync_requests_.empty()) {
      unknown_method_.reset(new grpc::internal::RpcServiceMethod(
          "unknown", grpc::internal::RpcMethod::BIDI_STREAMING,
          new grpc::internal::UnknownMethodHandler));
      sync_requests_.emplace_back(
          new SyncRequest(unknown_method_.get(), nullptr));
    }
  }

  void Shutdown() override {
    ThreadManager::Shutdown();
    server_cq_->Shutdown();
  }

  void Wait() override {
    ThreadManager::Wait();
    // Drain any pending items from the queue
    void* tag;
    bool ok;
    while (server_cq_->Next(&tag, &ok)) {
      if (ok) {
        // If a request was pulled off the queue, it means that the thread
        // handling the request added it to the completion queue after shutdown
        // was called - because the thread had already started and checked the
        // shutdown flag before shutdown was called. In this case, we simply
        // clean it up here, *after* calling wait on all the worker threads, at
        // which point we are certain no in-flight requests will add more to the
        // queue. This fixes an intermittent memory leak on shutdown.
        SyncRequest* sync_req = static_cast<SyncRequest*>(tag);
        sync_req->PostShutdownCleanup();
      }
    }
  }

  void Start() {
    if (!sync_requests_.empty()) {
      for (auto m = sync_requests_.begin(); m != sync_requests_.end(); m++) {
        (*m)->SetupRequest();
        (*m)->Request(server_->c_server(), server_cq_->cq());
      }

      Initialize();  // ThreadManager's Initialize()
    }
  }

 private:
  Server* server_;
  grpc::CompletionQueue* server_cq_;
  int cq_timeout_msec_;
  std::vector<std::unique_ptr<SyncRequest>> sync_requests_;
  std::unique_ptr<grpc::internal::RpcServiceMethod> unknown_method_;
  std::shared_ptr<Server::GlobalCallbacks> global_callbacks_;
};

static grpc::internal::GrpcLibraryInitializer g_gli_initializer;
Server::Server(
    int max_receive_message_size, grpc::ChannelArguments* args,
    std::shared_ptr<std::vector<std::unique_ptr<grpc::ServerCompletionQueue>>>
        sync_server_cqs,
    int min_pollers, int max_pollers, int sync_cq_timeout_msec,
    grpc_resource_quota* server_rq,
    std::vector<
        std::unique_ptr<grpc::experimental::ServerInterceptorFactoryInterface>>
        interceptor_creators)
    : interceptor_creators_(std::move(interceptor_creators)),
      max_receive_message_size_(max_receive_message_size),
      sync_server_cqs_(std::move(sync_server_cqs)),
      started_(false),
      shutdown_(false),
      shutdown_notified_(false),
      server_(nullptr),
      server_initializer_(new grpc_impl::ServerInitializer(this)),
      health_check_service_disabled_(false) {
  g_gli_initializer.summon();
  gpr_once_init(&grpc::g_once_init_callbacks, grpc::InitGlobalCallbacks);
  global_callbacks_ = grpc::g_callbacks;
  global_callbacks_->UpdateArguments(args);

  if (sync_server_cqs_ != nullptr) {
    bool default_rq_created = false;
    if (server_rq == nullptr) {
      server_rq = grpc_resource_quota_create("SyncServer-default-rq");
      grpc_resource_quota_set_max_threads(server_rq,
                                          DEFAULT_MAX_SYNC_SERVER_THREADS);
      default_rq_created = true;
    }

    for (const auto& it : *sync_server_cqs_) {
      sync_req_mgrs_.emplace_back(new SyncRequestThreadManager(
          this, it.get(), global_callbacks_, server_rq, min_pollers,
          max_pollers, sync_cq_timeout_msec));
    }

    if (default_rq_created) {
      grpc_resource_quota_unref(server_rq);
    }
  }

  grpc_channel_args channel_args;
  args->SetChannelArgs(&channel_args);

  for (size_t i = 0; i < channel_args.num_args; i++) {
    if (0 == strcmp(channel_args.args[i].key,
                    grpc::kHealthCheckServiceInterfaceArg)) {
      if (channel_args.args[i].value.pointer.p == nullptr) {
        health_check_service_disabled_ = true;
      } else {
        health_check_service_.reset(
            static_cast<grpc::HealthCheckServiceInterface*>(
                channel_args.args[i].value.pointer.p));
      }
      break;
    }
  }

  server_ = grpc_server_create(&channel_args, nullptr);
}

Server::~Server() {
  {
    grpc::internal::ReleasableMutexLock lock(&mu_);
    if (callback_cq_ != nullptr) {
      callback_cq_->Shutdown();
    }
    if (started_ && !shutdown_) {
      lock.Unlock();
      Shutdown();
    } else if (!started_) {
      // Shut down the completion queues
      for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
        (*it)->Shutdown();
      }
    }
  }

  grpc_server_destroy(server_);
  for (auto& per_method_count : callback_unmatched_reqs_count_) {
    // There should be no more unmatched callbacks for any method, as each
    // request is failed by Shutdown. Check that this actually happened.
    GPR_ASSERT(static_cast<int>(gpr_atm_no_barrier_load(&per_method_count)) ==
               0);
  }
}

void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) {
  GPR_ASSERT(!grpc::g_callbacks);
  GPR_ASSERT(callbacks);
  grpc::g_callbacks.reset(callbacks);
}

grpc_server* Server::c_server() { return server_; }

std::shared_ptr<grpc::Channel> Server::InProcessChannel(
    const grpc::ChannelArguments& args) {
  grpc_channel_args channel_args = args.c_channel_args();
  return grpc::CreateChannelInternal(
      "inproc", grpc_inproc_channel_create(server_, &channel_args, nullptr),
      std::vector<std::unique_ptr<
          grpc::experimental::ClientInterceptorFactoryInterface>>());
}
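
// Illustrative use of the in-process channel (a sketch; assumes a built and
// started server, and a generated MyService stub, which is hypothetical):
//
//   grpc::ChannelArguments args;
//   std::shared_ptr<grpc::Channel> channel = server->InProcessChannel(args);
//   auto stub = MyService::NewStub(channel);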

std::shared_ptr<grpc::Channel>
Server::experimental_type::InProcessChannelWithInterceptors(
    const grpc::ChannelArguments& args,
    std::vector<
        std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>>
        interceptor_creators) {
  grpc_channel_args channel_args = args.c_channel_args();
  return grpc::CreateChannelInternal(
      "inproc",
      grpc_inproc_channel_create(server_->server_, &channel_args, nullptr),
      std::move(interceptor_creators));
}

static grpc_server_register_method_payload_handling PayloadHandlingForMethod(
    grpc::internal::RpcServiceMethod* method) {
  switch (method->method_type()) {
    case grpc::internal::RpcMethod::NORMAL_RPC:
    case grpc::internal::RpcMethod::SERVER_STREAMING:
      return GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER;
    case grpc::internal::RpcMethod::CLIENT_STREAMING:
    case grpc::internal::RpcMethod::BIDI_STREAMING:
      return GRPC_SRM_PAYLOAD_NONE;
  }
  GPR_UNREACHABLE_CODE(return GRPC_SRM_PAYLOAD_NONE;);
}
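
// For unary (NORMAL_RPC) and server-streaming methods the client's request
// message arrives together with the call, so the core is asked to read the
// initial byte buffer up front; for client- and bidi-streaming methods,
// messages are read one at a time later, so no payload handling is needed at
// registration time.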

bool Server::RegisterService(const grpc::string* host, grpc::Service* service) {
  bool has_async_methods = service->has_async_methods();
  if (has_async_methods) {
    GPR_ASSERT(service->server_ == nullptr &&
               "Can only register an asynchronous service against one server.");
    service->server_ = this;
  }

  const char* method_name = nullptr;

  for (auto it = service->methods_.begin(); it != service->methods_.end();
       ++it) {
    if (it->get() == nullptr) {  // Handled by generic service if any.
      continue;
    }

    grpc::internal::RpcServiceMethod* method = it->get();
    void* method_registration_tag = grpc_server_register_method(
        server_, method->name(), host ? host->c_str() : nullptr,
        PayloadHandlingForMethod(method), 0);
    if (method_registration_tag == nullptr) {
      gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
              method->name());
      return false;
    }

    if (method->handler() == nullptr) {  // Async method without handler
      method->set_server_tag(method_registration_tag);
    } else if (method->api_type() ==
               grpc::internal::RpcServiceMethod::ApiType::SYNC) {
      for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
        (*it)->AddSyncMethod(method, method_registration_tag);
      }
    } else {
      // A callback method. Register at least some callback requests for it.
      callback_unmatched_reqs_count_.push_back(0);
      auto method_index = callback_unmatched_reqs_count_.size() - 1;
      // TODO(vjpai): Register these dynamically based on need
      for (int i = 0; i < DEFAULT_CALLBACK_REQS_PER_METHOD; i++) {
        callback_reqs_to_start_.push_back(
            new CallbackRequest<grpc::ServerContext>(this, method_index, method,
                                                     method_registration_tag));
      }
      // Enqueue it so that it will be Request'ed later after all request
      // matchers are created at core server startup
    }

    method_name = method->name();
  }

  // Parse the service name.
  if (method_name != nullptr) {
    std::stringstream ss(method_name);
    grpc::string service_name;
    if (std::getline(ss, service_name, '/') &&
        std::getline(ss, service_name, '/')) {
      services_.push_back(service_name);
    }
  }
  return true;
}
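
// RegisterService is normally reached through grpc::ServerBuilder rather than
// called directly. A minimal sketch, where MyServiceImpl is a hypothetical
// implementation of a generated service:
//
//   grpc::ServerBuilder builder;
//   MyServiceImpl service;
//   builder.RegisterService(&service);
//   std::unique_ptr<grpc::Server> server = builder.BuildAndStart();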

void Server::RegisterAsyncGenericService(grpc::AsyncGenericService* service) {
  GPR_ASSERT(service->server_ == nullptr &&
             "Can only register an async generic service against one server.");
  service->server_ = this;
  has_async_generic_service_ = true;
}

void Server::RegisterCallbackGenericService(
    grpc::experimental::CallbackGenericService* service) {
  GPR_ASSERT(
      service->server_ == nullptr &&
      "Can only register a callback generic service against one server.");
  service->server_ = this;
  has_callback_generic_service_ = true;
  generic_handler_.reset(service->Handler());

  callback_unmatched_reqs_count_.push_back(0);
  auto method_index = callback_unmatched_reqs_count_.size() - 1;
  // TODO(vjpai): Register these dynamically based on need
  for (int i = 0; i < DEFAULT_CALLBACK_REQS_PER_METHOD; i++) {
    callback_reqs_to_start_.push_back(
        new CallbackRequest<grpc::GenericServerContext>(this, method_index,
                                                        nullptr, nullptr));
  }
}

int Server::AddListeningPort(const grpc::string& addr,
                             grpc::ServerCredentials* creds) {
  GPR_ASSERT(!started_);
  int port = creds->AddPortToServer(addr, server_);
  global_callbacks_->AddPort(this, addr, creds, port);
  return port;
}
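
// This is what backs grpc::ServerBuilder::AddListeningPort; a returned port of
// 0 indicates a failure to bind. An illustrative builder call (address and
// credentials are examples only):
//
//   builder.AddListeningPort("0.0.0.0:50051",
//                            grpc::InsecureServerCredentials());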

void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) {
  GPR_ASSERT(!started_);
  global_callbacks_->PreServerStart(this);
  started_ = true;

  // Only create the default health check service when the user did not provide
  // an explicit one.
  grpc::ServerCompletionQueue* health_check_cq = nullptr;
  grpc::DefaultHealthCheckService::HealthCheckServiceImpl*
      default_health_check_service_impl = nullptr;
  if (health_check_service_ == nullptr && !health_check_service_disabled_ &&
      grpc::DefaultHealthCheckServiceEnabled()) {
    auto* default_hc_service = new grpc::DefaultHealthCheckService;
    health_check_service_.reset(default_hc_service);
    // We create a non-polling CQ to avoid impacting application
    // performance.  This ensures that we don't introduce thread hops
    // for application requests that wind up on this CQ, which is polled
    // in its own thread.
    health_check_cq = new grpc::ServerCompletionQueue(
        GRPC_CQ_NEXT, GRPC_CQ_NON_POLLING, nullptr);
    grpc_server_register_completion_queue(server_, health_check_cq->cq(),
                                          nullptr);
    default_health_check_service_impl =
        default_hc_service->GetHealthCheckService(
            std::unique_ptr<grpc::ServerCompletionQueue>(health_check_cq));
    RegisterService(nullptr, default_health_check_service_impl);
  }

  // If this server uses callback methods, then create a callback generic
  // service to handle any unimplemented methods using the default reactor
  // creator
  if (!callback_reqs_to_start_.empty() && !has_callback_generic_service_) {
    unimplemented_service_.reset(
        new grpc::experimental::CallbackGenericService);
    RegisterCallbackGenericService(unimplemented_service_.get());
  }

  grpc_server_start(server_);

  if (!has_async_generic_service_ && !has_callback_generic_service_) {
    for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
      (*it)->AddUnknownSyncMethod();
    }

    for (size_t i = 0; i < num_cqs; i++) {
      if (cqs[i]->IsFrequentlyPolled()) {
        new UnimplementedAsyncRequest(this, cqs[i]);
      }
    }
    if (health_check_cq != nullptr) {
      new UnimplementedAsyncRequest(this, health_check_cq);
    }
  }

  // If this server has any support for synchronous methods (has any sync
  // server CQs), make sure that we have a ResourceExhausted handler
  // to deal with the case of thread exhaustion
  if (sync_server_cqs_ != nullptr && !sync_server_cqs_->empty()) {
    resource_exhausted_handler_.reset(
        new grpc::internal::ResourceExhaustedHandler);
  }

  for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
    (*it)->Start();
  }

  for (auto* cbreq : callback_reqs_to_start_) {
    GPR_ASSERT(cbreq->Request());
  }
  callback_reqs_to_start_.clear();

  if (default_health_check_service_impl != nullptr) {
    default_health_check_service_impl->StartServingThread();
  }
}

void Server::ShutdownInternal(gpr_timespec deadline) {
  grpc::internal::MutexLock lock(&mu_);
  if (shutdown_) {
    return;
  }

  shutdown_ = true;

  // The completion queue to use for server shutdown completion notification
  grpc::CompletionQueue shutdown_cq;
  grpc::ShutdownTag shutdown_tag;  // Dummy shutdown tag
  grpc_server_shutdown_and_notify(server_, shutdown_cq.cq(), &shutdown_tag);

  shutdown_cq.Shutdown();

  void* tag;
  bool ok;
  grpc::CompletionQueue::NextStatus status =
      shutdown_cq.AsyncNext(&tag, &ok, deadline);

  // If this timed out, it means we are done with the grace period for a clean
  // shutdown. We should force a shutdown now by cancelling all in-flight
  // calls.
  if (status == grpc::CompletionQueue::NextStatus::TIMEOUT) {
    grpc_server_cancel_all_calls(server_);
  }
  // Else, in case of SHUTDOWN or GOT_EVENT, the server has successfully shut
  // down.

  // Shut down all ThreadManagers. This will try to gracefully stop all the
  // threads in the ThreadManagers (once they process any inflight requests)
  for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
    (*it)->Shutdown();  // ThreadManager's Shutdown()
  }

  // Wait for threads in all ThreadManagers to terminate
  for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
    (*it)->Wait();
  }

  // Wait for all outstanding callback requests to complete
  // (whether waiting for a match or already active).
  // We know that no new requests will be created after this point
  // because they are only created at server startup time or when
  // we have a successful match on a request. During the shutdown phase,
  // requests that have not yet matched will be failed rather than
  // allowed to succeed, which will cause the server to delete the
  // request and decrement the count. Possibly a request will match before
  // the shutdown but then find that shutdown has already started by the
  // time it tries to register a new request. In that case, the registration
  // will report a failure, indicating a shutdown, and again we won't end
  // up incrementing the counter.
  {
    grpc::internal::MutexLock cblock(&callback_reqs_mu_);
    callback_reqs_done_cv_.WaitUntil(
        &callback_reqs_mu_, [this] { return callback_reqs_outstanding_ == 0; });
  }

  // Drain the shutdown queue (if the previous call to AsyncNext() timed out
  // and we didn't remove the tag from the queue yet)
  while (shutdown_cq.Next(&tag, &ok)) {
    // Nothing to be done here. Just ignore ok and tag values
  }

  shutdown_notified_ = true;
  shutdown_cv_.Broadcast();
}
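
// ShutdownInternal is reached through the public Server::Shutdown entry
// points. An illustrative call with a 30-second grace period, after which
// in-flight calls are cancelled per the TIMEOUT handling above:
//
//   server->Shutdown(std::chrono::system_clock::now() +
//                    std::chrono::seconds(30));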

void Server::Wait() {
  grpc::internal::MutexLock lock(&mu_);
  while (started_ && !shutdown_notified_) {
    shutdown_cv_.Wait(&mu_);
  }
}

void Server::PerformOpsOnCall(grpc::internal::CallOpSetInterface* ops,
                              grpc::internal::Call* call) {
  ops->FillOps(call);
}

bool Server::UnimplementedAsyncRequest::FinalizeResult(void** tag,
                                                       bool* status) {
  if (GenericAsyncRequest::FinalizeResult(tag, status)) {
    // We either had no interceptors run or we are done intercepting
    if (*status) {
      new UnimplementedAsyncRequest(server_, cq_);
      new UnimplementedAsyncResponse(this);
    } else {
      delete this;
    }
  } else {
    // The tag was swallowed due to interception. We will see it again.
  }
  return false;
}

Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse(
    UnimplementedAsyncRequest* request)
    : request_(request) {
  grpc::internal::UnknownMethodHandler::FillOps(request_->context(), this);
  request_->stream()->call_.PerformOps(this);
}

grpc::ServerInitializer* Server::initializer() {
  return server_initializer_.get();
}

grpc::CompletionQueue* Server::CallbackCQ() {
  // TODO(vjpai): Consider using a single global CQ for the default CQ
  // if there is no explicit per-server CQ registered
  grpc::internal::MutexLock l(&mu_);
  if (callback_cq_ == nullptr) {
    auto* shutdown_callback = new grpc::ShutdownCallback;
    callback_cq_ = new grpc::CompletionQueue(grpc_completion_queue_attributes{
        GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING,
        shutdown_callback});

    // Transfer ownership of the new cq to its own shutdown callback
    shutdown_callback->TakeCQ(callback_cq_);
  }
  return callback_cq_;
}

}  // namespace grpc_impl