1 /*
2  *
3  * Copyright 2015-2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/surface/server.h"
22
23 #include <limits.h>
24 #include <stdlib.h>
25 #include <string.h>
26
27 #include <grpc/support/alloc.h>
28 #include <grpc/support/log.h>
29 #include <grpc/support/string_util.h>
30
31 #include <utility>
32
33 #include "src/core/lib/channel/channel_args.h"
34 #include "src/core/lib/channel/connected_channel.h"
35 #include "src/core/lib/debug/stats.h"
36 #include "src/core/lib/gpr/mpscq.h"
37 #include "src/core/lib/gpr/spinlock.h"
38 #include "src/core/lib/gpr/string.h"
39 #include "src/core/lib/iomgr/executor.h"
40 #include "src/core/lib/iomgr/iomgr.h"
41 #include "src/core/lib/slice/slice_internal.h"
42 #include "src/core/lib/surface/api_trace.h"
43 #include "src/core/lib/surface/call.h"
44 #include "src/core/lib/surface/channel.h"
45 #include "src/core/lib/surface/completion_queue.h"
46 #include "src/core/lib/surface/init.h"
47 #include "src/core/lib/transport/metadata.h"
48 #include "src/core/lib/transport/static_metadata.h"
49
50 grpc_core::TraceFlag grpc_server_channel_trace(false, "server_channel");
51
52 static void server_on_recv_initial_metadata(void* ptr, grpc_error* error);
53 static void server_recv_trailing_metadata_ready(void* user_data,
54                                                 grpc_error* error);
55
56 namespace {
57 struct listener {
58   void* arg;
59   void (*start)(grpc_server* server, void* arg, grpc_pollset** pollsets,
60                 size_t pollset_count);
61   void (*destroy)(grpc_server* server, void* arg, grpc_closure* closure);
62   struct listener* next;
63   intptr_t socket_uuid;
64   grpc_closure destroy_done;
65 };
66
67 enum requested_call_type { BATCH_CALL, REGISTERED_CALL };
68
69 struct registered_method;
70
71 struct requested_call {
72   gpr_mpscq_node request_link; /* must be first */
73   requested_call_type type;
74   size_t cq_idx;
75   void* tag;
76   grpc_server* server;
77   grpc_completion_queue* cq_bound_to_call;
78   grpc_call** call;
79   grpc_cq_completion completion;
80   grpc_metadata_array* initial_metadata;
81   union {
82     struct {
83       grpc_call_details* details;
84     } batch;
85     struct {
86       registered_method* method;
87       gpr_timespec* deadline;
88       grpc_byte_buffer** optional_payload;
89     } registered;
90   } data;
91 };
92
93 struct channel_registered_method {
94   registered_method* server_registered_method;
95   uint32_t flags;
96   bool has_host;
97   grpc_slice method;
98   grpc_slice host;
99 };
100
101 struct channel_data {
102   grpc_server* server;
103   grpc_connectivity_state connectivity_state;
104   grpc_channel* channel;
105   size_t cq_idx;
106   /* linked list of all channels on a server */
107   channel_data* next;
108   channel_data* prev;
109   channel_registered_method* registered_methods;
110   uint32_t registered_method_slots;
111   uint32_t registered_method_max_probes;
112   grpc_closure finish_destroy_channel_closure;
113   grpc_closure channel_connectivity_changed;
114   grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode> socket_node;
115 };
116
117 typedef struct shutdown_tag {
118   void* tag;
119   grpc_completion_queue* cq;
120   grpc_cq_completion completion;
121 } shutdown_tag;
122
123 typedef enum {
124   /* waiting for metadata */
125   NOT_STARTED,
126   /* initial metadata read, not flow controlled in yet */
127   PENDING,
128   /* flow controlled in, on completion queue */
129   ACTIVATED,
130   /* cancelled before being queued */
131   ZOMBIED
132 } call_state;
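/* Typical transitions, as implemented below: a call starts NOT_STARTED; once
   its initial metadata arrives it is either matched to a waiting request and
   becomes ACTIVATED (publish_new_rpc), or is parked as PENDING until a request
   shows up (queue_call_request); calls that fail or race with server shutdown
   become ZOMBIED and are destroyed. */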
133
134 typedef struct request_matcher request_matcher;
135
136 struct call_data {
137   call_data(grpc_call_element* elem, const grpc_call_element_args& args)
138       : call(grpc_call_from_top_element(elem)),
139         call_combiner(args.call_combiner) {
140     GRPC_CLOSURE_INIT(&server_on_recv_initial_metadata,
141                       ::server_on_recv_initial_metadata, elem,
142                       grpc_schedule_on_exec_ctx);
143     GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready,
144                       server_recv_trailing_metadata_ready, elem,
145                       grpc_schedule_on_exec_ctx);
146   }
147   ~call_data() {
148     GPR_ASSERT(state != PENDING);
149     GRPC_ERROR_UNREF(recv_initial_metadata_error);
150     if (host_set) {
151       grpc_slice_unref_internal(host);
152     }
153     if (path_set) {
154       grpc_slice_unref_internal(path);
155     }
156     grpc_metadata_array_destroy(&initial_metadata);
157     grpc_byte_buffer_destroy(payload);
158   }
159
160   grpc_call* call;
161
162   gpr_atm state = NOT_STARTED;
163
164   bool path_set = false;
165   bool host_set = false;
166   grpc_slice path;
167   grpc_slice host;
168   grpc_millis deadline = GRPC_MILLIS_INF_FUTURE;
169
170   grpc_completion_queue* cq_new = nullptr;
171
172   grpc_metadata_batch* recv_initial_metadata = nullptr;
173   uint32_t recv_initial_metadata_flags = 0;
174   grpc_metadata_array initial_metadata =
175       grpc_metadata_array();  // Zero-initialize the C struct.
176
177   request_matcher* matcher = nullptr;
178   grpc_byte_buffer* payload = nullptr;
179
180   grpc_closure got_initial_metadata;
181   grpc_closure server_on_recv_initial_metadata;
182   grpc_closure kill_zombie_closure;
183   grpc_closure* on_done_recv_initial_metadata;
184   grpc_closure recv_trailing_metadata_ready;
185   grpc_error* recv_initial_metadata_error = GRPC_ERROR_NONE;
186   grpc_closure* original_recv_trailing_metadata_ready;
187   grpc_error* recv_trailing_metadata_error = GRPC_ERROR_NONE;
188   bool seen_recv_trailing_metadata_ready = false;
189
190   grpc_closure publish;
191
192   call_data* pending_next = nullptr;
193   grpc_core::CallCombiner* call_combiner;
194 };
195
196 struct request_matcher {
197   grpc_server* server;
198   call_data* pending_head;
199   call_data* pending_tail;
200   gpr_locked_mpscq* requests_per_cq;
201 };
202
203 struct registered_method {
204   char* method;
205   char* host;
206   grpc_server_register_method_payload_handling payload_handling;
207   uint32_t flags;
208   /* one request matcher per method */
209   request_matcher matcher;
210   registered_method* next;
211 };
212
213 typedef struct {
214   grpc_channel** channels;
215   size_t num_channels;
216 } channel_broadcaster;
217 }  // namespace
218
219 struct grpc_server {
220   grpc_channel_args* channel_args;
221
222   grpc_resource_user* default_resource_user;
223
224   grpc_completion_queue** cqs;
225   grpc_pollset** pollsets;
226   size_t cq_count;
227   size_t pollset_count;
228   bool started;
229
230   /* The two following mutexes control access to server-state
231      mu_global controls access to non-call-related state (e.g., channel state)
232      mu_call controls access to call-related state (e.g., the call lists)
233
234      If they are ever required to be nested, you must lock mu_global
235      before mu_call. This is currently used in shutdown processing
236      (grpc_server_shutdown_and_notify and maybe_finish_shutdown) */
237   gpr_mu mu_global; /* mutex for server and channel state */
238   gpr_mu mu_call;   /* mutex for call-specific state */
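  /* Illustration of the required ordering when both locks are needed (a
     sketch only, mirroring the shutdown path below):
       gpr_mu_lock(&server->mu_global);
       gpr_mu_lock(&server->mu_call);
       ... touch both channel state and the call lists ...
       gpr_mu_unlock(&server->mu_call);
       gpr_mu_unlock(&server->mu_global);
   */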
239
240   /* startup synchronization: flag is protected by mu_global, signals whether
241      we are doing the listener start routine or not */
242   bool starting;
243   gpr_cv starting_cv;
244
245   registered_method* registered_methods;
246   /** one request matcher for unregistered methods */
247   request_matcher unregistered_request_matcher;
248
249   gpr_atm shutdown_flag;
250   uint8_t shutdown_published;
251   size_t num_shutdown_tags;
252   shutdown_tag* shutdown_tags;
253
254   channel_data root_channel_data;
255
256   listener* listeners;
257   int listeners_destroyed;
258   gpr_refcount internal_refcount;
259
260   /** when did we print the last shutdown progress message */
261   gpr_timespec last_shutdown_message_time;
262
263   grpc_core::RefCountedPtr<grpc_core::channelz::ServerNode> channelz_server;
264 };
265
266 #define SERVER_FROM_CALL_ELEM(elem) \
267   (((channel_data*)(elem)->channel_data)->server)
268
269 static void publish_new_rpc(void* calld, grpc_error* error);
270 static void fail_call(grpc_server* server, size_t cq_idx, requested_call* rc,
271                       grpc_error* error);
272 /* Before calling maybe_finish_shutdown, we must hold mu_global and not
273    hold mu_call */
274 static void maybe_finish_shutdown(grpc_server* server);
275
276 /*
277  * channel broadcaster
278  */
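/* While the server lock is held, the broadcaster snapshots (and refs) every
   channel on the server so that shutdown/GOAWAY can later be sent to all of
   them without any server lock held. */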
279
/* assumes the server's mu_global lock is held */
281 static void channel_broadcaster_init(grpc_server* s, channel_broadcaster* cb) {
282   channel_data* c;
283   size_t count = 0;
284   for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) {
285     count++;
286   }
287   cb->num_channels = count;
288   cb->channels = static_cast<grpc_channel**>(
289       gpr_malloc(sizeof(*cb->channels) * cb->num_channels));
290   count = 0;
291   for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) {
292     cb->channels[count++] = c->channel;
293     GRPC_CHANNEL_INTERNAL_REF(c->channel, "broadcast");
294   }
295 }
296
297 struct shutdown_cleanup_args {
298   grpc_closure closure;
299   grpc_slice slice;
300 };
301
302 static void shutdown_cleanup(void* arg, grpc_error* error) {
303   struct shutdown_cleanup_args* a =
304       static_cast<struct shutdown_cleanup_args*>(arg);
305   grpc_slice_unref_internal(a->slice);
306   gpr_free(a);
307 }
308
309 static void send_shutdown(grpc_channel* channel, bool send_goaway,
310                           grpc_error* send_disconnect) {
311   struct shutdown_cleanup_args* sc =
312       static_cast<struct shutdown_cleanup_args*>(gpr_malloc(sizeof(*sc)));
313   GRPC_CLOSURE_INIT(&sc->closure, shutdown_cleanup, sc,
314                     grpc_schedule_on_exec_ctx);
315   grpc_transport_op* op = grpc_make_transport_op(&sc->closure);
316   grpc_channel_element* elem;
317
318   op->goaway_error =
319       send_goaway ? grpc_error_set_int(
320                         GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server shutdown"),
321                         GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_OK)
322                   : GRPC_ERROR_NONE;
323   op->set_accept_stream = true;
324   sc->slice = grpc_slice_from_copied_string("Server shutdown");
325   op->disconnect_with_error = send_disconnect;
326
327   elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
328   elem->filter->start_transport_op(elem, op);
329 }
330
331 static void channel_broadcaster_shutdown(channel_broadcaster* cb,
332                                          bool send_goaway,
333                                          grpc_error* force_disconnect) {
334   size_t i;
335
336   for (i = 0; i < cb->num_channels; i++) {
337     send_shutdown(cb->channels[i], send_goaway,
338                   GRPC_ERROR_REF(force_disconnect));
339     GRPC_CHANNEL_INTERNAL_UNREF(cb->channels[i], "broadcast");
340   }
341   gpr_free(cb->channels);
342   GRPC_ERROR_UNREF(force_disconnect);
343 }
344
345 /*
346  * request_matcher
347  */
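/* A request_matcher pairs incoming calls with application requests for calls:
   pending_head/pending_tail queue calls that arrived before any matching
   request, while requests_per_cq holds, per completion queue, the requests
   that arrived before any matching call. */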
348
349 static void request_matcher_init(request_matcher* rm, grpc_server* server) {
350   rm->server = server;
351   rm->pending_head = rm->pending_tail = nullptr;
352   rm->requests_per_cq = static_cast<gpr_locked_mpscq*>(
353       gpr_malloc(sizeof(*rm->requests_per_cq) * server->cq_count));
354   for (size_t i = 0; i < server->cq_count; i++) {
355     gpr_locked_mpscq_init(&rm->requests_per_cq[i]);
356   }
357 }
358
359 static void request_matcher_destroy(request_matcher* rm) {
360   for (size_t i = 0; i < rm->server->cq_count; i++) {
361     GPR_ASSERT(gpr_locked_mpscq_pop(&rm->requests_per_cq[i]) == nullptr);
362     gpr_locked_mpscq_destroy(&rm->requests_per_cq[i]);
363   }
364   gpr_free(rm->requests_per_cq);
365 }
366
367 static void kill_zombie(void* elem, grpc_error* error) {
368   grpc_call_unref(
369       grpc_call_from_top_element(static_cast<grpc_call_element*>(elem)));
370 }
371
372 static void request_matcher_zombify_all_pending_calls(request_matcher* rm) {
373   while (rm->pending_head) {
374     call_data* calld = rm->pending_head;
375     rm->pending_head = calld->pending_next;
376     gpr_atm_no_barrier_store(&calld->state, ZOMBIED);
377     GRPC_CLOSURE_INIT(
378         &calld->kill_zombie_closure, kill_zombie,
379         grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
380         grpc_schedule_on_exec_ctx);
381     GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE);
382   }
383 }
384
385 static void request_matcher_kill_requests(grpc_server* server,
386                                           request_matcher* rm,
387                                           grpc_error* error) {
388   requested_call* rc;
389   for (size_t i = 0; i < server->cq_count; i++) {
390     while ((rc = reinterpret_cast<requested_call*>(
391                 gpr_locked_mpscq_pop(&rm->requests_per_cq[i]))) != nullptr) {
392       fail_call(server, i, rc, GRPC_ERROR_REF(error));
393     }
394   }
395   GRPC_ERROR_UNREF(error);
396 }
397
398 /*
399  * server proper
400  */
401
402 static void server_ref(grpc_server* server) {
403   gpr_ref(&server->internal_refcount);
404 }
405
406 static void server_delete(grpc_server* server) {
407   registered_method* rm;
408   size_t i;
409   server->channelz_server.reset();
410   grpc_channel_args_destroy(server->channel_args);
411   gpr_mu_destroy(&server->mu_global);
412   gpr_mu_destroy(&server->mu_call);
413   gpr_cv_destroy(&server->starting_cv);
414   while ((rm = server->registered_methods) != nullptr) {
415     server->registered_methods = rm->next;
416     if (server->started) {
417       request_matcher_destroy(&rm->matcher);
418     }
419     gpr_free(rm->method);
420     gpr_free(rm->host);
421     gpr_free(rm);
422   }
423   if (server->started) {
424     request_matcher_destroy(&server->unregistered_request_matcher);
425   }
426   for (i = 0; i < server->cq_count; i++) {
427     GRPC_CQ_INTERNAL_UNREF(server->cqs[i], "server");
428   }
429   gpr_free(server->cqs);
430   gpr_free(server->pollsets);
431   gpr_free(server->shutdown_tags);
432   gpr_free(server);
433 }
434
435 static void server_unref(grpc_server* server) {
436   if (gpr_unref(&server->internal_refcount)) {
437     server_delete(server);
438   }
439 }
440
441 static int is_channel_orphaned(channel_data* chand) {
442   return chand->next == chand;
443 }
444
445 static void orphan_channel(channel_data* chand) {
446   chand->next->prev = chand->prev;
447   chand->prev->next = chand->next;
448   chand->next = chand->prev = chand;
449 }
450
451 static void finish_destroy_channel(void* cd, grpc_error* error) {
452   channel_data* chand = static_cast<channel_data*>(cd);
453   grpc_server* server = chand->server;
454   GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "server");
455   server_unref(server);
456 }
457
458 static void destroy_channel(channel_data* chand, grpc_error* error) {
459   if (is_channel_orphaned(chand)) return;
460   GPR_ASSERT(chand->server != nullptr);
461   orphan_channel(chand);
462   server_ref(chand->server);
463   maybe_finish_shutdown(chand->server);
464   GRPC_CLOSURE_INIT(&chand->finish_destroy_channel_closure,
465                     finish_destroy_channel, chand, grpc_schedule_on_exec_ctx);
466
467   if (GRPC_TRACE_FLAG_ENABLED(grpc_server_channel_trace) &&
468       error != GRPC_ERROR_NONE) {
469     const char* msg = grpc_error_string(error);
470     gpr_log(GPR_INFO, "Disconnected client: %s", msg);
471   }
472   GRPC_ERROR_UNREF(error);
473
474   grpc_transport_op* op =
475       grpc_make_transport_op(&chand->finish_destroy_channel_closure);
476   op->set_accept_stream = true;
477   grpc_channel_next_op(grpc_channel_stack_element(
478                            grpc_channel_get_channel_stack(chand->channel), 0),
479                        op);
480 }
481
482 static void done_request_event(void* req, grpc_cq_completion* c) {
483   gpr_free(req);
484 }
485
486 static void publish_call(grpc_server* server, call_data* calld, size_t cq_idx,
487                          requested_call* rc) {
488   grpc_call_set_completion_queue(calld->call, rc->cq_bound_to_call);
489   grpc_call* call = calld->call;
490   *rc->call = call;
491   calld->cq_new = server->cqs[cq_idx];
492   GPR_SWAP(grpc_metadata_array, *rc->initial_metadata, calld->initial_metadata);
493   switch (rc->type) {
494     case BATCH_CALL:
495       GPR_ASSERT(calld->host_set);
496       GPR_ASSERT(calld->path_set);
497       rc->data.batch.details->host = grpc_slice_ref_internal(calld->host);
498       rc->data.batch.details->method = grpc_slice_ref_internal(calld->path);
499       rc->data.batch.details->deadline =
500           grpc_millis_to_timespec(calld->deadline, GPR_CLOCK_MONOTONIC);
501       rc->data.batch.details->flags = calld->recv_initial_metadata_flags;
502       break;
503     case REGISTERED_CALL:
504       *rc->data.registered.deadline =
505           grpc_millis_to_timespec(calld->deadline, GPR_CLOCK_MONOTONIC);
506       if (rc->data.registered.optional_payload) {
507         *rc->data.registered.optional_payload = calld->payload;
508         calld->payload = nullptr;
509       }
510       break;
511     default:
512       GPR_UNREACHABLE_CODE(return );
513   }
514
515   grpc_cq_end_op(calld->cq_new, rc->tag, GRPC_ERROR_NONE, done_request_event,
516                  rc, &rc->completion);
517 }
518
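/* Hands a newly arrived call to a waiting request, if there is one: first try
   each cq's request queue locklessly, then re-check under mu_call, and finally
   park the call on the matcher's pending list if no request is available. */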
519 static void publish_new_rpc(void* arg, grpc_error* error) {
520   grpc_call_element* call_elem = static_cast<grpc_call_element*>(arg);
521   call_data* calld = static_cast<call_data*>(call_elem->call_data);
522   channel_data* chand = static_cast<channel_data*>(call_elem->channel_data);
523   request_matcher* rm = calld->matcher;
524   grpc_server* server = rm->server;
525
526   if (error != GRPC_ERROR_NONE || gpr_atm_acq_load(&server->shutdown_flag)) {
527     gpr_atm_no_barrier_store(&calld->state, ZOMBIED);
528     GRPC_CLOSURE_INIT(
529         &calld->kill_zombie_closure, kill_zombie,
530         grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
531         grpc_schedule_on_exec_ctx);
532     GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_REF(error));
533     return;
534   }
535
536   for (size_t i = 0; i < server->cq_count; i++) {
537     size_t cq_idx = (chand->cq_idx + i) % server->cq_count;
538     requested_call* rc = reinterpret_cast<requested_call*>(
539         gpr_locked_mpscq_try_pop(&rm->requests_per_cq[cq_idx]));
540     if (rc == nullptr) {
541       continue;
542     } else {
543       GRPC_STATS_INC_SERVER_CQS_CHECKED(i);
544       gpr_atm_no_barrier_store(&calld->state, ACTIVATED);
545       publish_call(server, calld, cq_idx, rc);
546       return; /* early out */
547     }
548   }
549
  /* no request found on any cq: queue the call on the slow (pending) list */
551   GRPC_STATS_INC_SERVER_SLOWPATH_REQUESTS_QUEUED();
552   gpr_mu_lock(&server->mu_call);
553
554   // We need to ensure that all the queues are empty.  We do this under
555   // the server mu_call lock to ensure that if something is added to
556   // an empty request queue, it will block until the call is actually
557   // added to the pending list.
558   for (size_t i = 0; i < server->cq_count; i++) {
559     size_t cq_idx = (chand->cq_idx + i) % server->cq_count;
560     requested_call* rc = reinterpret_cast<requested_call*>(
561         gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]));
562     if (rc == nullptr) {
563       continue;
564     } else {
565       gpr_mu_unlock(&server->mu_call);
566       GRPC_STATS_INC_SERVER_CQS_CHECKED(i + server->cq_count);
567       gpr_atm_no_barrier_store(&calld->state, ACTIVATED);
568       publish_call(server, calld, cq_idx, rc);
569       return; /* early out */
570     }
571   }
572
573   gpr_atm_no_barrier_store(&calld->state, PENDING);
574   if (rm->pending_head == nullptr) {
575     rm->pending_tail = rm->pending_head = calld;
576   } else {
577     rm->pending_tail->pending_next = calld;
578     rm->pending_tail = calld;
579   }
580   calld->pending_next = nullptr;
581   gpr_mu_unlock(&server->mu_call);
582 }
583
584 static void finish_start_new_rpc(
585     grpc_server* server, grpc_call_element* elem, request_matcher* rm,
586     grpc_server_register_method_payload_handling payload_handling) {
587   call_data* calld = static_cast<call_data*>(elem->call_data);
588
589   if (gpr_atm_acq_load(&server->shutdown_flag)) {
590     gpr_atm_no_barrier_store(&calld->state, ZOMBIED);
591     GRPC_CLOSURE_INIT(&calld->kill_zombie_closure, kill_zombie, elem,
592                       grpc_schedule_on_exec_ctx);
593     GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE);
594     return;
595   }
596
597   calld->matcher = rm;
598
599   switch (payload_handling) {
600     case GRPC_SRM_PAYLOAD_NONE:
601       publish_new_rpc(elem, GRPC_ERROR_NONE);
602       break;
603     case GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER: {
604       grpc_op op;
605       op.op = GRPC_OP_RECV_MESSAGE;
606       op.flags = 0;
607       op.reserved = nullptr;
608       op.data.recv_message.recv_message = &calld->payload;
609       GRPC_CLOSURE_INIT(&calld->publish, publish_new_rpc, elem,
610                         grpc_schedule_on_exec_ctx);
611       grpc_call_start_batch_and_execute(calld->call, &op, 1, &calld->publish);
612       break;
613     }
614   }
615 }
616
617 static void start_new_rpc(grpc_call_element* elem) {
618   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
619   call_data* calld = static_cast<call_data*>(elem->call_data);
620   grpc_server* server = chand->server;
621   uint32_t i;
622   uint32_t hash;
623   channel_registered_method* rm;
624
625   if (chand->registered_methods && calld->path_set && calld->host_set) {
626     /* TODO(ctiller): unify these two searches */
627     /* check for an exact match with host */
628     hash = GRPC_MDSTR_KV_HASH(grpc_slice_hash(calld->host),
629                               grpc_slice_hash(calld->path));
630     for (i = 0; i <= chand->registered_method_max_probes; i++) {
631       rm = &chand->registered_methods[(hash + i) %
632                                       chand->registered_method_slots];
633       if (!rm) break;
634       if (!rm->has_host) continue;
635       if (!grpc_slice_eq(rm->host, calld->host)) continue;
636       if (!grpc_slice_eq(rm->method, calld->path)) continue;
637       if ((rm->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) &&
638           0 == (calld->recv_initial_metadata_flags &
639                 GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST)) {
640         continue;
641       }
642       finish_start_new_rpc(server, elem, &rm->server_registered_method->matcher,
643                            rm->server_registered_method->payload_handling);
644       return;
645     }
646     /* check for a wildcard method definition (no host set) */
647     hash = GRPC_MDSTR_KV_HASH(0, grpc_slice_hash(calld->path));
648     for (i = 0; i <= chand->registered_method_max_probes; i++) {
649       rm = &chand->registered_methods[(hash + i) %
650                                       chand->registered_method_slots];
651       if (!rm) break;
652       if (rm->has_host) continue;
653       if (!grpc_slice_eq(rm->method, calld->path)) continue;
654       if ((rm->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) &&
655           0 == (calld->recv_initial_metadata_flags &
656                 GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST)) {
657         continue;
658       }
659       finish_start_new_rpc(server, elem, &rm->server_registered_method->matcher,
660                            rm->server_registered_method->payload_handling);
661       return;
662     }
663   }
664   finish_start_new_rpc(server, elem, &server->unregistered_request_matcher,
665                        GRPC_SRM_PAYLOAD_NONE);
666 }
667
668 static int num_listeners(grpc_server* server) {
669   listener* l;
670   int n = 0;
671   for (l = server->listeners; l; l = l->next) {
672     n++;
673   }
674   return n;
675 }
676
677 static void done_shutdown_event(void* server, grpc_cq_completion* completion) {
678   server_unref(static_cast<grpc_server*>(server));
679 }
680
681 static int num_channels(grpc_server* server) {
682   channel_data* chand;
683   int n = 0;
684   for (chand = server->root_channel_data.next;
685        chand != &server->root_channel_data; chand = chand->next) {
686     n++;
687   }
688   return n;
689 }
690
691 static void kill_pending_work_locked(grpc_server* server, grpc_error* error) {
692   if (server->started) {
693     request_matcher_kill_requests(server, &server->unregistered_request_matcher,
694                                   GRPC_ERROR_REF(error));
695     request_matcher_zombify_all_pending_calls(
696         &server->unregistered_request_matcher);
697     for (registered_method* rm = server->registered_methods; rm;
698          rm = rm->next) {
699       request_matcher_kill_requests(server, &rm->matcher,
700                                     GRPC_ERROR_REF(error));
701       request_matcher_zombify_all_pending_calls(&rm->matcher);
702     }
703   }
704   GRPC_ERROR_UNREF(error);
705 }
706
707 static void maybe_finish_shutdown(grpc_server* server) {
708   size_t i;
709   if (!gpr_atm_acq_load(&server->shutdown_flag) || server->shutdown_published) {
710     return;
711   }
712
713   kill_pending_work_locked(
714       server, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown"));
715
716   if (server->root_channel_data.next != &server->root_channel_data ||
717       server->listeners_destroyed < num_listeners(server)) {
718     if (gpr_time_cmp(gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME),
719                                   server->last_shutdown_message_time),
720                      gpr_time_from_seconds(1, GPR_TIMESPAN)) >= 0) {
721       server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);
722       gpr_log(GPR_DEBUG,
723               "Waiting for %d channels and %d/%d listeners to be destroyed"
724               " before shutting down server",
725               num_channels(server),
726               num_listeners(server) - server->listeners_destroyed,
727               num_listeners(server));
728     }
729     return;
730   }
731   server->shutdown_published = 1;
732   for (i = 0; i < server->num_shutdown_tags; i++) {
733     server_ref(server);
734     grpc_cq_end_op(server->shutdown_tags[i].cq, server->shutdown_tags[i].tag,
735                    GRPC_ERROR_NONE, done_shutdown_event, server,
736                    &server->shutdown_tags[i].completion);
737   }
738 }
739
740 static void server_on_recv_initial_metadata(void* ptr, grpc_error* error) {
741   grpc_call_element* elem = static_cast<grpc_call_element*>(ptr);
742   call_data* calld = static_cast<call_data*>(elem->call_data);
743   grpc_millis op_deadline;
744
745   if (error == GRPC_ERROR_NONE) {
746     GPR_ASSERT(calld->recv_initial_metadata->idx.named.path != nullptr);
747     GPR_ASSERT(calld->recv_initial_metadata->idx.named.authority != nullptr);
748     calld->path = grpc_slice_ref_internal(
749         GRPC_MDVALUE(calld->recv_initial_metadata->idx.named.path->md));
750     calld->host = grpc_slice_ref_internal(
751         GRPC_MDVALUE(calld->recv_initial_metadata->idx.named.authority->md));
752     calld->path_set = true;
753     calld->host_set = true;
754     grpc_metadata_batch_remove(calld->recv_initial_metadata,
755                                calld->recv_initial_metadata->idx.named.path);
756     grpc_metadata_batch_remove(
757         calld->recv_initial_metadata,
758         calld->recv_initial_metadata->idx.named.authority);
759   } else {
760     GRPC_ERROR_REF(error);
761   }
762   op_deadline = calld->recv_initial_metadata->deadline;
763   if (op_deadline != GRPC_MILLIS_INF_FUTURE) {
764     calld->deadline = op_deadline;
765   }
766   if (calld->host_set && calld->path_set) {
767     /* do nothing */
768   } else {
769     /* Pass the error reference to calld->recv_initial_metadata_error */
770     grpc_error* src_error = error;
771     error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
772         "Missing :authority or :path", &src_error, 1);
773     GRPC_ERROR_UNREF(src_error);
774     calld->recv_initial_metadata_error = GRPC_ERROR_REF(error);
775   }
776   grpc_closure* closure = calld->on_done_recv_initial_metadata;
777   calld->on_done_recv_initial_metadata = nullptr;
778   if (calld->seen_recv_trailing_metadata_ready) {
779     GRPC_CALL_COMBINER_START(calld->call_combiner,
780                              &calld->recv_trailing_metadata_ready,
781                              calld->recv_trailing_metadata_error,
782                              "continue server_recv_trailing_metadata_ready");
783   }
784   GRPC_CLOSURE_RUN(closure, error);
785 }
786
787 static void server_recv_trailing_metadata_ready(void* user_data,
788                                                 grpc_error* error) {
789   grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
790   call_data* calld = static_cast<call_data*>(elem->call_data);
791   if (calld->on_done_recv_initial_metadata != nullptr) {
792     calld->recv_trailing_metadata_error = GRPC_ERROR_REF(error);
793     calld->seen_recv_trailing_metadata_ready = true;
794     GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready,
795                       server_recv_trailing_metadata_ready, elem,
796                       grpc_schedule_on_exec_ctx);
797     GRPC_CALL_COMBINER_STOP(calld->call_combiner,
798                             "deferring server_recv_trailing_metadata_ready "
799                             "until after server_on_recv_initial_metadata");
800     return;
801   }
802   error =
803       grpc_error_add_child(GRPC_ERROR_REF(error),
804                            GRPC_ERROR_REF(calld->recv_initial_metadata_error));
805   GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, error);
806 }
807
808 static void server_mutate_op(grpc_call_element* elem,
809                              grpc_transport_stream_op_batch* op) {
810   call_data* calld = static_cast<call_data*>(elem->call_data);
811
812   if (op->recv_initial_metadata) {
813     GPR_ASSERT(op->payload->recv_initial_metadata.recv_flags == nullptr);
814     calld->recv_initial_metadata =
815         op->payload->recv_initial_metadata.recv_initial_metadata;
816     calld->on_done_recv_initial_metadata =
817         op->payload->recv_initial_metadata.recv_initial_metadata_ready;
818     op->payload->recv_initial_metadata.recv_initial_metadata_ready =
819         &calld->server_on_recv_initial_metadata;
820     op->payload->recv_initial_metadata.recv_flags =
821         &calld->recv_initial_metadata_flags;
822   }
823   if (op->recv_trailing_metadata) {
824     calld->original_recv_trailing_metadata_ready =
825         op->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
826     op->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
827         &calld->recv_trailing_metadata_ready;
828   }
829 }
830
831 static void server_start_transport_stream_op_batch(
832     grpc_call_element* elem, grpc_transport_stream_op_batch* op) {
833   server_mutate_op(elem, op);
834   grpc_call_next_op(elem, op);
835 }
836
837 static void got_initial_metadata(void* ptr, grpc_error* error) {
838   grpc_call_element* elem = static_cast<grpc_call_element*>(ptr);
839   call_data* calld = static_cast<call_data*>(elem->call_data);
840   if (error == GRPC_ERROR_NONE) {
841     start_new_rpc(elem);
842   } else {
843     if (gpr_atm_full_cas(&calld->state, NOT_STARTED, ZOMBIED)) {
844       GRPC_CLOSURE_INIT(&calld->kill_zombie_closure, kill_zombie, elem,
845                         grpc_schedule_on_exec_ctx);
846       GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE);
847     } else if (gpr_atm_full_cas(&calld->state, PENDING, ZOMBIED)) {
848       /* zombied call will be destroyed when it's removed from the pending
849          queue... later */
850     }
851   }
852 }
853
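/* Flow for a new incoming stream: the transport invokes accept_stream(),
   which creates a call and issues a RECV_INITIAL_METADATA batch;
   got_initial_metadata() then runs start_new_rpc(), which looks up any
   registered method and publishes the call to a completion queue (or parks it
   as pending) via finish_start_new_rpc()/publish_new_rpc(). */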
854 static void accept_stream(void* cd, grpc_transport* transport,
855                           const void* transport_server_data) {
856   channel_data* chand = static_cast<channel_data*>(cd);
857   /* create a call */
858   grpc_call_create_args args;
859   args.channel = chand->channel;
860   args.server = chand->server;
861   args.parent = nullptr;
862   args.propagation_mask = 0;
863   args.cq = nullptr;
864   args.pollset_set_alternative = nullptr;
865   args.server_transport_data = transport_server_data;
866   args.add_initial_metadata = nullptr;
867   args.add_initial_metadata_count = 0;
868   args.send_deadline = GRPC_MILLIS_INF_FUTURE;
869   grpc_call* call;
870   grpc_error* error = grpc_call_create(&args, &call);
871   grpc_call_element* elem =
872       grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
873   if (error != GRPC_ERROR_NONE) {
874     got_initial_metadata(elem, error);
875     GRPC_ERROR_UNREF(error);
876     return;
877   }
878   call_data* calld = static_cast<call_data*>(elem->call_data);
879   grpc_op op;
880   op.op = GRPC_OP_RECV_INITIAL_METADATA;
881   op.flags = 0;
882   op.reserved = nullptr;
883   op.data.recv_initial_metadata.recv_initial_metadata =
884       &calld->initial_metadata;
885   GRPC_CLOSURE_INIT(&calld->got_initial_metadata, got_initial_metadata, elem,
886                     grpc_schedule_on_exec_ctx);
887   grpc_call_start_batch_and_execute(call, &op, 1, &calld->got_initial_metadata);
888 }
889
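/* Connectivity watcher for a server channel: re-arms itself until the channel
   reaches GRPC_CHANNEL_SHUTDOWN, at which point the channel is destroyed and
   its "connectivity" ref is dropped. */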
890 static void channel_connectivity_changed(void* cd, grpc_error* error) {
891   channel_data* chand = static_cast<channel_data*>(cd);
892   grpc_server* server = chand->server;
893   if (chand->connectivity_state != GRPC_CHANNEL_SHUTDOWN) {
894     grpc_transport_op* op = grpc_make_transport_op(nullptr);
895     op->on_connectivity_state_change = &chand->channel_connectivity_changed;
896     op->connectivity_state = &chand->connectivity_state;
897     grpc_channel_next_op(grpc_channel_stack_element(
898                              grpc_channel_get_channel_stack(chand->channel), 0),
899                          op);
900   } else {
901     gpr_mu_lock(&server->mu_global);
902     destroy_channel(chand, GRPC_ERROR_REF(error));
903     gpr_mu_unlock(&server->mu_global);
904     GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "connectivity");
905   }
906 }
907
908 static grpc_error* init_call_elem(grpc_call_element* elem,
909                                   const grpc_call_element_args* args) {
910   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
911   server_ref(chand->server);
912   new (elem->call_data) call_data(elem, *args);
913   return GRPC_ERROR_NONE;
914 }
915
916 static void destroy_call_elem(grpc_call_element* elem,
917                               const grpc_call_final_info* final_info,
918                               grpc_closure* ignored) {
919   call_data* calld = static_cast<call_data*>(elem->call_data);
920   calld->~call_data();
921   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
922   server_unref(chand->server);
923 }
924
925 static grpc_error* init_channel_elem(grpc_channel_element* elem,
926                                      grpc_channel_element_args* args) {
927   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
928   GPR_ASSERT(args->is_first);
929   GPR_ASSERT(!args->is_last);
930   chand->server = nullptr;
931   chand->channel = nullptr;
932   chand->next = chand->prev = chand;
933   chand->registered_methods = nullptr;
934   chand->connectivity_state = GRPC_CHANNEL_IDLE;
935   GRPC_CLOSURE_INIT(&chand->channel_connectivity_changed,
936                     channel_connectivity_changed, chand,
937                     grpc_schedule_on_exec_ctx);
938   return GRPC_ERROR_NONE;
939 }
940
941 static void destroy_channel_elem(grpc_channel_element* elem) {
942   size_t i;
943   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
944   chand->socket_node.reset();
945   if (chand->registered_methods) {
946     for (i = 0; i < chand->registered_method_slots; i++) {
947       grpc_slice_unref_internal(chand->registered_methods[i].method);
948       if (chand->registered_methods[i].has_host) {
949         grpc_slice_unref_internal(chand->registered_methods[i].host);
950       }
951     }
952     gpr_free(chand->registered_methods);
953   }
954   if (chand->server) {
955     gpr_mu_lock(&chand->server->mu_global);
956     chand->next->prev = chand->prev;
957     chand->prev->next = chand->next;
958     chand->next = chand->prev = chand;
959     maybe_finish_shutdown(chand->server);
960     gpr_mu_unlock(&chand->server->mu_global);
961     server_unref(chand->server);
962   }
963 }
964
965 const grpc_channel_filter grpc_server_top_filter = {
966     server_start_transport_stream_op_batch,
967     grpc_channel_next_op,
968     sizeof(call_data),
969     init_call_elem,
970     grpc_call_stack_ignore_set_pollset_or_pollset_set,
971     destroy_call_elem,
972     sizeof(channel_data),
973     init_channel_elem,
974     destroy_channel_elem,
975     grpc_channel_next_get_info,
976     "server",
977 };
978
979 static void register_completion_queue(grpc_server* server,
980                                       grpc_completion_queue* cq,
981                                       void* reserved) {
982   size_t i, n;
983   GPR_ASSERT(!reserved);
984   for (i = 0; i < server->cq_count; i++) {
985     if (server->cqs[i] == cq) return;
986   }
987
988   GRPC_CQ_INTERNAL_REF(cq, "server");
989   n = server->cq_count++;
990   server->cqs = static_cast<grpc_completion_queue**>(gpr_realloc(
991       server->cqs, server->cq_count * sizeof(grpc_completion_queue*)));
992   server->cqs[n] = cq;
993 }
994
995 void grpc_server_register_completion_queue(grpc_server* server,
996                                            grpc_completion_queue* cq,
997                                            void* reserved) {
998   GRPC_API_TRACE(
999       "grpc_server_register_completion_queue(server=%p, cq=%p, reserved=%p)", 3,
1000       (server, cq, reserved));
1001
1002   auto cq_type = grpc_get_cq_completion_type(cq);
1003   if (cq_type != GRPC_CQ_NEXT && cq_type != GRPC_CQ_CALLBACK) {
1004     gpr_log(GPR_INFO,
1005             "Completion queue of type %d is being registered as a "
1006             "server-completion-queue",
1007             static_cast<int>(cq_type));
    /* Ideally we would log an error and abort, but the Ruby wrapped-language
       API calls grpc_completion_queue_pluck() on server completion queues, so
       we only log the message above. */
1010   }
1011
1012   register_completion_queue(server, cq, reserved);
1013 }
1014
1015 grpc_server* grpc_server_create(const grpc_channel_args* args, void* reserved) {
1016   grpc_core::ExecCtx exec_ctx;
1017   GRPC_API_TRACE("grpc_server_create(%p, %p)", 2, (args, reserved));
1018
1019   grpc_server* server =
1020       static_cast<grpc_server*>(gpr_zalloc(sizeof(grpc_server)));
1021
1022   gpr_mu_init(&server->mu_global);
1023   gpr_mu_init(&server->mu_call);
1024   gpr_cv_init(&server->starting_cv);
1025
1026   /* decremented by grpc_server_destroy */
1027   gpr_ref_init(&server->internal_refcount, 1);
1028   server->root_channel_data.next = server->root_channel_data.prev =
1029       &server->root_channel_data;
1030
1031   server->channel_args = grpc_channel_args_copy(args);
1032
1033   const grpc_arg* arg = grpc_channel_args_find(args, GRPC_ARG_ENABLE_CHANNELZ);
1034   if (grpc_channel_arg_get_bool(arg, GRPC_ENABLE_CHANNELZ_DEFAULT)) {
1035     arg = grpc_channel_args_find(
1036         args, GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE);
1037     size_t channel_tracer_max_memory = grpc_channel_arg_get_integer(
1038         arg,
1039         {GRPC_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE_DEFAULT, 0, INT_MAX});
1040     server->channelz_server =
1041         grpc_core::MakeRefCounted<grpc_core::channelz::ServerNode>(
1042             server, channel_tracer_max_memory);
1043     server->channelz_server->AddTraceEvent(
1044         grpc_core::channelz::ChannelTrace::Severity::Info,
1045         grpc_slice_from_static_string("Server created"));
1046   }
1047
1048   if (args != nullptr) {
1049     grpc_resource_quota* resource_quota =
1050         grpc_resource_quota_from_channel_args(args, false /* create */);
1051     if (resource_quota != nullptr) {
1052       server->default_resource_user =
1053           grpc_resource_user_create(resource_quota, "default");
1054     }
1055   }
1056
1057   return server;
1058 }
1059
1060 static int streq(const char* a, const char* b) {
1061   if (a == nullptr && b == nullptr) return 1;
1062   if (a == nullptr) return 0;
1063   if (b == nullptr) return 0;
1064   return 0 == strcmp(a, b);
1065 }
1066
1067 void* grpc_server_register_method(
1068     grpc_server* server, const char* method, const char* host,
1069     grpc_server_register_method_payload_handling payload_handling,
1070     uint32_t flags) {
1071   registered_method* m;
1072   GRPC_API_TRACE(
1073       "grpc_server_register_method(server=%p, method=%s, host=%s, "
1074       "flags=0x%08x)",
1075       4, (server, method, host, flags));
1076   if (!method) {
1077     gpr_log(GPR_ERROR,
1078             "grpc_server_register_method method string cannot be NULL");
1079     return nullptr;
1080   }
1081   for (m = server->registered_methods; m; m = m->next) {
1082     if (streq(m->method, method) && streq(m->host, host)) {
1083       gpr_log(GPR_ERROR, "duplicate registration for %s@%s", method,
1084               host ? host : "*");
1085       return nullptr;
1086     }
1087   }
1088   if ((flags & ~GRPC_INITIAL_METADATA_USED_MASK) != 0) {
1089     gpr_log(GPR_ERROR, "grpc_server_register_method invalid flags 0x%08x",
1090             flags);
1091     return nullptr;
1092   }
1093   m = static_cast<registered_method*>(gpr_zalloc(sizeof(registered_method)));
1094   m->method = gpr_strdup(method);
1095   m->host = gpr_strdup(host);
1096   m->next = server->registered_methods;
1097   m->payload_handling = payload_handling;
1098   m->flags = flags;
1099   server->registered_methods = m;
1100   return m;
1101 }
1102
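/* For reference, a minimal sketch of the C-core server lifecycle as driven by
   an application (not part of this file; the listen address is illustrative,
   'tag' is any caller-chosen pointer, and error handling is omitted):

     grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
     grpc_server* server = grpc_server_create(nullptr, nullptr);
     grpc_server_register_completion_queue(server, cq, nullptr);
     grpc_server_add_insecure_http2_port(server, "localhost:50051");
     grpc_server_start(server);
     // ... request and service calls (see grpc_server_request_call below) ...
     grpc_server_shutdown_and_notify(server, cq, tag);
     // drain 'cq' until 'tag' comes back, then:
     grpc_server_destroy(server);
     grpc_completion_queue_shutdown(cq);
     grpc_completion_queue_destroy(cq);
 */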
1103 void grpc_server_start(grpc_server* server) {
1104   size_t i;
1105   grpc_core::ExecCtx exec_ctx;
1106
1107   GRPC_API_TRACE("grpc_server_start(server=%p)", 1, (server));
1108
1109   server->started = true;
1110   server->pollset_count = 0;
1111   server->pollsets = static_cast<grpc_pollset**>(
1112       gpr_malloc(sizeof(grpc_pollset*) * server->cq_count));
1113   for (i = 0; i < server->cq_count; i++) {
1114     if (grpc_cq_can_listen(server->cqs[i])) {
1115       server->pollsets[server->pollset_count++] =
1116           grpc_cq_pollset(server->cqs[i]);
1117     }
1118   }
1119   request_matcher_init(&server->unregistered_request_matcher, server);
1120   for (registered_method* rm = server->registered_methods; rm; rm = rm->next) {
1121     request_matcher_init(&rm->matcher, server);
1122   }
1123
1124   gpr_mu_lock(&server->mu_global);
1125   server->starting = true;
1126   gpr_mu_unlock(&server->mu_global);
1127
1128   for (listener* l = server->listeners; l; l = l->next) {
1129     l->start(server, l->arg, server->pollsets, server->pollset_count);
1130   }
1131
1132   gpr_mu_lock(&server->mu_global);
1133   server->starting = false;
1134   gpr_cv_signal(&server->starting_cv);
1135   gpr_mu_unlock(&server->mu_global);
1136 }
1137
1138 void grpc_server_get_pollsets(grpc_server* server, grpc_pollset*** pollsets,
1139                               size_t* pollset_count) {
1140   *pollset_count = server->pollset_count;
1141   *pollsets = server->pollsets;
1142 }
1143
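/* Wires a freshly accepted transport into the server: builds a server channel
   on top of it, constructs the per-channel registered-method lookup table,
   links the channel into the server's channel list, and asks the transport to
   start accepting streams (or to disconnect immediately if the server is
   already shutting down). */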
1144 void grpc_server_setup_transport(
1145     grpc_server* s, grpc_transport* transport, grpc_pollset* accepting_pollset,
1146     const grpc_channel_args* args,
1147     grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode> socket_node,
1148     grpc_resource_user* resource_user) {
1149   size_t num_registered_methods;
1150   size_t alloc;
1151   registered_method* rm;
1152   channel_registered_method* crm;
1153   grpc_channel* channel;
1154   channel_data* chand;
1155   uint32_t hash;
1156   size_t slots;
1157   uint32_t probes;
1158   uint32_t max_probes = 0;
1159   grpc_transport_op* op = nullptr;
1160
1161   channel = grpc_channel_create(nullptr, args, GRPC_SERVER_CHANNEL, transport,
1162                                 resource_user);
1163   chand = static_cast<channel_data*>(
1164       grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0)
1165           ->channel_data);
1166   chand->server = s;
1167   server_ref(s);
1168   chand->channel = channel;
1169   chand->socket_node = std::move(socket_node);
1170
1171   size_t cq_idx;
1172   for (cq_idx = 0; cq_idx < s->cq_count; cq_idx++) {
1173     if (grpc_cq_pollset(s->cqs[cq_idx]) == accepting_pollset) break;
1174   }
1175   if (cq_idx == s->cq_count) {
1176     /* completion queue not found: pick a random one to publish new calls to */
1177     cq_idx = static_cast<size_t>(rand()) % s->cq_count;
1178   }
1179   chand->cq_idx = cq_idx;
1180
1181   num_registered_methods = 0;
1182   for (rm = s->registered_methods; rm; rm = rm->next) {
1183     num_registered_methods++;
1184   }
  /* build a lookup table phrased in terms of mdstr's in this channel's
     context so that registered methods can be found quickly */
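  /* The table is a simple open-addressed hash table with linear probing:
     2 * num_registered_methods slots keyed by the (host, method) hash, with
     the longest probe sequence recorded in registered_method_max_probes so
     that lookups in start_new_rpc() know when to stop probing. */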
1187   if (num_registered_methods > 0) {
1188     slots = 2 * num_registered_methods;
1189     alloc = sizeof(channel_registered_method) * slots;
1190     chand->registered_methods =
1191         static_cast<channel_registered_method*>(gpr_zalloc(alloc));
1192     for (rm = s->registered_methods; rm; rm = rm->next) {
1193       grpc_slice host;
1194       bool has_host;
1195       grpc_slice method;
1196       if (rm->host != nullptr) {
1197         host = grpc_slice_intern(grpc_slice_from_static_string(rm->host));
1198         has_host = true;
1199       } else {
1200         has_host = false;
1201       }
1202       method = grpc_slice_intern(grpc_slice_from_static_string(rm->method));
1203       hash = GRPC_MDSTR_KV_HASH(has_host ? grpc_slice_hash(host) : 0,
1204                                 grpc_slice_hash(method));
1205       for (probes = 0; chand->registered_methods[(hash + probes) % slots]
1206                            .server_registered_method != nullptr;
1207            probes++)
1208         ;
1209       if (probes > max_probes) max_probes = probes;
1210       crm = &chand->registered_methods[(hash + probes) % slots];
1211       crm->server_registered_method = rm;
1212       crm->flags = rm->flags;
1213       crm->has_host = has_host;
1214       if (has_host) {
1215         crm->host = host;
1216       }
1217       crm->method = method;
1218     }
1219     GPR_ASSERT(slots <= UINT32_MAX);
1220     chand->registered_method_slots = static_cast<uint32_t>(slots);
1221     chand->registered_method_max_probes = max_probes;
1222   }
1223
1224   gpr_mu_lock(&s->mu_global);
1225   chand->next = &s->root_channel_data;
1226   chand->prev = chand->next->prev;
1227   chand->next->prev = chand->prev->next = chand;
1228   gpr_mu_unlock(&s->mu_global);
1229
1230   GRPC_CHANNEL_INTERNAL_REF(channel, "connectivity");
1231   op = grpc_make_transport_op(nullptr);
1232   op->set_accept_stream = true;
1233   op->set_accept_stream_fn = accept_stream;
1234   op->set_accept_stream_user_data = chand;
1235   op->on_connectivity_state_change = &chand->channel_connectivity_changed;
1236   op->connectivity_state = &chand->connectivity_state;
1237   if (gpr_atm_acq_load(&s->shutdown_flag) != 0) {
1238     op->disconnect_with_error =
1239         GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server shutdown");
1240   }
1241   grpc_transport_perform_op(transport, op);
1242 }
1243
1244 void grpc_server_populate_server_sockets(
1245     grpc_server* s, grpc_core::channelz::ChildSocketsList* server_sockets,
1246     intptr_t start_idx) {
1247   gpr_mu_lock(&s->mu_global);
1248   channel_data* c = nullptr;
1249   for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) {
1250     if (c->socket_node != nullptr && c->socket_node->uuid() >= start_idx) {
1251       server_sockets->push_back(c->socket_node.get());
1252     }
1253   }
1254   gpr_mu_unlock(&s->mu_global);
1255 }
1256
1257 void grpc_server_populate_listen_sockets(
1258     grpc_server* server, grpc_core::channelz::ChildRefsList* listen_sockets) {
1259   gpr_mu_lock(&server->mu_global);
1260   for (listener* l = server->listeners; l != nullptr; l = l->next) {
1261     listen_sockets->push_back(l->socket_uuid);
1262   }
1263   gpr_mu_unlock(&server->mu_global);
1264 }
1265
1266 void done_published_shutdown(void* done_arg, grpc_cq_completion* storage) {
1267   (void)done_arg;
1268   gpr_free(storage);
1269 }
1270
1271 static void listener_destroy_done(void* s, grpc_error* error) {
1272   grpc_server* server = static_cast<grpc_server*>(s);
1273   gpr_mu_lock(&server->mu_global);
1274   server->listeners_destroyed++;
1275   maybe_finish_shutdown(server);
1276   gpr_mu_unlock(&server->mu_global);
1277 }
1278
/*
  - Kills all pending requests-for-incoming-RPC-calls (i.e., requests made via
    grpc_server_request_call and grpc_server_request_registered_call will now
    be cancelled). See 'kill_pending_work_locked()'

  - Shuts down the listeners (i.e., the server will no longer listen on the
    port for new incoming channels).

  - Iterates through all channels on the server and sends a shutdown message
    (see 'channel_broadcaster_shutdown()' for details) to the clients via the
    transport layer. The transport layer then guarantees the following:
     -- Sends a shutdown notice to the client (e.g., the HTTP2 transport sends
        a GOAWAY frame)
     -- If the server has calls that are still in progress, the connection is
        NOT closed until the server is done with all those calls
     -- Once there are no more calls in progress, the channel is closed
 */
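/* A sketch of typical application usage (illustrative only; 'cq' must be a
   completion queue previously registered with
   grpc_server_register_completion_queue, and 'tag' is any caller-chosen tag):

     grpc_server_shutdown_and_notify(server, cq, tag);
     // optionally force-cancel anything still in flight:
     //   grpc_server_cancel_all_calls(server);
     // drain 'cq' until 'tag' is returned (grpc_completion_queue_next/_pluck),
     // then it is safe to call:
     grpc_server_destroy(server);
 */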
1295 void grpc_server_shutdown_and_notify(grpc_server* server,
1296                                      grpc_completion_queue* cq, void* tag) {
1297   listener* l;
1298   shutdown_tag* sdt;
1299   channel_broadcaster broadcaster;
1300   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
1301   grpc_core::ExecCtx exec_ctx;
1302
1303   GRPC_API_TRACE("grpc_server_shutdown_and_notify(server=%p, cq=%p, tag=%p)", 3,
1304                  (server, cq, tag));
1305
1306   /* wait for startup to be finished: locks mu_global */
1307   gpr_mu_lock(&server->mu_global);
1308   while (server->starting) {
1309     gpr_cv_wait(&server->starting_cv, &server->mu_global,
1310                 gpr_inf_future(GPR_CLOCK_MONOTONIC));
1311   }
1312
  /* stay locked, and gather up the shutdown work to do */
1314   GPR_ASSERT(grpc_cq_begin_op(cq, tag));
1315   if (server->shutdown_published) {
1316     grpc_cq_end_op(cq, tag, GRPC_ERROR_NONE, done_published_shutdown, nullptr,
1317                    static_cast<grpc_cq_completion*>(
1318                        gpr_malloc(sizeof(grpc_cq_completion))));
1319     gpr_mu_unlock(&server->mu_global);
1320     return;
1321   }
1322   server->shutdown_tags = static_cast<shutdown_tag*>(
1323       gpr_realloc(server->shutdown_tags,
1324                   sizeof(shutdown_tag) * (server->num_shutdown_tags + 1)));
1325   sdt = &server->shutdown_tags[server->num_shutdown_tags++];
1326   sdt->tag = tag;
1327   sdt->cq = cq;
1328   if (gpr_atm_acq_load(&server->shutdown_flag)) {
1329     gpr_mu_unlock(&server->mu_global);
1330     return;
1331   }
1332
1333   server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);
1334
1335   channel_broadcaster_init(server, &broadcaster);
1336
1337   gpr_atm_rel_store(&server->shutdown_flag, 1);
1338
  /* cancel all pending call requests: unregistered first, then registered */
1340   gpr_mu_lock(&server->mu_call);
1341   kill_pending_work_locked(
1342       server, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown"));
1343   gpr_mu_unlock(&server->mu_call);
1344
1345   maybe_finish_shutdown(server);
1346   gpr_mu_unlock(&server->mu_global);
1347
1348   /* Shutdown listeners */
1349   for (l = server->listeners; l; l = l->next) {
1350     GRPC_CLOSURE_INIT(&l->destroy_done, listener_destroy_done, server,
1351                       grpc_schedule_on_exec_ctx);
1352     l->destroy(server, l->arg, &l->destroy_done);
1353   }
1354
1355   channel_broadcaster_shutdown(&broadcaster, true /* send_goaway */,
1356                                GRPC_ERROR_NONE);
1357
1358   if (server->default_resource_user != nullptr) {
1359     grpc_resource_quota_unref(
1360         grpc_resource_user_quota(server->default_resource_user));
1361     grpc_resource_user_shutdown(server->default_resource_user);
1362     grpc_resource_user_unref(server->default_resource_user);
1363   }
1364 }
1365
1366 void grpc_server_cancel_all_calls(grpc_server* server) {
1367   channel_broadcaster broadcaster;
1368   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
1369   grpc_core::ExecCtx exec_ctx;
1370
1371   GRPC_API_TRACE("grpc_server_cancel_all_calls(server=%p)", 1, (server));
1372
1373   gpr_mu_lock(&server->mu_global);
1374   channel_broadcaster_init(server, &broadcaster);
1375   gpr_mu_unlock(&server->mu_global);
1376
1377   channel_broadcaster_shutdown(
1378       &broadcaster, false /* send_goaway */,
1379       GRPC_ERROR_CREATE_FROM_STATIC_STRING("Cancelling all calls"));
1380 }
1381
1382 void grpc_server_destroy(grpc_server* server) {
1383   listener* l;
1384   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
1385   grpc_core::ExecCtx exec_ctx;
1386
1387   GRPC_API_TRACE("grpc_server_destroy(server=%p)", 1, (server));
1388
1389   gpr_mu_lock(&server->mu_global);
1390   GPR_ASSERT(gpr_atm_acq_load(&server->shutdown_flag) || !server->listeners);
1391   GPR_ASSERT(server->listeners_destroyed == num_listeners(server));
1392
1393   while (server->listeners) {
1394     l = server->listeners;
1395     server->listeners = l->next;
1396     gpr_free(l);
1397   }
1398
1399   gpr_mu_unlock(&server->mu_global);
1400
1401   server_unref(server);
1402 }
1403
1404 void grpc_server_add_listener(grpc_server* server, void* arg,
1405                               void (*start)(grpc_server* server, void* arg,
1406                                             grpc_pollset** pollsets,
1407                                             size_t pollset_count),
1408                               void (*destroy)(grpc_server* server, void* arg,
1409                                               grpc_closure* on_done),
1410                               intptr_t socket_uuid) {
1411   listener* l = static_cast<listener*>(gpr_malloc(sizeof(listener)));
1412   l->arg = arg;
1413   l->start = start;
1414   l->destroy = destroy;
1415   l->socket_uuid = socket_uuid;
1416   l->next = server->listeners;
1417   server->listeners = l;
1418 }
1419
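/* Queues an application's request-for-call on the matcher for cq_idx and, if
   that made the queue non-empty, immediately tries to match it against calls
   already parked on the matcher's pending list. */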
1420 static grpc_call_error queue_call_request(grpc_server* server, size_t cq_idx,
1421                                           requested_call* rc) {
1422   call_data* calld = nullptr;
1423   request_matcher* rm = nullptr;
1424   if (gpr_atm_acq_load(&server->shutdown_flag)) {
1425     fail_call(server, cq_idx, rc,
1426               GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown"));
1427     return GRPC_CALL_OK;
1428   }
1429   switch (rc->type) {
1430     case BATCH_CALL:
1431       rm = &server->unregistered_request_matcher;
1432       break;
1433     case REGISTERED_CALL:
1434       rm = &rc->data.registered.method->matcher;
1435       break;
1436   }
1437   if (gpr_locked_mpscq_push(&rm->requests_per_cq[cq_idx], &rc->request_link)) {
1438     /* this was the first queued request: we need to lock and start
1439        matching calls */
1440     gpr_mu_lock(&server->mu_call);
1441     while ((calld = rm->pending_head) != nullptr) {
1442       rc = reinterpret_cast<requested_call*>(
1443           gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]));
1444       if (rc == nullptr) break;
1445       rm->pending_head = calld->pending_next;
1446       gpr_mu_unlock(&server->mu_call);
1447       if (!gpr_atm_full_cas(&calld->state, PENDING, ACTIVATED)) {
1448         // Zombied Call
1449         GRPC_CLOSURE_INIT(
1450             &calld->kill_zombie_closure, kill_zombie,
1451             grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
1452             grpc_schedule_on_exec_ctx);
1453         GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE);
1454       } else {
1455         publish_call(server, calld, cq_idx, rc);
1456       }
1457       gpr_mu_lock(&server->mu_call);
1458     }
1459     gpr_mu_unlock(&server->mu_call);
1460   }
1461   return GRPC_CALL_OK;
1462 }
1463
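/* Requests notification of a new, non-pre-registered call. The request is
   queued on the unregistered-method matcher for cq_for_notification; when a
   matching call arrives, 'tag' is returned from that completion queue with
   'call', 'details' and 'initial_metadata' filled in (see publish_call). */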
1464 grpc_call_error grpc_server_request_call(
1465     grpc_server* server, grpc_call** call, grpc_call_details* details,
1466     grpc_metadata_array* initial_metadata,
1467     grpc_completion_queue* cq_bound_to_call,
1468     grpc_completion_queue* cq_for_notification, void* tag) {
1469   grpc_call_error error;
1470   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
1471   grpc_core::ExecCtx exec_ctx;
1472   requested_call* rc = static_cast<requested_call*>(gpr_malloc(sizeof(*rc)));
1473   GRPC_STATS_INC_SERVER_REQUESTED_CALLS();
1474   GRPC_API_TRACE(
1475       "grpc_server_request_call("
1476       "server=%p, call=%p, details=%p, initial_metadata=%p, "
1477       "cq_bound_to_call=%p, cq_for_notification=%p, tag=%p)",
1478       7,
1479       (server, call, details, initial_metadata, cq_bound_to_call,
1480        cq_for_notification, tag));
1481   size_t cq_idx;
1482   for (cq_idx = 0; cq_idx < server->cq_count; cq_idx++) {
1483     if (server->cqs[cq_idx] == cq_for_notification) {
1484       break;
1485     }
1486   }
1487   if (cq_idx == server->cq_count) {
1488     gpr_free(rc);
1489     error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
1490     goto done;
1491   }
1492   if (grpc_cq_begin_op(cq_for_notification, tag) == false) {
1493     gpr_free(rc);
1494     error = GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN;
1495     goto done;
1496   }
1497   details->reserved = nullptr;
1498   rc->cq_idx = cq_idx;
1499   rc->type = BATCH_CALL;
1500   rc->server = server;
1501   rc->tag = tag;
1502   rc->cq_bound_to_call = cq_bound_to_call;
1503   rc->call = call;
1504   rc->data.batch.details = details;
1505   rc->initial_metadata = initial_metadata;
1506   error = queue_call_request(server, cq_idx, rc);
1507 done:
1508
1509   return error;
1510 }
1511
1512 grpc_call_error grpc_server_request_registered_call(
1513     grpc_server* server, void* rmp, grpc_call** call, gpr_timespec* deadline,
1514     grpc_metadata_array* initial_metadata, grpc_byte_buffer** optional_payload,
1515     grpc_completion_queue* cq_bound_to_call,
1516     grpc_completion_queue* cq_for_notification, void* tag) {
1517   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
1518   grpc_core::ExecCtx exec_ctx;
1519   GRPC_STATS_INC_SERVER_REQUESTED_CALLS();
1520   requested_call* rc = static_cast<requested_call*>(gpr_malloc(sizeof(*rc)));
1521   registered_method* rm = static_cast<registered_method*>(rmp);
1522   GRPC_API_TRACE(
1523       "grpc_server_request_registered_call("
1524       "server=%p, rmp=%p, call=%p, deadline=%p, initial_metadata=%p, "
1525       "optional_payload=%p, cq_bound_to_call=%p, cq_for_notification=%p, "
1526       "tag=%p)",
1527       9,
1528       (server, rmp, call, deadline, initial_metadata, optional_payload,
1529        cq_bound_to_call, cq_for_notification, tag));
1530
1531   size_t cq_idx;
1532   for (cq_idx = 0; cq_idx < server->cq_count; cq_idx++) {
1533     if (server->cqs[cq_idx] == cq_for_notification) {
1534       break;
1535     }
1536   }
1537   if (cq_idx == server->cq_count) {
1538     gpr_free(rc);
1539     return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
1540   }
1541   if ((optional_payload == nullptr) !=
1542       (rm->payload_handling == GRPC_SRM_PAYLOAD_NONE)) {
1543     gpr_free(rc);
1544     return GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH;
1545   }
1546
1547   if (grpc_cq_begin_op(cq_for_notification, tag) == false) {
1548     gpr_free(rc);
1549     return GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN;
1550   }
1551   rc->cq_idx = cq_idx;
1552   rc->type = REGISTERED_CALL;
1553   rc->server = server;
1554   rc->tag = tag;
1555   rc->cq_bound_to_call = cq_bound_to_call;
1556   rc->call = call;
1557   rc->data.registered.method = rm;
1558   rc->data.registered.deadline = deadline;
1559   rc->initial_metadata = initial_metadata;
1560   rc->data.registered.optional_payload = optional_payload;
1561   return queue_call_request(server, cq_idx, rc);
1562 }
1563
1564 static void fail_call(grpc_server* server, size_t cq_idx, requested_call* rc,
1565                       grpc_error* error) {
1566   *rc->call = nullptr;
1567   rc->initial_metadata->count = 0;
1568   GPR_ASSERT(error != GRPC_ERROR_NONE);
1569
1570   grpc_cq_end_op(server->cqs[cq_idx], rc->tag, error, done_request_event, rc,
1571                  &rc->completion);
1572 }
1573
1574 const grpc_channel_args* grpc_server_get_channel_args(grpc_server* server) {
1575   return server->channel_args;
1576 }
1577
1578 grpc_resource_user* grpc_server_get_default_resource_user(grpc_server* server) {
1579   return server->default_resource_user;
1580 }
1581
1582 int grpc_server_has_open_connections(grpc_server* server) {
1583   int r;
1584   gpr_mu_lock(&server->mu_global);
1585   r = server->root_channel_data.next != &server->root_channel_data;
1586   gpr_mu_unlock(&server->mu_global);
1587   return r;
1588 }
1589
1590 grpc_core::channelz::ServerNode* grpc_server_get_channelz_node(
1591     grpc_server* server) {
1592   if (server == nullptr) {
1593     return nullptr;
1594   }
1595   return server->channelz_server.get();
1596 }