Imported Upstream version 1.37.0
[platform/upstream/grpc.git] / src / core / ext / transport / chttp2 / server / chttp2_server.cc
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/ext/transport/chttp2/server/chttp2_server.h"
22
23 #include <inttypes.h>
24 #include <limits.h>
25 #include <string.h>
26 #include <vector>
27
28 #include "absl/strings/match.h"
29 #include "absl/strings/str_cat.h"
30 #include "absl/strings/str_format.h"
31
32 #include <grpc/grpc.h>
33 #include <grpc/impl/codegen/grpc_types.h>
34 #include <grpc/support/alloc.h>
35 #include <grpc/support/log.h>
36 #include <grpc/support/sync.h>
37
38 #include "src/core/ext/filters/http/server/http_server_filter.h"
39 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
40 #include "src/core/ext/transport/chttp2/transport/internal.h"
41 #include "src/core/lib/channel/channel_args.h"
42 #include "src/core/lib/channel/handshaker.h"
43 #include "src/core/lib/channel/handshaker_registry.h"
44 #include "src/core/lib/gprpp/ref_counted.h"
45 #include "src/core/lib/gprpp/ref_counted_ptr.h"
46 #include "src/core/lib/iomgr/endpoint.h"
47 #include "src/core/lib/iomgr/resolve_address.h"
48 #include "src/core/lib/iomgr/resource_quota.h"
49 #include "src/core/lib/iomgr/sockaddr_utils.h"
50 #include "src/core/lib/iomgr/tcp_server.h"
51 #include "src/core/lib/iomgr/unix_sockets_posix.h"
52 #include "src/core/lib/slice/slice_internal.h"
53 #include "src/core/lib/surface/api_trace.h"
54 #include "src/core/lib/surface/server.h"
55
56 namespace grpc_core {
57 namespace {
58
59 const char kUnixUriPrefix[] = "unix:";
60 const char kUnixAbstractUriPrefix[] = "unix-abstract:";
61
62 class Chttp2ServerListener : public Server::ListenerInterface {
63  public:
64   static grpc_error* Create(Server* server, grpc_resolved_address* addr,
65                             grpc_channel_args* args,
66                             Chttp2ServerArgsModifier args_modifier,
67                             int* port_num);
68
69   static grpc_error* CreateWithAcceptor(Server* server, const char* name,
70                                         grpc_channel_args* args,
71                                         Chttp2ServerArgsModifier args_modifier);
72
73   // Do not instantiate directly.  Use one of the factory methods above.
74   Chttp2ServerListener(Server* server, grpc_channel_args* args,
75                        Chttp2ServerArgsModifier args_modifier);
76   ~Chttp2ServerListener() override;
77
78   void Start(Server* server,
79              const std::vector<grpc_pollset*>* pollsets) override;
80
81   channelz::ListenSocketNode* channelz_listen_socket_node() const override {
82     return channelz_listen_socket_.get();
83   }
84
85   void SetOnDestroyDone(grpc_closure* on_destroy_done) override;
86
87   void Orphan() override;
88
89  private:
90   class ConfigFetcherWatcher
91       : public grpc_server_config_fetcher::WatcherInterface {
92    public:
93     explicit ConfigFetcherWatcher(RefCountedPtr<Chttp2ServerListener> listener)
94         : listener_(std::move(listener)) {}
95
96     void UpdateConnectionManager(
97         RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
98             connection_manager) override;
99
100     void StopServing() override;
101
102    private:
103     RefCountedPtr<Chttp2ServerListener> listener_;
104   };
105
106   class ActiveConnection : public InternallyRefCounted<ActiveConnection> {
107    public:
108     class HandshakingState : public InternallyRefCounted<HandshakingState> {
109      public:
110       HandshakingState(RefCountedPtr<ActiveConnection> connection_ref,
111                        grpc_pollset* accepting_pollset,
112                        grpc_tcp_server_acceptor* acceptor,
113                        grpc_channel_args* args);
114
115       ~HandshakingState() override;
116
117       void Orphan() override;
118
119       void Start(grpc_endpoint* endpoint, grpc_channel_args* args);
120
121       // Needed to be able to grab an external ref in ActiveConnection::Start()
122       using InternallyRefCounted<HandshakingState>::Ref;
123
124      private:
125       static void OnTimeout(void* arg, grpc_error* error);
126       static void OnReceiveSettings(void* arg, grpc_error* /* error */);
127       static void OnHandshakeDone(void* arg, grpc_error* error);
128       RefCountedPtr<ActiveConnection> const connection_;
129       grpc_pollset* const accepting_pollset_;
130       grpc_tcp_server_acceptor* const acceptor_;
131       RefCountedPtr<HandshakeManager> handshake_mgr_
132           ABSL_GUARDED_BY(&connection_->mu_);
133       // State for enforcing handshake timeout on receiving HTTP/2 settings.
134       grpc_millis const deadline_;
135       grpc_timer timer_ ABSL_GUARDED_BY(&connection_->mu_);
136       grpc_closure on_timeout_ ABSL_GUARDED_BY(&connection_->mu_);
137       grpc_closure on_receive_settings_ ABSL_GUARDED_BY(&connection_->mu_);
138       grpc_pollset_set* const interested_parties_;
139     };
140
141     ActiveConnection(grpc_pollset* accepting_pollset,
142                      grpc_tcp_server_acceptor* acceptor,
143                      grpc_channel_args* args);
144     ~ActiveConnection() override;
145
146     void Orphan() override;
147
148     void SendGoAway();
149
150     void Start(RefCountedPtr<Chttp2ServerListener> listener,
151                grpc_endpoint* endpoint, grpc_channel_args* args);
152
153     // Needed to be able to grab an external ref in
154     // Chttp2ServerListener::OnAccept()
155     using InternallyRefCounted<ActiveConnection>::Ref;
156
157    private:
158     static void OnClose(void* arg, grpc_error* error);
159
160     RefCountedPtr<Chttp2ServerListener> listener_;
161     Mutex mu_ ABSL_ACQUIRED_AFTER(&listener_->mu_);
162     // Set by HandshakingState before the handshaking begins and reset when
163     // handshaking is done.
164     OrphanablePtr<HandshakingState> handshaking_state_ ABSL_GUARDED_BY(&mu_);
165     // Set by HandshakingState when handshaking is done and a valid transport is
166     // created.
167     grpc_chttp2_transport* transport_ ABSL_GUARDED_BY(&mu_) = nullptr;
168     grpc_closure on_close_;
169     bool shutdown_ ABSL_GUARDED_BY(&mu_) = false;
170   };
171
172   // To allow access to RefCounted<> like interface.
173   friend class RefCountedPtr<Chttp2ServerListener>;
174
175   // Should only be called once so as to start the TCP server.
176   void StartListening();
177
178   static void OnAccept(void* arg, grpc_endpoint* tcp,
179                        grpc_pollset* accepting_pollset,
180                        grpc_tcp_server_acceptor* acceptor);
181
182   static void TcpServerShutdownComplete(void* arg, grpc_error* error);
183
184   static void DestroyListener(Server* /*server*/, void* arg,
185                               grpc_closure* destroy_done);
186
187   // The interface required by RefCountedPtr<> has been manually implemented
188   // here to take a ref on tcp_server_ instead. Note that, the handshaker needs
189   // tcp_server_ to exist for the lifetime of the handshake since it's needed by
190   // acceptor. Sharing refs between the listener and tcp_server_ is just an
191   // optimization to avoid taking additional refs on the listener, since
192   // TcpServerShutdownComplete already holds a ref to the listener.
193   void IncrementRefCount() { grpc_tcp_server_ref(tcp_server_); }
194   void IncrementRefCount(const DebugLocation& /* location */,
195                          const char* /* reason */) {
196     IncrementRefCount();
197   }
198
199   RefCountedPtr<Chttp2ServerListener> Ref() GRPC_MUST_USE_RESULT {
200     IncrementRefCount();
201     return RefCountedPtr<Chttp2ServerListener>(this);
202   }
203   RefCountedPtr<Chttp2ServerListener> Ref(const DebugLocation& /* location */,
204                                           const char* /* reason */)
205       GRPC_MUST_USE_RESULT {
206     return Ref();
207   }
208
209   void Unref() { grpc_tcp_server_unref(tcp_server_); }
210   void Unref(const DebugLocation& /* location */, const char* /* reason */) {
211     Unref();
212   }
213
214   Server* const server_;
215   grpc_tcp_server* tcp_server_;
216   grpc_resolved_address resolved_address_;
217   Chttp2ServerArgsModifier const args_modifier_;
218   ConfigFetcherWatcher* config_fetcher_watcher_ = nullptr;
219   Mutex channel_args_mu_;
220   grpc_channel_args* args_ ABSL_GUARDED_BY(channel_args_mu_);
221   RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
222       connection_manager_ ABSL_GUARDED_BY(channel_args_mu_);
223   Mutex mu_;
224   // Signals whether grpc_tcp_server_start() has been called.
225   bool started_ ABSL_GUARDED_BY(mu_) = false;
226   // Signals whether grpc_tcp_server_start() has completed.
227   CondVar started_cv_ ABSL_GUARDED_BY(mu_);
228   // Signals whether new requests/connections are to be accepted.
229   bool is_serving_ ABSL_GUARDED_BY(mu_) = false;
230   // Signals whether the application has triggered shutdown.
231   bool shutdown_ ABSL_GUARDED_BY(mu_) = false;
232   std::map<ActiveConnection*, OrphanablePtr<ActiveConnection>> connections_
233       ABSL_GUARDED_BY(mu_);
234   grpc_closure tcp_server_shutdown_complete_ ABSL_GUARDED_BY(mu_);
235   grpc_closure* on_destroy_done_ ABSL_GUARDED_BY(mu_) = nullptr;
236   RefCountedPtr<channelz::ListenSocketNode> channelz_listen_socket_;
237 };
238
239 //
240 // Chttp2ServerListener::ConfigFetcherWatcher
241 //
242
243 void Chttp2ServerListener::ConfigFetcherWatcher::UpdateConnectionManager(
244     RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
245         connection_manager) {
246   RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
247       connection_manager_to_destroy;
248   {
249     MutexLock lock(&listener_->channel_args_mu_);
250     connection_manager_to_destroy = listener_->connection_manager_;
251     listener_->connection_manager_ = std::move(connection_manager);
252   }
253   {
254     MutexLock lock(&listener_->mu_);
255     if (listener_->shutdown_) {
256       return;
257     }
258     listener_->is_serving_ = true;
259     if (listener_->started_) return;
260   }
261   int port_temp;
262   grpc_error* error = grpc_tcp_server_add_port(
263       listener_->tcp_server_, &listener_->resolved_address_, &port_temp);
264   if (error != GRPC_ERROR_NONE) {
265     GRPC_ERROR_UNREF(error);
266     gpr_log(GPR_ERROR, "Error adding port to server: %s",
267             grpc_error_string(error));
268     // TODO(yashykt): We wouldn't need to assert here if we bound to the
269     // port earlier during AddPort.
270     GPR_ASSERT(0);
271   }
272   listener_->StartListening();
273   {
274     MutexLock lock(&listener_->mu_);
275     listener_->started_ = true;
276     listener_->started_cv_.SignalAll();
277   }
278 }
279
280 void Chttp2ServerListener::ConfigFetcherWatcher::StopServing() {
281   std::map<ActiveConnection*, OrphanablePtr<ActiveConnection>> connections;
282   {
283     MutexLock lock(&listener_->mu_);
284     listener_->is_serving_ = false;
285     connections = std::move(listener_->connections_);
286   }
287   // Send GOAWAYs on the transports so that they disconnected when existing RPCs
288   // finish.
289   for (auto& connection : connections) {
290     connection.first->SendGoAway();
291   }
292 }
293
294 //
295 // Chttp2ServerListener::ActiveConnection::HandshakingState
296 //
297
298 grpc_millis GetConnectionDeadline(const grpc_channel_args* args) {
299   int timeout_ms =
300       grpc_channel_args_find_integer(args, GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS,
301                                      {120 * GPR_MS_PER_SEC, 1, INT_MAX});
302   return ExecCtx::Get()->Now() + timeout_ms;
303 }
304
305 Chttp2ServerListener::ActiveConnection::HandshakingState::HandshakingState(
306     RefCountedPtr<ActiveConnection> connection_ref,
307     grpc_pollset* accepting_pollset, grpc_tcp_server_acceptor* acceptor,
308     grpc_channel_args* args)
309     : connection_(std::move(connection_ref)),
310       accepting_pollset_(accepting_pollset),
311       acceptor_(acceptor),
312       handshake_mgr_(MakeRefCounted<HandshakeManager>()),
313       deadline_(GetConnectionDeadline(args)),
314       interested_parties_(grpc_pollset_set_create()) {
315   grpc_pollset_set_add_pollset(interested_parties_, accepting_pollset_);
316   HandshakerRegistry::AddHandshakers(HANDSHAKER_SERVER, args,
317                                      interested_parties_, handshake_mgr_.get());
318 }
319
320 Chttp2ServerListener::ActiveConnection::HandshakingState::~HandshakingState() {
321   grpc_pollset_set_del_pollset(interested_parties_, accepting_pollset_);
322   grpc_pollset_set_destroy(interested_parties_);
323 }
324
325 void Chttp2ServerListener::ActiveConnection::HandshakingState::Orphan() {
326   {
327     MutexLock lock(&connection_->mu_);
328     if (handshake_mgr_ != nullptr) {
329       handshake_mgr_->Shutdown(
330           GRPC_ERROR_CREATE_FROM_STATIC_STRING("Listener stopped serving."));
331     }
332   }
333   Unref();
334 }
335
336 void Chttp2ServerListener::ActiveConnection::HandshakingState::Start(
337     grpc_endpoint* endpoint, grpc_channel_args* args) {
338   Ref().release();  // Held by OnHandshakeDone
339   RefCountedPtr<HandshakeManager> handshake_mgr;
340   {
341     MutexLock lock(&connection_->mu_);
342     if (handshake_mgr_ == nullptr) return;
343     handshake_mgr = handshake_mgr_;
344   }
345   handshake_mgr->DoHandshake(endpoint, args, deadline_, acceptor_,
346                              OnHandshakeDone, this);
347 }
348
349 void Chttp2ServerListener::ActiveConnection::HandshakingState::OnTimeout(
350     void* arg, grpc_error* error) {
351   HandshakingState* self = static_cast<HandshakingState*>(arg);
352   // Note that we may be called with GRPC_ERROR_NONE when the timer fires
353   // or with an error indicating that the timer system is being shut down.
354   if (error != GRPC_ERROR_CANCELLED) {
355     grpc_transport_op* op = grpc_make_transport_op(nullptr);
356     op->disconnect_with_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
357         "Did not receive HTTP/2 settings before handshake timeout");
358     grpc_chttp2_transport* transport = nullptr;
359     {
360       MutexLock lock(&self->connection_->mu_);
361       transport = self->connection_->transport_;
362     }
363     grpc_transport_perform_op(&transport->base, op);
364   }
365   self->Unref();
366 }
367
368 void Chttp2ServerListener::ActiveConnection::HandshakingState::
369     OnReceiveSettings(void* arg, grpc_error* /* error */) {
370   HandshakingState* self = static_cast<HandshakingState*>(arg);
371   grpc_timer_cancel(&self->timer_);
372   self->Unref();
373 }
374
375 void Chttp2ServerListener::ActiveConnection::HandshakingState::OnHandshakeDone(
376     void* arg, grpc_error* error) {
377   auto* args = static_cast<HandshakerArgs*>(arg);
378   HandshakingState* self = static_cast<HandshakingState*>(args->user_data);
379   OrphanablePtr<HandshakingState> handshaking_state_ref;
380   RefCountedPtr<HandshakeManager> handshake_mgr;
381   bool cleanup_connection = false;
382   bool free_resource_quota = false;
383   grpc_resource_user* resource_user =
384       self->connection_->listener_->server_->default_resource_user();
385   {
386     MutexLock connection_lock(&self->connection_->mu_);
387     if (error != GRPC_ERROR_NONE || self->connection_->shutdown_) {
388       const char* error_str = grpc_error_string(error);
389       gpr_log(GPR_DEBUG, "Handshaking failed: %s", error_str);
390       cleanup_connection = true;
391       free_resource_quota = true;
392       if (error == GRPC_ERROR_NONE && args->endpoint != nullptr) {
393         // We were shut down or stopped serving after handshaking completed
394         // successfully, so destroy the endpoint here.
395         // TODO(ctiller): It is currently necessary to shutdown endpoints
396         // before destroying them, even if we know that there are no
397         // pending read/write callbacks.  This should be fixed, at which
398         // point this can be removed.
399         grpc_endpoint_shutdown(args->endpoint, GRPC_ERROR_NONE);
400         grpc_endpoint_destroy(args->endpoint);
401         grpc_channel_args_destroy(args->args);
402         grpc_slice_buffer_destroy_internal(args->read_buffer);
403         gpr_free(args->read_buffer);
404       }
405     } else {
406       // If the handshaking succeeded but there is no endpoint, then the
407       // handshaker may have handed off the connection to some external
408       // code, so we can just clean up here without creating a transport.
409       if (args->endpoint != nullptr) {
410         grpc_transport* transport = grpc_create_chttp2_transport(
411             args->args, args->endpoint, false, resource_user);
412         grpc_error* channel_init_err =
413             self->connection_->listener_->server_->SetupTransport(
414                 transport, self->accepting_pollset_, args->args,
415                 grpc_chttp2_transport_get_socket_node(transport),
416                 resource_user);
417         if (channel_init_err == GRPC_ERROR_NONE) {
418           // Use notify_on_receive_settings callback to enforce the
419           // handshake deadline.
420           // Note: The reinterpret_cast<>s here are safe, because
421           // grpc_chttp2_transport is a C-style extension of
422           // grpc_transport, so this is morally equivalent of a
423           // static_cast<> to a derived class.
424           // TODO(roth): Change to static_cast<> when we C++-ify the
425           // transport API.
426           self->connection_->transport_ =
427               reinterpret_cast<grpc_chttp2_transport*>(transport);
428           GRPC_CHTTP2_REF_TRANSPORT(self->connection_->transport_,
429                                     "ActiveConnection");  // Held by connection_
430           self->Ref().release();  // Held by OnReceiveSettings().
431           GRPC_CLOSURE_INIT(&self->on_receive_settings_, OnReceiveSettings,
432                             self, grpc_schedule_on_exec_ctx);
433           // If the listener has been configured with a config fetcher, we need
434           // to watch on the transport being closed so that we can an updated
435           // list of active connections.
436           grpc_closure* on_close = nullptr;
437           if (self->connection_->listener_->config_fetcher_watcher_ !=
438               nullptr) {
439             // Refs helds by OnClose()
440             self->connection_->Ref().release();
441             on_close = &self->connection_->on_close_;
442           } else {
443             // Remove the connection from the connections_ map since OnClose()
444             // will not be invoked when a config fetcher is set.
445             cleanup_connection = true;
446           }
447           grpc_chttp2_transport_start_reading(transport, args->read_buffer,
448                                               &self->on_receive_settings_,
449                                               on_close);
450           grpc_channel_args_destroy(args->args);
451           self->Ref().release();  // Held by OnTimeout().
452           GRPC_CLOSURE_INIT(&self->on_timeout_, OnTimeout, self,
453                             grpc_schedule_on_exec_ctx);
454           grpc_timer_init(&self->timer_, self->deadline_, &self->on_timeout_);
455         } else {
456           // Failed to create channel from transport. Clean up.
457           gpr_log(GPR_ERROR, "Failed to create channel: %s",
458                   grpc_error_string(channel_init_err));
459           GRPC_ERROR_UNREF(channel_init_err);
460           grpc_transport_destroy(transport);
461           grpc_slice_buffer_destroy_internal(args->read_buffer);
462           gpr_free(args->read_buffer);
463           cleanup_connection = true;
464           free_resource_quota = true;
465           grpc_channel_args_destroy(args->args);
466         }
467       } else {
468         cleanup_connection = true;
469         free_resource_quota = true;
470       }
471     }
472     // Since the handshake manager is done, the connection no longer needs to
473     // shutdown the handshake when the listener needs to stop serving.
474     // Avoid calling the destructor of HandshakeManager and HandshakingState
475     // from within the critical region.
476     handshake_mgr = std::move(self->handshake_mgr_);
477     handshaking_state_ref = std::move(self->connection_->handshaking_state_);
478   }
479   gpr_free(self->acceptor_);
480   OrphanablePtr<ActiveConnection> connection;
481   if (free_resource_quota && resource_user != nullptr) {
482     grpc_resource_user_free(resource_user, GRPC_RESOURCE_QUOTA_CHANNEL_SIZE);
483   }
484   if (cleanup_connection) {
485     MutexLock listener_lock(&self->connection_->listener_->mu_);
486     auto it = self->connection_->listener_->connections_.find(
487         self->connection_.get());
488     if (it != self->connection_->listener_->connections_.end()) {
489       connection = std::move(it->second);
490       self->connection_->listener_->connections_.erase(it);
491     }
492   }
493   self->Unref();
494 }
495
496 //
497 // Chttp2ServerListener::ActiveConnection
498 //
499
500 Chttp2ServerListener::ActiveConnection::ActiveConnection(
501     grpc_pollset* accepting_pollset, grpc_tcp_server_acceptor* acceptor,
502     grpc_channel_args* args)
503     : handshaking_state_(MakeOrphanable<HandshakingState>(
504           Ref(), accepting_pollset, acceptor, args)) {
505   GRPC_CLOSURE_INIT(&on_close_, ActiveConnection::OnClose, this,
506                     grpc_schedule_on_exec_ctx);
507 }
508
509 Chttp2ServerListener::ActiveConnection::~ActiveConnection() {
510   if (transport_ != nullptr) {
511     GRPC_CHTTP2_UNREF_TRANSPORT(transport_, "ActiveConnection");
512   }
513 }
514
515 void Chttp2ServerListener::ActiveConnection::Orphan() {
516   OrphanablePtr<HandshakingState> handshaking_state;
517   {
518     MutexLock lock(&mu_);
519     shutdown_ = true;
520     // Reset handshaking_state_ since we have been orphaned by the listener
521     // signaling that the listener has stopped serving.
522     handshaking_state = std::move(handshaking_state_);
523   }
524   Unref();
525 }
526
527 void Chttp2ServerListener::ActiveConnection::SendGoAway() {
528   grpc_chttp2_transport* transport = nullptr;
529   {
530     MutexLock lock(&mu_);
531     transport = transport_;
532   }
533   if (transport != nullptr) {
534     grpc_transport_op* op = grpc_make_transport_op(nullptr);
535     op->goaway_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
536         "Server is stopping to serve requests.");
537     grpc_transport_perform_op(&transport->base, op);
538   }
539 }
540
541 void Chttp2ServerListener::ActiveConnection::Start(
542     RefCountedPtr<Chttp2ServerListener> listener, grpc_endpoint* endpoint,
543     grpc_channel_args* args) {
544   RefCountedPtr<HandshakingState> handshaking_state_ref;
545   listener_ = std::move(listener);
546   {
547     MutexLock lock(&mu_);
548     if (shutdown_) return;
549     // Hold a ref to HandshakingState to allow starting the handshake outside
550     // the critical region.
551     handshaking_state_ref = handshaking_state_->Ref();
552   }
553   handshaking_state_ref->Start(endpoint, args);
554 }
555
556 void Chttp2ServerListener::ActiveConnection::OnClose(void* arg,
557                                                      grpc_error* /* error */) {
558   ActiveConnection* self = static_cast<ActiveConnection*>(arg);
559   OrphanablePtr<ActiveConnection> connection;
560   {
561     MutexLock listener_lock(&self->listener_->mu_);
562     MutexLock connection_lock(&self->mu_);
563     // The node was already deleted from the connections_ list if the connection
564     // is shutdown.
565     if (!self->shutdown_) {
566       auto it = self->listener_->connections_.find(self);
567       if (it != self->listener_->connections_.end()) {
568         connection = std::move(it->second);
569         self->listener_->connections_.erase(it);
570       }
571     }
572   }
573   self->Unref();
574 }
575
576 //
577 // Chttp2ServerListener
578 //
579
580 grpc_error* Chttp2ServerListener::Create(Server* server,
581                                          grpc_resolved_address* addr,
582                                          grpc_channel_args* args,
583                                          Chttp2ServerArgsModifier args_modifier,
584                                          int* port_num) {
585   Chttp2ServerListener* listener = nullptr;
586   // The bulk of this method is inside of a lambda to make cleanup
587   // easier without using goto.
588   grpc_error* error = [&]() {
589     // Create Chttp2ServerListener.
590     listener = new Chttp2ServerListener(server, args, args_modifier);
591     error = grpc_tcp_server_create(&listener->tcp_server_shutdown_complete_,
592                                    args, &listener->tcp_server_);
593     if (error != GRPC_ERROR_NONE) return error;
594     if (server->config_fetcher() != nullptr) {
595       listener->resolved_address_ = *addr;
596       // TODO(yashykt): Consider binding so as to be able to return the port
597       // number.
598     } else {
599       error = grpc_tcp_server_add_port(listener->tcp_server_, addr, port_num);
600       if (error != GRPC_ERROR_NONE) return error;
601     }
602     // Create channelz node.
603     if (grpc_channel_args_find_bool(args, GRPC_ARG_ENABLE_CHANNELZ,
604                                     GRPC_ENABLE_CHANNELZ_DEFAULT)) {
605       std::string string_address = grpc_sockaddr_to_uri(addr);
606       listener->channelz_listen_socket_ =
607           MakeRefCounted<channelz::ListenSocketNode>(
608               string_address.c_str(),
609               absl::StrFormat("chttp2 listener %s", string_address.c_str()));
610     }
611     // Register with the server only upon success
612     server->AddListener(OrphanablePtr<Server::ListenerInterface>(listener));
613     return GRPC_ERROR_NONE;
614   }();
615   if (error != GRPC_ERROR_NONE) {
616     if (listener != nullptr) {
617       if (listener->tcp_server_ != nullptr) {
618         // listener is deleted when tcp_server_ is shutdown.
619         grpc_tcp_server_unref(listener->tcp_server_);
620       } else {
621         delete listener;
622       }
623     } else {
624       grpc_channel_args_destroy(args);
625     }
626   }
627   return error;
628 }
629
630 grpc_error* Chttp2ServerListener::CreateWithAcceptor(
631     Server* server, const char* name, grpc_channel_args* args,
632     Chttp2ServerArgsModifier args_modifier) {
633   Chttp2ServerListener* listener =
634       new Chttp2ServerListener(server, args, args_modifier);
635   grpc_error* error = grpc_tcp_server_create(
636       &listener->tcp_server_shutdown_complete_, args, &listener->tcp_server_);
637   if (error != GRPC_ERROR_NONE) {
638     delete listener;
639     return error;
640   }
641   // TODO(yangg) channelz
642   TcpServerFdHandler** arg_val =
643       grpc_channel_args_find_pointer<TcpServerFdHandler*>(args, name);
644   *arg_val = grpc_tcp_server_create_fd_handler(listener->tcp_server_);
645   server->AddListener(OrphanablePtr<Server::ListenerInterface>(listener));
646   return GRPC_ERROR_NONE;
647 }
648
649 Chttp2ServerListener::Chttp2ServerListener(
650     Server* server, grpc_channel_args* args,
651     Chttp2ServerArgsModifier args_modifier)
652     : server_(server), args_modifier_(args_modifier), args_(args) {
653   GRPC_CLOSURE_INIT(&tcp_server_shutdown_complete_, TcpServerShutdownComplete,
654                     this, grpc_schedule_on_exec_ctx);
655 }
656
657 Chttp2ServerListener::~Chttp2ServerListener() {
658   // Flush queued work before destroying handshaker factory, since that
659   // may do a synchronous unref.
660   ExecCtx::Get()->Flush();
661   if (on_destroy_done_ != nullptr) {
662     ExecCtx::Run(DEBUG_LOCATION, on_destroy_done_, GRPC_ERROR_NONE);
663     ExecCtx::Get()->Flush();
664   }
665   grpc_channel_args_destroy(args_);
666 }
667
668 /* Server callback: start listening on our ports */
669 void Chttp2ServerListener::Start(
670     Server* /*server*/, const std::vector<grpc_pollset*>* /* pollsets */) {
671   if (server_->config_fetcher() != nullptr) {
672     grpc_channel_args* args = nullptr;
673     auto watcher = absl::make_unique<ConfigFetcherWatcher>(Ref());
674     config_fetcher_watcher_ = watcher.get();
675     {
676       MutexLock lock(&channel_args_mu_);
677       args = grpc_channel_args_copy(args_);
678     }
679     server_->config_fetcher()->StartWatch(
680         grpc_sockaddr_to_string(&resolved_address_, false), args,
681         std::move(watcher));
682   } else {
683     {
684       MutexLock lock(&mu_);
685       started_ = true;
686       is_serving_ = true;
687     }
688     StartListening();
689   }
690 }
691
692 void Chttp2ServerListener::StartListening() {
693   grpc_tcp_server_start(tcp_server_, &server_->pollsets(), OnAccept, this);
694 }
695
696 void Chttp2ServerListener::SetOnDestroyDone(grpc_closure* on_destroy_done) {
697   MutexLock lock(&mu_);
698   on_destroy_done_ = on_destroy_done;
699 }
700
701 void Chttp2ServerListener::OnAccept(void* arg, grpc_endpoint* tcp,
702                                     grpc_pollset* accepting_pollset,
703                                     grpc_tcp_server_acceptor* acceptor) {
704   Chttp2ServerListener* self = static_cast<Chttp2ServerListener*>(arg);
705   grpc_channel_args* args = nullptr;
706   RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
707       connection_manager;
708   {
709     MutexLock lock(&self->channel_args_mu_);
710     args = grpc_channel_args_copy(self->args_);
711     connection_manager = self->connection_manager_;
712   }
713   auto endpoint_cleanup = [&](grpc_error* error) {
714     grpc_endpoint_shutdown(tcp, error);
715     grpc_endpoint_destroy(tcp);
716     gpr_free(acceptor);
717   };
718   if (self->server_->config_fetcher() != nullptr) {
719     if (connection_manager == nullptr) {
720       grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
721           "No ConnectionManager configured. Closing connection.");
722       endpoint_cleanup(error);
723       grpc_channel_args_destroy(args);
724       return;
725     }
726     // TODO(yashykt): Maybe combine the following two arg modifiers into a
727     // single one.
728     absl::StatusOr<grpc_channel_args*> args_result =
729         connection_manager->UpdateChannelArgsForConnection(args, tcp);
730     if (!args_result.ok()) {
731       gpr_log(GPR_DEBUG, "Closing connection: %s",
732               args_result.status().ToString().c_str());
733       endpoint_cleanup(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
734           args_result.status().ToString().c_str()));
735       return;
736     }
737     grpc_error* error = GRPC_ERROR_NONE;
738     args = self->args_modifier_(*args_result, &error);
739     if (error != GRPC_ERROR_NONE) {
740       gpr_log(GPR_DEBUG, "Closing connection: %s", grpc_error_string(error));
741       endpoint_cleanup(error);
742       grpc_channel_args_destroy(args);
743       return;
744     }
745   }
746   auto connection =
747       MakeOrphanable<ActiveConnection>(accepting_pollset, acceptor, args);
748   // Hold a ref to connection to allow starting handshake outside the
749   // critical region
750   RefCountedPtr<ActiveConnection> connection_ref = connection->Ref();
751   RefCountedPtr<Chttp2ServerListener> listener_ref;
752   {
753     MutexLock lock(&self->mu_);
754     // Shutdown the the connection if listener's stopped serving.
755     if (!self->shutdown_ && self->is_serving_) {
756       grpc_resource_user* resource_user =
757           self->server_->default_resource_user();
758       if (resource_user != nullptr &&
759           !grpc_resource_user_safe_alloc(resource_user,
760                                          GRPC_RESOURCE_QUOTA_CHANNEL_SIZE)) {
761         gpr_log(
762             GPR_ERROR,
763             "Memory quota exhausted, rejecting connection, no handshaking.");
764       } else {
765         // This ref needs to be taken in the critical region after having made
766         // sure that the listener has not been Orphaned, so as to avoid
767         // heap-use-after-free issues where `Ref()` is invoked when the ref of
768         // tcp_server_ has already reached 0. (Ref() implementation of
769         // Chttp2ServerListener is grpc_tcp_server_ref().)
770         listener_ref = self->Ref();
771         self->connections_.emplace(connection.get(), std::move(connection));
772       }
773     }
774   }
775   if (connection != nullptr) {
776     endpoint_cleanup(GRPC_ERROR_NONE);
777   } else {
778     connection_ref->Start(std::move(listener_ref), tcp, args);
779   }
780   grpc_channel_args_destroy(args);
781 }
782
783 void Chttp2ServerListener::TcpServerShutdownComplete(void* arg,
784                                                      grpc_error* error) {
785   Chttp2ServerListener* self = static_cast<Chttp2ServerListener*>(arg);
786   self->channelz_listen_socket_.reset();
787   GRPC_ERROR_UNREF(error);
788   delete self;
789 }
790
791 /* Server callback: destroy the tcp listener (so we don't generate further
792    callbacks) */
793 void Chttp2ServerListener::Orphan() {
794   // Cancel the watch before shutting down so as to avoid holding a ref to the
795   // listener in the watcher.
796   if (config_fetcher_watcher_ != nullptr) {
797     server_->config_fetcher()->CancelWatch(config_fetcher_watcher_);
798   }
799   std::map<ActiveConnection*, OrphanablePtr<ActiveConnection>> connections;
800   grpc_tcp_server* tcp_server;
801   {
802     MutexLock lock(&mu_);
803     shutdown_ = true;
804     is_serving_ = false;
805     // Orphan the connections so that they can start cleaning up.
806     connections = std::move(connections_);
807     // If the listener is currently set to be serving but has not been started
808     // yet, it means that `grpc_tcp_server_start` is in progress. Wait for the
809     // operation to finish to avoid causing races.
810     while (is_serving_ && !started_) {
811       started_cv_.Wait(&mu_);
812     }
813     tcp_server = tcp_server_;
814   }
815   grpc_tcp_server_shutdown_listeners(tcp_server);
816   grpc_tcp_server_unref(tcp_server);
817 }
818
819 }  // namespace
820
821 //
822 // Chttp2ServerAddPort()
823 //
824
825 grpc_error* Chttp2ServerAddPort(Server* server, const char* addr,
826                                 grpc_channel_args* args,
827                                 Chttp2ServerArgsModifier args_modifier,
828                                 int* port_num) {
829   if (strncmp(addr, "external:", 9) == 0) {
830     return grpc_core::Chttp2ServerListener::CreateWithAcceptor(
831         server, addr, args, args_modifier);
832   }
833   *port_num = -1;
834   grpc_resolved_addresses* resolved = nullptr;
835   std::vector<grpc_error*> error_list;
836   // Using lambda to avoid use of goto.
837   grpc_error* error = [&]() {
838     if (absl::StartsWith(addr, kUnixUriPrefix)) {
839       error = grpc_resolve_unix_domain_address(
840           addr + sizeof(kUnixUriPrefix) - 1, &resolved);
841     } else if (absl::StartsWith(addr, kUnixAbstractUriPrefix)) {
842       error = grpc_resolve_unix_abstract_domain_address(
843           addr + sizeof(kUnixAbstractUriPrefix) - 1, &resolved);
844     } else {
845       error = grpc_blocking_resolve_address(addr, "https", &resolved);
846     }
847     if (error != GRPC_ERROR_NONE) return error;
848     // Create a listener for each resolved address.
849     for (size_t i = 0; i < resolved->naddrs; i++) {
850       // If address has a wildcard port (0), use the same port as a previous
851       // listener.
852       if (*port_num != -1 && grpc_sockaddr_get_port(&resolved->addrs[i]) == 0) {
853         grpc_sockaddr_set_port(&resolved->addrs[i], *port_num);
854       }
855       int port_temp = -1;
856       error = grpc_core::Chttp2ServerListener::Create(
857           server, &resolved->addrs[i], grpc_channel_args_copy(args),
858           args_modifier, &port_temp);
859       if (error != GRPC_ERROR_NONE) {
860         error_list.push_back(error);
861       } else {
862         if (*port_num == -1) {
863           *port_num = port_temp;
864         } else {
865           GPR_ASSERT(*port_num == port_temp);
866         }
867       }
868     }
869     if (error_list.size() == resolved->naddrs) {
870       std::string msg =
871           absl::StrFormat("No address added out of total %" PRIuPTR " resolved",
872                           resolved->naddrs);
873       return GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING(
874           msg.c_str(), error_list.data(), error_list.size());
875     } else if (!error_list.empty()) {
876       std::string msg = absl::StrFormat(
877           "Only %" PRIuPTR " addresses added out of total %" PRIuPTR
878           " resolved",
879           resolved->naddrs - error_list.size(), resolved->naddrs);
880       error = GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING(
881           msg.c_str(), error_list.data(), error_list.size());
882       gpr_log(GPR_INFO, "WARNING: %s", grpc_error_string(error));
883       GRPC_ERROR_UNREF(error);
884       // we managed to bind some addresses: continue without error
885     }
886     return GRPC_ERROR_NONE;
887   }();  // lambda end
888   for (grpc_error* error : error_list) {
889     GRPC_ERROR_UNREF(error);
890   }
891   grpc_channel_args_destroy(args);
892   if (resolved != nullptr) {
893     grpc_resolved_addresses_destroy(resolved);
894   }
895   if (error != GRPC_ERROR_NONE) *port_num = 0;
896   return error;
897 }
898
899 }  // namespace grpc_core