f37c3777c517cdbd3a6fd2b2d2ea0a58a9c0a0d7
[platform/upstream/grpc.git] / src / core / ext / transport / chttp2 / server / chttp2_server.cc
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/ext/transport/chttp2/server/chttp2_server.h"
22
23 #include <inttypes.h>
24 #include <limits.h>
25 #include <string.h>
26 #include <vector>
27
28 #include "absl/strings/match.h"
29 #include "absl/strings/str_cat.h"
30 #include "absl/strings/str_format.h"
31
32 #include <grpc/grpc.h>
33 #include <grpc/impl/codegen/grpc_types.h>
34 #include <grpc/support/alloc.h>
35 #include <grpc/support/log.h>
36 #include <grpc/support/sync.h>
37
38 #include "src/core/ext/filters/http/server/http_server_filter.h"
39 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
40 #include "src/core/ext/transport/chttp2/transport/internal.h"
41 #include "src/core/lib/address_utils/sockaddr_utils.h"
42 #include "src/core/lib/channel/channel_args.h"
43 #include "src/core/lib/channel/handshaker.h"
44 #include "src/core/lib/channel/handshaker_registry.h"
45 #include "src/core/lib/gprpp/ref_counted.h"
46 #include "src/core/lib/gprpp/ref_counted_ptr.h"
47 #include "src/core/lib/iomgr/endpoint.h"
48 #include "src/core/lib/iomgr/resolve_address.h"
49 #include "src/core/lib/iomgr/resource_quota.h"
50 #include "src/core/lib/iomgr/tcp_server.h"
51 #include "src/core/lib/iomgr/unix_sockets_posix.h"
52 #include "src/core/lib/slice/slice_internal.h"
53 #include "src/core/lib/surface/api_trace.h"
54 #include "src/core/lib/surface/server.h"
55
56 namespace grpc_core {
57 namespace {
58
59 const char kUnixUriPrefix[] = "unix:";
60 const char kUnixAbstractUriPrefix[] = "unix-abstract:";
61
62 class Chttp2ServerListener : public Server::ListenerInterface {
63  public:
64   static grpc_error_handle Create(Server* server, grpc_resolved_address* addr,
65                                   grpc_channel_args* args,
66                                   Chttp2ServerArgsModifier args_modifier,
67                                   int* port_num);
68
69   static grpc_error_handle CreateWithAcceptor(
70       Server* server, const char* name, grpc_channel_args* args,
71       Chttp2ServerArgsModifier args_modifier);
72
73   // Do not instantiate directly.  Use one of the factory methods above.
74   Chttp2ServerListener(Server* server, grpc_channel_args* args,
75                        Chttp2ServerArgsModifier args_modifier);
76   ~Chttp2ServerListener() override;
77
78   void Start(Server* server,
79              const std::vector<grpc_pollset*>* pollsets) override;
80
81   channelz::ListenSocketNode* channelz_listen_socket_node() const override {
82     return channelz_listen_socket_.get();
83   }
84
85   void SetOnDestroyDone(grpc_closure* on_destroy_done) override;
86
87   void Orphan() override;
88
89  private:
90   class ConfigFetcherWatcher
91       : public grpc_server_config_fetcher::WatcherInterface {
92    public:
93     explicit ConfigFetcherWatcher(RefCountedPtr<Chttp2ServerListener> listener)
94         : listener_(std::move(listener)) {}
95
96     void UpdateConnectionManager(
97         RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
98             connection_manager) override;
99
100     void StopServing() override;
101
102    private:
103     RefCountedPtr<Chttp2ServerListener> listener_;
104   };
105
106   class ActiveConnection : public InternallyRefCounted<ActiveConnection> {
107    public:
108     class HandshakingState : public InternallyRefCounted<HandshakingState> {
109      public:
110       HandshakingState(RefCountedPtr<ActiveConnection> connection_ref,
111                        grpc_pollset* accepting_pollset,
112                        grpc_tcp_server_acceptor* acceptor,
113                        grpc_channel_args* args);
114
115       ~HandshakingState() override;
116
117       void Orphan() override;
118
119       void Start(grpc_endpoint* endpoint, grpc_channel_args* args);
120
121       // Needed to be able to grab an external ref in ActiveConnection::Start()
122       using InternallyRefCounted<HandshakingState>::Ref;
123
124      private:
125       static void OnTimeout(void* arg, grpc_error_handle error);
126       static void OnReceiveSettings(void* arg, grpc_error_handle /* error */);
127       static void OnHandshakeDone(void* arg, grpc_error_handle error);
128       RefCountedPtr<ActiveConnection> const connection_;
129       grpc_pollset* const accepting_pollset_;
130       grpc_tcp_server_acceptor* acceptor_;
131       RefCountedPtr<HandshakeManager> handshake_mgr_
132           ABSL_GUARDED_BY(&connection_->mu_);
133       // State for enforcing handshake timeout on receiving HTTP/2 settings.
134       grpc_millis const deadline_;
135       grpc_timer timer_ ABSL_GUARDED_BY(&connection_->mu_);
136       grpc_closure on_timeout_ ABSL_GUARDED_BY(&connection_->mu_);
137       grpc_closure on_receive_settings_ ABSL_GUARDED_BY(&connection_->mu_);
138       grpc_pollset_set* const interested_parties_;
139     };
140
141     ActiveConnection(grpc_pollset* accepting_pollset,
142                      grpc_tcp_server_acceptor* acceptor,
143                      grpc_channel_args* args);
144     ~ActiveConnection() override;
145
146     void Orphan() override;
147
148     void SendGoAway();
149
150     void Start(RefCountedPtr<Chttp2ServerListener> listener,
151                grpc_endpoint* endpoint, grpc_channel_args* args);
152
153     // Needed to be able to grab an external ref in
154     // Chttp2ServerListener::OnAccept()
155     using InternallyRefCounted<ActiveConnection>::Ref;
156
157    private:
158     static void OnClose(void* arg, grpc_error_handle error);
159
160     RefCountedPtr<Chttp2ServerListener> listener_;
161     Mutex mu_ ABSL_ACQUIRED_AFTER(&listener_->mu_);
162     // Set by HandshakingState before the handshaking begins and reset when
163     // handshaking is done.
164     OrphanablePtr<HandshakingState> handshaking_state_ ABSL_GUARDED_BY(&mu_);
165     // Set by HandshakingState when handshaking is done and a valid transport is
166     // created.
167     grpc_chttp2_transport* transport_ ABSL_GUARDED_BY(&mu_) = nullptr;
168     grpc_closure on_close_;
169     bool shutdown_ ABSL_GUARDED_BY(&mu_) = false;
170   };
171
172   // To allow access to RefCounted<> like interface.
173   friend class RefCountedPtr<Chttp2ServerListener>;
174
175   // Should only be called once so as to start the TCP server.
176   void StartListening();
177
178   static void OnAccept(void* arg, grpc_endpoint* tcp,
179                        grpc_pollset* accepting_pollset,
180                        grpc_tcp_server_acceptor* acceptor);
181
182   static void TcpServerShutdownComplete(void* arg, grpc_error_handle error);
183
184   static void DestroyListener(Server* /*server*/, void* arg,
185                               grpc_closure* destroy_done);
186
187   // The interface required by RefCountedPtr<> has been manually implemented
188   // here to take a ref on tcp_server_ instead. Note that, the handshaker needs
189   // tcp_server_ to exist for the lifetime of the handshake since it's needed by
190   // acceptor. Sharing refs between the listener and tcp_server_ is just an
191   // optimization to avoid taking additional refs on the listener, since
192   // TcpServerShutdownComplete already holds a ref to the listener.
193   void IncrementRefCount() { grpc_tcp_server_ref(tcp_server_); }
194   void IncrementRefCount(const DebugLocation& /* location */,
195                          const char* /* reason */) {
196     IncrementRefCount();
197   }
198
199   RefCountedPtr<Chttp2ServerListener> Ref() GRPC_MUST_USE_RESULT {
200     IncrementRefCount();
201     return RefCountedPtr<Chttp2ServerListener>(this);
202   }
203   RefCountedPtr<Chttp2ServerListener> Ref(const DebugLocation& /* location */,
204                                           const char* /* reason */)
205       GRPC_MUST_USE_RESULT {
206     return Ref();
207   }
208
209   void Unref() { grpc_tcp_server_unref(tcp_server_); }
210   void Unref(const DebugLocation& /* location */, const char* /* reason */) {
211     Unref();
212   }
213
214   Server* const server_;
215   grpc_tcp_server* tcp_server_;
216   grpc_resolved_address resolved_address_;
217   Chttp2ServerArgsModifier const args_modifier_;
218   ConfigFetcherWatcher* config_fetcher_watcher_ = nullptr;
219   Mutex channel_args_mu_;
220   grpc_channel_args* args_ ABSL_GUARDED_BY(channel_args_mu_);
221   RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
222       connection_manager_ ABSL_GUARDED_BY(channel_args_mu_);
223   Mutex mu_;
224   // Signals whether grpc_tcp_server_start() has been called.
225   bool started_ ABSL_GUARDED_BY(mu_) = false;
226   // Signals whether grpc_tcp_server_start() has completed.
227   CondVar started_cv_ ABSL_GUARDED_BY(mu_);
228   // Signals whether new requests/connections are to be accepted.
229   bool is_serving_ ABSL_GUARDED_BY(mu_) = false;
230   // Signals whether the application has triggered shutdown.
231   bool shutdown_ ABSL_GUARDED_BY(mu_) = false;
232   std::map<ActiveConnection*, OrphanablePtr<ActiveConnection>> connections_
233       ABSL_GUARDED_BY(mu_);
234   grpc_closure tcp_server_shutdown_complete_ ABSL_GUARDED_BY(mu_);
235   grpc_closure* on_destroy_done_ ABSL_GUARDED_BY(mu_) = nullptr;
236   RefCountedPtr<channelz::ListenSocketNode> channelz_listen_socket_;
237 };
238
239 //
240 // Chttp2ServerListener::ConfigFetcherWatcher
241 //
242
243 void Chttp2ServerListener::ConfigFetcherWatcher::UpdateConnectionManager(
244     RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
245         connection_manager) {
246   RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
247       connection_manager_to_destroy;
248   {
249     MutexLock lock(&listener_->channel_args_mu_);
250     connection_manager_to_destroy = listener_->connection_manager_;
251     listener_->connection_manager_ = std::move(connection_manager);
252   }
253   {
254     MutexLock lock(&listener_->mu_);
255     if (listener_->shutdown_) {
256       return;
257     }
258     listener_->is_serving_ = true;
259     if (listener_->started_) return;
260   }
261   int port_temp;
262   grpc_error_handle error = grpc_tcp_server_add_port(
263       listener_->tcp_server_, &listener_->resolved_address_, &port_temp);
264   if (error != GRPC_ERROR_NONE) {
265     GRPC_ERROR_UNREF(error);
266     gpr_log(GPR_ERROR, "Error adding port to server: %s",
267             grpc_error_std_string(error).c_str());
268     // TODO(yashykt): We wouldn't need to assert here if we bound to the
269     // port earlier during AddPort.
270     GPR_ASSERT(0);
271   }
272   listener_->StartListening();
273   {
274     MutexLock lock(&listener_->mu_);
275     listener_->started_ = true;
276     listener_->started_cv_.SignalAll();
277   }
278 }
279
280 void Chttp2ServerListener::ConfigFetcherWatcher::StopServing() {
281   std::map<ActiveConnection*, OrphanablePtr<ActiveConnection>> connections;
282   {
283     MutexLock lock(&listener_->mu_);
284     listener_->is_serving_ = false;
285     connections = std::move(listener_->connections_);
286   }
287   // Send GOAWAYs on the transports so that they disconnected when existing RPCs
288   // finish.
289   for (auto& connection : connections) {
290     connection.first->SendGoAway();
291   }
292 }
293
294 //
295 // Chttp2ServerListener::ActiveConnection::HandshakingState
296 //
297
298 grpc_millis GetConnectionDeadline(const grpc_channel_args* args) {
299   int timeout_ms =
300       grpc_channel_args_find_integer(args, GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS,
301                                      {120 * GPR_MS_PER_SEC, 1, INT_MAX});
302   return ExecCtx::Get()->Now() + timeout_ms;
303 }
304
305 Chttp2ServerListener::ActiveConnection::HandshakingState::HandshakingState(
306     RefCountedPtr<ActiveConnection> connection_ref,
307     grpc_pollset* accepting_pollset, grpc_tcp_server_acceptor* acceptor,
308     grpc_channel_args* args)
309     : connection_(std::move(connection_ref)),
310       accepting_pollset_(accepting_pollset),
311       acceptor_(acceptor),
312       handshake_mgr_(MakeRefCounted<HandshakeManager>()),
313       deadline_(GetConnectionDeadline(args)),
314       interested_parties_(grpc_pollset_set_create()) {
315   grpc_pollset_set_add_pollset(interested_parties_, accepting_pollset_);
316   HandshakerRegistry::AddHandshakers(HANDSHAKER_SERVER, args,
317                                      interested_parties_, handshake_mgr_.get());
318 }
319
320 Chttp2ServerListener::ActiveConnection::HandshakingState::~HandshakingState() {
321   grpc_pollset_set_del_pollset(interested_parties_, accepting_pollset_);
322   grpc_pollset_set_destroy(interested_parties_);
323   gpr_free(acceptor_);
324 }
325
326 void Chttp2ServerListener::ActiveConnection::HandshakingState::Orphan() {
327   {
328     MutexLock lock(&connection_->mu_);
329     if (handshake_mgr_ != nullptr) {
330       handshake_mgr_->Shutdown(
331           GRPC_ERROR_CREATE_FROM_STATIC_STRING("Listener stopped serving."));
332     }
333   }
334   Unref();
335 }
336
337 void Chttp2ServerListener::ActiveConnection::HandshakingState::Start(
338     grpc_endpoint* endpoint, grpc_channel_args* args) {
339   Ref().release();  // Held by OnHandshakeDone
340   RefCountedPtr<HandshakeManager> handshake_mgr;
341   {
342     MutexLock lock(&connection_->mu_);
343     if (handshake_mgr_ == nullptr) return;
344     handshake_mgr = handshake_mgr_;
345   }
346   handshake_mgr->DoHandshake(endpoint, args, deadline_, acceptor_,
347                              OnHandshakeDone, this);
348 }
349
350 void Chttp2ServerListener::ActiveConnection::HandshakingState::OnTimeout(
351     void* arg, grpc_error_handle error) {
352   HandshakingState* self = static_cast<HandshakingState*>(arg);
353   // Note that we may be called with GRPC_ERROR_NONE when the timer fires
354   // or with an error indicating that the timer system is being shut down.
355   if (error != GRPC_ERROR_CANCELLED) {
356     grpc_transport_op* op = grpc_make_transport_op(nullptr);
357     op->disconnect_with_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
358         "Did not receive HTTP/2 settings before handshake timeout");
359     grpc_chttp2_transport* transport = nullptr;
360     {
361       MutexLock lock(&self->connection_->mu_);
362       transport = self->connection_->transport_;
363     }
364     grpc_transport_perform_op(&transport->base, op);
365   }
366   self->Unref();
367 }
368
369 void Chttp2ServerListener::ActiveConnection::HandshakingState::
370     OnReceiveSettings(void* arg, grpc_error_handle /* error */) {
371   HandshakingState* self = static_cast<HandshakingState*>(arg);
372   grpc_timer_cancel(&self->timer_);
373   self->Unref();
374 }
375
376 void Chttp2ServerListener::ActiveConnection::HandshakingState::OnHandshakeDone(
377     void* arg, grpc_error_handle error) {
378   auto* args = static_cast<HandshakerArgs*>(arg);
379   HandshakingState* self = static_cast<HandshakingState*>(args->user_data);
380   OrphanablePtr<HandshakingState> handshaking_state_ref;
381   RefCountedPtr<HandshakeManager> handshake_mgr;
382   bool cleanup_connection = false;
383   bool free_resource_quota = false;
384   grpc_resource_user* resource_user =
385       self->connection_->listener_->server_->default_resource_user();
386   {
387     MutexLock connection_lock(&self->connection_->mu_);
388     if (error != GRPC_ERROR_NONE || self->connection_->shutdown_) {
389       std::string error_str = grpc_error_std_string(error);
390       gpr_log(GPR_DEBUG, "Handshaking failed: %s", error_str.c_str());
391       cleanup_connection = true;
392       free_resource_quota = true;
393       if (error == GRPC_ERROR_NONE && args->endpoint != nullptr) {
394         // We were shut down or stopped serving after handshaking completed
395         // successfully, so destroy the endpoint here.
396         // TODO(ctiller): It is currently necessary to shutdown endpoints
397         // before destroying them, even if we know that there are no
398         // pending read/write callbacks.  This should be fixed, at which
399         // point this can be removed.
400         grpc_endpoint_shutdown(args->endpoint, GRPC_ERROR_NONE);
401         grpc_endpoint_destroy(args->endpoint);
402         grpc_channel_args_destroy(args->args);
403         grpc_slice_buffer_destroy_internal(args->read_buffer);
404         gpr_free(args->read_buffer);
405       }
406     } else {
407       // If the handshaking succeeded but there is no endpoint, then the
408       // handshaker may have handed off the connection to some external
409       // code, so we can just clean up here without creating a transport.
410       if (args->endpoint != nullptr) {
411         grpc_transport* transport = grpc_create_chttp2_transport(
412             args->args, args->endpoint, false, resource_user);
413         grpc_error_handle channel_init_err =
414             self->connection_->listener_->server_->SetupTransport(
415                 transport, self->accepting_pollset_, args->args,
416                 grpc_chttp2_transport_get_socket_node(transport),
417                 resource_user);
418         if (channel_init_err == GRPC_ERROR_NONE) {
419           // Use notify_on_receive_settings callback to enforce the
420           // handshake deadline.
421           // Note: The reinterpret_cast<>s here are safe, because
422           // grpc_chttp2_transport is a C-style extension of
423           // grpc_transport, so this is morally equivalent of a
424           // static_cast<> to a derived class.
425           // TODO(roth): Change to static_cast<> when we C++-ify the
426           // transport API.
427           self->connection_->transport_ =
428               reinterpret_cast<grpc_chttp2_transport*>(transport);
429           GRPC_CHTTP2_REF_TRANSPORT(self->connection_->transport_,
430                                     "ActiveConnection");  // Held by connection_
431           self->Ref().release();  // Held by OnReceiveSettings().
432           GRPC_CLOSURE_INIT(&self->on_receive_settings_, OnReceiveSettings,
433                             self, grpc_schedule_on_exec_ctx);
434           // If the listener has been configured with a config fetcher, we need
435           // to watch on the transport being closed so that we can an updated
436           // list of active connections.
437           grpc_closure* on_close = nullptr;
438           if (self->connection_->listener_->config_fetcher_watcher_ !=
439               nullptr) {
440             // Refs helds by OnClose()
441             self->connection_->Ref().release();
442             on_close = &self->connection_->on_close_;
443           } else {
444             // Remove the connection from the connections_ map since OnClose()
445             // will not be invoked when a config fetcher is set.
446             cleanup_connection = true;
447           }
448           grpc_chttp2_transport_start_reading(transport, args->read_buffer,
449                                               &self->on_receive_settings_,
450                                               on_close);
451           grpc_channel_args_destroy(args->args);
452           self->Ref().release();  // Held by OnTimeout().
453           GRPC_CLOSURE_INIT(&self->on_timeout_, OnTimeout, self,
454                             grpc_schedule_on_exec_ctx);
455           grpc_timer_init(&self->timer_, self->deadline_, &self->on_timeout_);
456         } else {
457           // Failed to create channel from transport. Clean up.
458           gpr_log(GPR_ERROR, "Failed to create channel: %s",
459                   grpc_error_std_string(channel_init_err).c_str());
460           GRPC_ERROR_UNREF(channel_init_err);
461           grpc_transport_destroy(transport);
462           grpc_slice_buffer_destroy_internal(args->read_buffer);
463           gpr_free(args->read_buffer);
464           cleanup_connection = true;
465           free_resource_quota = true;
466           grpc_channel_args_destroy(args->args);
467         }
468       } else {
469         cleanup_connection = true;
470         free_resource_quota = true;
471       }
472     }
473     // Since the handshake manager is done, the connection no longer needs to
474     // shutdown the handshake when the listener needs to stop serving.
475     // Avoid calling the destructor of HandshakeManager and HandshakingState
476     // from within the critical region.
477     handshake_mgr = std::move(self->handshake_mgr_);
478     handshaking_state_ref = std::move(self->connection_->handshaking_state_);
479   }
480   gpr_free(self->acceptor_);
481   self->acceptor_ = nullptr;
482   OrphanablePtr<ActiveConnection> connection;
483   if (free_resource_quota && resource_user != nullptr) {
484     grpc_resource_user_free(resource_user, GRPC_RESOURCE_QUOTA_CHANNEL_SIZE);
485   }
486   if (cleanup_connection) {
487     MutexLock listener_lock(&self->connection_->listener_->mu_);
488     auto it = self->connection_->listener_->connections_.find(
489         self->connection_.get());
490     if (it != self->connection_->listener_->connections_.end()) {
491       connection = std::move(it->second);
492       self->connection_->listener_->connections_.erase(it);
493     }
494   }
495   self->Unref();
496 }
497
498 //
499 // Chttp2ServerListener::ActiveConnection
500 //
501
502 Chttp2ServerListener::ActiveConnection::ActiveConnection(
503     grpc_pollset* accepting_pollset, grpc_tcp_server_acceptor* acceptor,
504     grpc_channel_args* args)
505     : handshaking_state_(MakeOrphanable<HandshakingState>(
506           Ref(), accepting_pollset, acceptor, args)) {
507   GRPC_CLOSURE_INIT(&on_close_, ActiveConnection::OnClose, this,
508                     grpc_schedule_on_exec_ctx);
509 }
510
511 Chttp2ServerListener::ActiveConnection::~ActiveConnection() {
512   if (transport_ != nullptr) {
513     GRPC_CHTTP2_UNREF_TRANSPORT(transport_, "ActiveConnection");
514   }
515 }
516
517 void Chttp2ServerListener::ActiveConnection::Orphan() {
518   OrphanablePtr<HandshakingState> handshaking_state;
519   {
520     MutexLock lock(&mu_);
521     shutdown_ = true;
522     // Reset handshaking_state_ since we have been orphaned by the listener
523     // signaling that the listener has stopped serving.
524     handshaking_state = std::move(handshaking_state_);
525   }
526   Unref();
527 }
528
529 void Chttp2ServerListener::ActiveConnection::SendGoAway() {
530   grpc_chttp2_transport* transport = nullptr;
531   {
532     MutexLock lock(&mu_);
533     transport = transport_;
534   }
535   if (transport != nullptr) {
536     grpc_transport_op* op = grpc_make_transport_op(nullptr);
537     op->goaway_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
538         "Server is stopping to serve requests.");
539     grpc_transport_perform_op(&transport->base, op);
540   }
541 }
542
543 void Chttp2ServerListener::ActiveConnection::Start(
544     RefCountedPtr<Chttp2ServerListener> listener, grpc_endpoint* endpoint,
545     grpc_channel_args* args) {
546   RefCountedPtr<HandshakingState> handshaking_state_ref;
547   listener_ = std::move(listener);
548   {
549     MutexLock lock(&mu_);
550     if (shutdown_) return;
551     // Hold a ref to HandshakingState to allow starting the handshake outside
552     // the critical region.
553     handshaking_state_ref = handshaking_state_->Ref();
554   }
555   handshaking_state_ref->Start(endpoint, args);
556 }
557
558 void Chttp2ServerListener::ActiveConnection::OnClose(
559     void* arg, grpc_error_handle /* error */) {
560   ActiveConnection* self = static_cast<ActiveConnection*>(arg);
561   OrphanablePtr<ActiveConnection> connection;
562   {
563     MutexLock listener_lock(&self->listener_->mu_);
564     MutexLock connection_lock(&self->mu_);
565     // The node was already deleted from the connections_ list if the connection
566     // is shutdown.
567     if (!self->shutdown_) {
568       auto it = self->listener_->connections_.find(self);
569       if (it != self->listener_->connections_.end()) {
570         connection = std::move(it->second);
571         self->listener_->connections_.erase(it);
572       }
573     }
574   }
575   self->Unref();
576 }
577
578 //
579 // Chttp2ServerListener
580 //
581
582 grpc_error_handle Chttp2ServerListener::Create(
583     Server* server, grpc_resolved_address* addr, grpc_channel_args* args,
584     Chttp2ServerArgsModifier args_modifier, int* port_num) {
585   Chttp2ServerListener* listener = nullptr;
586   // The bulk of this method is inside of a lambda to make cleanup
587   // easier without using goto.
588   grpc_error_handle error = [&]() {
589     // Create Chttp2ServerListener.
590     listener = new Chttp2ServerListener(server, args, args_modifier);
591     error = grpc_tcp_server_create(&listener->tcp_server_shutdown_complete_,
592                                    args, &listener->tcp_server_);
593     if (error != GRPC_ERROR_NONE) return error;
594     if (server->config_fetcher() != nullptr) {
595       listener->resolved_address_ = *addr;
596       // TODO(yashykt): Consider binding so as to be able to return the port
597       // number.
598     } else {
599       error = grpc_tcp_server_add_port(listener->tcp_server_, addr, port_num);
600       if (error != GRPC_ERROR_NONE) return error;
601     }
602     // Create channelz node.
603     if (grpc_channel_args_find_bool(args, GRPC_ARG_ENABLE_CHANNELZ,
604                                     GRPC_ENABLE_CHANNELZ_DEFAULT)) {
605       std::string string_address = grpc_sockaddr_to_uri(addr);
606       listener->channelz_listen_socket_ =
607           MakeRefCounted<channelz::ListenSocketNode>(
608               string_address.c_str(),
609               absl::StrFormat("chttp2 listener %s", string_address.c_str()));
610     }
611     // Register with the server only upon success
612     server->AddListener(OrphanablePtr<Server::ListenerInterface>(listener));
613     return GRPC_ERROR_NONE;
614   }();
615   if (error != GRPC_ERROR_NONE) {
616     if (listener != nullptr) {
617       if (listener->tcp_server_ != nullptr) {
618         // listener is deleted when tcp_server_ is shutdown.
619         grpc_tcp_server_unref(listener->tcp_server_);
620       } else {
621         delete listener;
622       }
623     } else {
624       grpc_channel_args_destroy(args);
625     }
626   }
627   return error;
628 }
629
630 grpc_error_handle Chttp2ServerListener::CreateWithAcceptor(
631     Server* server, const char* name, grpc_channel_args* args,
632     Chttp2ServerArgsModifier args_modifier) {
633   Chttp2ServerListener* listener =
634       new Chttp2ServerListener(server, args, args_modifier);
635   grpc_error_handle error = grpc_tcp_server_create(
636       &listener->tcp_server_shutdown_complete_, args, &listener->tcp_server_);
637   if (error != GRPC_ERROR_NONE) {
638     delete listener;
639     return error;
640   }
641   // TODO(yangg) channelz
642   TcpServerFdHandler** arg_val =
643       grpc_channel_args_find_pointer<TcpServerFdHandler*>(args, name);
644   *arg_val = grpc_tcp_server_create_fd_handler(listener->tcp_server_);
645   server->AddListener(OrphanablePtr<Server::ListenerInterface>(listener));
646   return GRPC_ERROR_NONE;
647 }
648
649 Chttp2ServerListener::Chttp2ServerListener(
650     Server* server, grpc_channel_args* args,
651     Chttp2ServerArgsModifier args_modifier)
652     : server_(server), args_modifier_(args_modifier), args_(args) {
653   GRPC_CLOSURE_INIT(&tcp_server_shutdown_complete_, TcpServerShutdownComplete,
654                     this, grpc_schedule_on_exec_ctx);
655 }
656
657 Chttp2ServerListener::~Chttp2ServerListener() {
658   // Flush queued work before destroying handshaker factory, since that
659   // may do a synchronous unref.
660   ExecCtx::Get()->Flush();
661   if (on_destroy_done_ != nullptr) {
662     ExecCtx::Run(DEBUG_LOCATION, on_destroy_done_, GRPC_ERROR_NONE);
663     ExecCtx::Get()->Flush();
664   }
665   grpc_channel_args_destroy(args_);
666 }
667
668 /* Server callback: start listening on our ports */
669 void Chttp2ServerListener::Start(
670     Server* /*server*/, const std::vector<grpc_pollset*>* /* pollsets */) {
671   if (server_->config_fetcher() != nullptr) {
672     grpc_channel_args* args = nullptr;
673     auto watcher = absl::make_unique<ConfigFetcherWatcher>(Ref());
674     config_fetcher_watcher_ = watcher.get();
675     {
676       MutexLock lock(&channel_args_mu_);
677       args = grpc_channel_args_copy(args_);
678     }
679     server_->config_fetcher()->StartWatch(
680         grpc_sockaddr_to_string(&resolved_address_, false), args,
681         std::move(watcher));
682   } else {
683     {
684       MutexLock lock(&mu_);
685       started_ = true;
686       is_serving_ = true;
687     }
688     StartListening();
689   }
690 }
691
692 void Chttp2ServerListener::StartListening() {
693   grpc_tcp_server_start(tcp_server_, &server_->pollsets(), OnAccept, this);
694 }
695
696 void Chttp2ServerListener::SetOnDestroyDone(grpc_closure* on_destroy_done) {
697   MutexLock lock(&mu_);
698   on_destroy_done_ = on_destroy_done;
699 }
700
701 void Chttp2ServerListener::OnAccept(void* arg, grpc_endpoint* tcp,
702                                     grpc_pollset* accepting_pollset,
703                                     grpc_tcp_server_acceptor* acceptor) {
704   Chttp2ServerListener* self = static_cast<Chttp2ServerListener*>(arg);
705   grpc_channel_args* args = nullptr;
706   RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
707       connection_manager;
708   {
709     MutexLock lock(&self->channel_args_mu_);
710     args = grpc_channel_args_copy(self->args_);
711     connection_manager = self->connection_manager_;
712   }
713   auto endpoint_cleanup = [&](grpc_error_handle error) {
714     grpc_endpoint_shutdown(tcp, error);
715     grpc_endpoint_destroy(tcp);
716     gpr_free(acceptor);
717   };
718   if (self->server_->config_fetcher() != nullptr) {
719     if (connection_manager == nullptr) {
720       grpc_error_handle error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
721           "No ConnectionManager configured. Closing connection.");
722       endpoint_cleanup(error);
723       grpc_channel_args_destroy(args);
724       return;
725     }
726     // TODO(yashykt): Maybe combine the following two arg modifiers into a
727     // single one.
728     absl::StatusOr<grpc_channel_args*> args_result =
729         connection_manager->UpdateChannelArgsForConnection(args, tcp);
730     if (!args_result.ok()) {
731       gpr_log(GPR_DEBUG, "Closing connection: %s",
732               args_result.status().ToString().c_str());
733       endpoint_cleanup(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
734           args_result.status().ToString().c_str()));
735       return;
736     }
737     grpc_error_handle error = GRPC_ERROR_NONE;
738     args = self->args_modifier_(*args_result, &error);
739     if (error != GRPC_ERROR_NONE) {
740       gpr_log(GPR_DEBUG, "Closing connection: %s",
741               grpc_error_std_string(error).c_str());
742       endpoint_cleanup(error);
743       grpc_channel_args_destroy(args);
744       return;
745     }
746   }
747   auto connection =
748       MakeOrphanable<ActiveConnection>(accepting_pollset, acceptor, args);
749   // We no longer own acceptor
750   acceptor = nullptr;
751   // Hold a ref to connection to allow starting handshake outside the
752   // critical region
753   RefCountedPtr<ActiveConnection> connection_ref = connection->Ref();
754   RefCountedPtr<Chttp2ServerListener> listener_ref;
755   {
756     MutexLock lock(&self->mu_);
757     // Shutdown the the connection if listener's stopped serving.
758     if (!self->shutdown_ && self->is_serving_) {
759       grpc_resource_user* resource_user =
760           self->server_->default_resource_user();
761       if (resource_user != nullptr &&
762           !grpc_resource_user_safe_alloc(resource_user,
763                                          GRPC_RESOURCE_QUOTA_CHANNEL_SIZE)) {
764         gpr_log(
765             GPR_ERROR,
766             "Memory quota exhausted, rejecting connection, no handshaking.");
767       } else {
768         // This ref needs to be taken in the critical region after having made
769         // sure that the listener has not been Orphaned, so as to avoid
770         // heap-use-after-free issues where `Ref()` is invoked when the ref of
771         // tcp_server_ has already reached 0. (Ref() implementation of
772         // Chttp2ServerListener is grpc_tcp_server_ref().)
773         listener_ref = self->Ref();
774         self->connections_.emplace(connection.get(), std::move(connection));
775       }
776     }
777   }
778   if (connection != nullptr) {
779     endpoint_cleanup(GRPC_ERROR_NONE);
780   } else {
781     connection_ref->Start(std::move(listener_ref), tcp, args);
782   }
783   grpc_channel_args_destroy(args);
784 }
785
786 void Chttp2ServerListener::TcpServerShutdownComplete(void* arg,
787                                                      grpc_error_handle error) {
788   Chttp2ServerListener* self = static_cast<Chttp2ServerListener*>(arg);
789   self->channelz_listen_socket_.reset();
790   GRPC_ERROR_UNREF(error);
791   delete self;
792 }
793
794 /* Server callback: destroy the tcp listener (so we don't generate further
795    callbacks) */
796 void Chttp2ServerListener::Orphan() {
797   // Cancel the watch before shutting down so as to avoid holding a ref to the
798   // listener in the watcher.
799   if (config_fetcher_watcher_ != nullptr) {
800     server_->config_fetcher()->CancelWatch(config_fetcher_watcher_);
801   }
802   std::map<ActiveConnection*, OrphanablePtr<ActiveConnection>> connections;
803   grpc_tcp_server* tcp_server;
804   {
805     MutexLock lock(&mu_);
806     shutdown_ = true;
807     is_serving_ = false;
808     // Orphan the connections so that they can start cleaning up.
809     connections = std::move(connections_);
810     // If the listener is currently set to be serving but has not been started
811     // yet, it means that `grpc_tcp_server_start` is in progress. Wait for the
812     // operation to finish to avoid causing races.
813     while (is_serving_ && !started_) {
814       started_cv_.Wait(&mu_);
815     }
816     tcp_server = tcp_server_;
817   }
818   grpc_tcp_server_shutdown_listeners(tcp_server);
819   grpc_tcp_server_unref(tcp_server);
820 }
821
822 }  // namespace
823
824 //
825 // Chttp2ServerAddPort()
826 //
827
828 grpc_error_handle Chttp2ServerAddPort(Server* server, const char* addr,
829                                       grpc_channel_args* args,
830                                       Chttp2ServerArgsModifier args_modifier,
831                                       int* port_num) {
832   if (strncmp(addr, "external:", 9) == 0) {
833     return grpc_core::Chttp2ServerListener::CreateWithAcceptor(
834         server, addr, args, args_modifier);
835   }
836   *port_num = -1;
837   grpc_resolved_addresses* resolved = nullptr;
838   std::vector<grpc_error_handle> error_list;
839   // Using lambda to avoid use of goto.
840   grpc_error_handle error = [&]() {
841     if (absl::StartsWith(addr, kUnixUriPrefix)) {
842       error = grpc_resolve_unix_domain_address(
843           addr + sizeof(kUnixUriPrefix) - 1, &resolved);
844     } else if (absl::StartsWith(addr, kUnixAbstractUriPrefix)) {
845       error = grpc_resolve_unix_abstract_domain_address(
846           addr + sizeof(kUnixAbstractUriPrefix) - 1, &resolved);
847     } else {
848       error = grpc_blocking_resolve_address(addr, "https", &resolved);
849     }
850     if (error != GRPC_ERROR_NONE) return error;
851     // Create a listener for each resolved address.
852     for (size_t i = 0; i < resolved->naddrs; i++) {
853       // If address has a wildcard port (0), use the same port as a previous
854       // listener.
855       if (*port_num != -1 && grpc_sockaddr_get_port(&resolved->addrs[i]) == 0) {
856         grpc_sockaddr_set_port(&resolved->addrs[i], *port_num);
857       }
858       int port_temp = -1;
859       error = grpc_core::Chttp2ServerListener::Create(
860           server, &resolved->addrs[i], grpc_channel_args_copy(args),
861           args_modifier, &port_temp);
862       if (error != GRPC_ERROR_NONE) {
863         error_list.push_back(error);
864       } else {
865         if (*port_num == -1) {
866           *port_num = port_temp;
867         } else {
868           GPR_ASSERT(*port_num == port_temp);
869         }
870       }
871     }
872     if (error_list.size() == resolved->naddrs) {
873       std::string msg =
874           absl::StrFormat("No address added out of total %" PRIuPTR " resolved",
875                           resolved->naddrs);
876       return GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING(
877           msg.c_str(), error_list.data(), error_list.size());
878     } else if (!error_list.empty()) {
879       std::string msg = absl::StrFormat(
880           "Only %" PRIuPTR " addresses added out of total %" PRIuPTR
881           " resolved",
882           resolved->naddrs - error_list.size(), resolved->naddrs);
883       error = GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING(
884           msg.c_str(), error_list.data(), error_list.size());
885       gpr_log(GPR_INFO, "WARNING: %s", grpc_error_std_string(error).c_str());
886       GRPC_ERROR_UNREF(error);
887       // we managed to bind some addresses: continue without error
888     }
889     return GRPC_ERROR_NONE;
890   }();  // lambda end
891   for (grpc_error_handle error : error_list) {
892     GRPC_ERROR_UNREF(error);
893   }
894   grpc_channel_args_destroy(args);
895   if (resolved != nullptr) {
896     grpc_resolved_addresses_destroy(resolved);
897   }
898   if (error != GRPC_ERROR_NONE) *port_num = 0;
899   return error;
900 }
901
902 }  // namespace grpc_core