Imported Upstream version 1.41.0
[platform/upstream/grpc.git] / src / core / ext / transport / chttp2 / server / chttp2_server.cc
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/ext/transport/chttp2/server/chttp2_server.h"
22
23 #include <inttypes.h>
24 #include <limits.h>
25 #include <string.h>
26
27 #include <vector>
28
29 #include "absl/strings/match.h"
30 #include "absl/strings/str_cat.h"
31 #include "absl/strings/str_format.h"
32
33 #include <grpc/grpc.h>
34 #include <grpc/impl/codegen/grpc_types.h>
35 #include <grpc/support/alloc.h>
36 #include <grpc/support/log.h>
37 #include <grpc/support/sync.h>
38
39 #include "src/core/ext/filters/http/server/http_server_filter.h"
40 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
41 #include "src/core/ext/transport/chttp2/transport/internal.h"
42 #include "src/core/lib/address_utils/sockaddr_utils.h"
43 #include "src/core/lib/channel/channel_args.h"
44 #include "src/core/lib/channel/handshaker.h"
45 #include "src/core/lib/config/core_configuration.h"
46 #include "src/core/lib/gprpp/ref_counted.h"
47 #include "src/core/lib/gprpp/ref_counted_ptr.h"
48 #include "src/core/lib/iomgr/endpoint.h"
49 #include "src/core/lib/iomgr/resolve_address.h"
50 #include "src/core/lib/iomgr/resource_quota.h"
51 #include "src/core/lib/iomgr/tcp_server.h"
52 #include "src/core/lib/iomgr/unix_sockets_posix.h"
53 #include "src/core/lib/slice/slice_internal.h"
54 #include "src/core/lib/surface/api_trace.h"
55 #include "src/core/lib/surface/server.h"
56
57 namespace grpc_core {
58 namespace {
59
60 const char kUnixUriPrefix[] = "unix:";
61 const char kUnixAbstractUriPrefix[] = "unix-abstract:";
62
63 class Chttp2ServerListener : public Server::ListenerInterface {
64  public:
65   static grpc_error_handle Create(Server* server, grpc_resolved_address* addr,
66                                   grpc_channel_args* args,
67                                   Chttp2ServerArgsModifier args_modifier,
68                                   int* port_num);
69
70   static grpc_error_handle CreateWithAcceptor(
71       Server* server, const char* name, grpc_channel_args* args,
72       Chttp2ServerArgsModifier args_modifier);
73
74   // Do not instantiate directly.  Use one of the factory methods above.
75   Chttp2ServerListener(Server* server, grpc_channel_args* args,
76                        Chttp2ServerArgsModifier args_modifier,
77                        grpc_resource_quota* resource_quota);
78   ~Chttp2ServerListener() override;
79
80   void Start(Server* server,
81              const std::vector<grpc_pollset*>* pollsets) override;
82
83   channelz::ListenSocketNode* channelz_listen_socket_node() const override {
84     return channelz_listen_socket_.get();
85   }
86
87   void SetOnDestroyDone(grpc_closure* on_destroy_done) override;
88
89   void Orphan() override;
90
91  private:
92   class ConfigFetcherWatcher
93       : public grpc_server_config_fetcher::WatcherInterface {
94    public:
95     explicit ConfigFetcherWatcher(RefCountedPtr<Chttp2ServerListener> listener)
96         : listener_(std::move(listener)) {}
97
98     void UpdateConnectionManager(
99         RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
100             connection_manager) override;
101
102     void StopServing() override;
103
104    private:
105     RefCountedPtr<Chttp2ServerListener> listener_;
106   };
107
108   class ActiveConnection : public InternallyRefCounted<ActiveConnection> {
109    public:
110     class HandshakingState : public InternallyRefCounted<HandshakingState> {
111      public:
112       HandshakingState(RefCountedPtr<ActiveConnection> connection_ref,
113                        grpc_pollset* accepting_pollset,
114                        grpc_tcp_server_acceptor* acceptor,
115                        grpc_channel_args* args,
116                        grpc_resource_user* channel_resource_user);
117
118       ~HandshakingState() override;
119
120       void Orphan() override;
121
122       void Start(grpc_endpoint* endpoint, grpc_channel_args* args);
123
124       // Needed to be able to grab an external ref in ActiveConnection::Start()
125       using InternallyRefCounted<HandshakingState>::Ref;
126
127      private:
128       static void OnTimeout(void* arg, grpc_error_handle error);
129       static void OnReceiveSettings(void* arg, grpc_error_handle /* error */);
130       static void OnHandshakeDone(void* arg, grpc_error_handle error);
131       RefCountedPtr<ActiveConnection> const connection_;
132       grpc_pollset* const accepting_pollset_;
133       grpc_tcp_server_acceptor* acceptor_;
134       RefCountedPtr<HandshakeManager> handshake_mgr_
135           ABSL_GUARDED_BY(&connection_->mu_);
136       // State for enforcing handshake timeout on receiving HTTP/2 settings.
137       grpc_millis const deadline_;
138       grpc_timer timer_ ABSL_GUARDED_BY(&connection_->mu_);
139       grpc_closure on_timeout_ ABSL_GUARDED_BY(&connection_->mu_);
140       grpc_closure on_receive_settings_ ABSL_GUARDED_BY(&connection_->mu_);
141       grpc_pollset_set* const interested_parties_;
142       grpc_resource_user* channel_resource_user_;
143     };
144
145     ActiveConnection(grpc_pollset* accepting_pollset,
146                      grpc_tcp_server_acceptor* acceptor,
147                      grpc_channel_args* args,
148                      grpc_resource_user* channel_resource_user);
149     ~ActiveConnection() override;
150
151     void Orphan() override;
152
153     void SendGoAway();
154
155     void Start(RefCountedPtr<Chttp2ServerListener> listener,
156                grpc_endpoint* endpoint, grpc_channel_args* args);
157
158     // Needed to be able to grab an external ref in
159     // Chttp2ServerListener::OnAccept()
160     using InternallyRefCounted<ActiveConnection>::Ref;
161
162    private:
163     static void OnClose(void* arg, grpc_error_handle error);
164
165     RefCountedPtr<Chttp2ServerListener> listener_;
166     Mutex mu_ ABSL_ACQUIRED_AFTER(&listener_->mu_);
167     // Set by HandshakingState before the handshaking begins and reset when
168     // handshaking is done.
169     OrphanablePtr<HandshakingState> handshaking_state_ ABSL_GUARDED_BY(&mu_);
170     // Set by HandshakingState when handshaking is done and a valid transport is
171     // created.
172     grpc_chttp2_transport* transport_ ABSL_GUARDED_BY(&mu_) = nullptr;
173     grpc_closure on_close_;
174     bool shutdown_ ABSL_GUARDED_BY(&mu_) = false;
175   };
176
177   // To allow access to RefCounted<> like interface.
178   friend class RefCountedPtr<Chttp2ServerListener>;
179
180   // Should only be called once so as to start the TCP server.
181   void StartListening();
182
183   static void OnAccept(void* arg, grpc_endpoint* tcp,
184                        grpc_pollset* accepting_pollset,
185                        grpc_tcp_server_acceptor* acceptor);
186
187   static void TcpServerShutdownComplete(void* arg, grpc_error_handle error);
188
189   static void DestroyListener(Server* /*server*/, void* arg,
190                               grpc_closure* destroy_done);
191
192   // The interface required by RefCountedPtr<> has been manually implemented
193   // here to take a ref on tcp_server_ instead. Note that, the handshaker needs
194   // tcp_server_ to exist for the lifetime of the handshake since it's needed by
195   // acceptor. Sharing refs between the listener and tcp_server_ is just an
196   // optimization to avoid taking additional refs on the listener, since
197   // TcpServerShutdownComplete already holds a ref to the listener.
198   void IncrementRefCount() { grpc_tcp_server_ref(tcp_server_); }
199   void IncrementRefCount(const DebugLocation& /* location */,
200                          const char* /* reason */) {
201     IncrementRefCount();
202   }
203
204   RefCountedPtr<Chttp2ServerListener> Ref() GRPC_MUST_USE_RESULT {
205     IncrementRefCount();
206     return RefCountedPtr<Chttp2ServerListener>(this);
207   }
208   RefCountedPtr<Chttp2ServerListener> Ref(const DebugLocation& /* location */,
209                                           const char* /* reason */)
210       GRPC_MUST_USE_RESULT {
211     return Ref();
212   }
213
214   void Unref() { grpc_tcp_server_unref(tcp_server_); }
215   void Unref(const DebugLocation& /* location */, const char* /* reason */) {
216     Unref();
217   }
218
219   Server* const server_;
220   grpc_tcp_server* tcp_server_;
221   grpc_resolved_address resolved_address_;
222   Chttp2ServerArgsModifier const args_modifier_;
223   ConfigFetcherWatcher* config_fetcher_watcher_ = nullptr;
224   Mutex channel_args_mu_;
225   grpc_channel_args* args_ ABSL_GUARDED_BY(channel_args_mu_);
226   RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
227       connection_manager_ ABSL_GUARDED_BY(channel_args_mu_);
228   Mutex mu_;
229   // Signals whether grpc_tcp_server_start() has been called.
230   bool started_ ABSL_GUARDED_BY(mu_) = false;
231   // Signals whether grpc_tcp_server_start() has completed.
232   CondVar started_cv_ ABSL_GUARDED_BY(mu_);
233   // Signals whether new requests/connections are to be accepted.
234   bool is_serving_ ABSL_GUARDED_BY(mu_) = false;
235   // Signals whether the application has triggered shutdown.
236   bool shutdown_ ABSL_GUARDED_BY(mu_) = false;
237   std::map<ActiveConnection*, OrphanablePtr<ActiveConnection>> connections_
238       ABSL_GUARDED_BY(mu_);
239   grpc_closure tcp_server_shutdown_complete_ ABSL_GUARDED_BY(mu_);
240   grpc_closure* on_destroy_done_ ABSL_GUARDED_BY(mu_) = nullptr;
241   RefCountedPtr<channelz::ListenSocketNode> channelz_listen_socket_;
242   grpc_resource_quota* resource_quota_;
243 };
244
245 //
246 // Chttp2ServerListener::ConfigFetcherWatcher
247 //
248
249 void Chttp2ServerListener::ConfigFetcherWatcher::UpdateConnectionManager(
250     RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
251         connection_manager) {
252   RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
253       connection_manager_to_destroy;
254   {
255     MutexLock lock(&listener_->channel_args_mu_);
256     connection_manager_to_destroy = listener_->connection_manager_;
257     listener_->connection_manager_ = std::move(connection_manager);
258   }
259   {
260     MutexLock lock(&listener_->mu_);
261     if (listener_->shutdown_) {
262       return;
263     }
264     listener_->is_serving_ = true;
265     if (listener_->started_) return;
266   }
267   int port_temp;
268   grpc_error_handle error = grpc_tcp_server_add_port(
269       listener_->tcp_server_, &listener_->resolved_address_, &port_temp);
270   if (error != GRPC_ERROR_NONE) {
271     GRPC_ERROR_UNREF(error);
272     gpr_log(GPR_ERROR, "Error adding port to server: %s",
273             grpc_error_std_string(error).c_str());
274     // TODO(yashykt): We wouldn't need to assert here if we bound to the
275     // port earlier during AddPort.
276     GPR_ASSERT(0);
277   }
278   listener_->StartListening();
279   {
280     MutexLock lock(&listener_->mu_);
281     listener_->started_ = true;
282     listener_->started_cv_.SignalAll();
283   }
284 }
285
286 void Chttp2ServerListener::ConfigFetcherWatcher::StopServing() {
287   std::map<ActiveConnection*, OrphanablePtr<ActiveConnection>> connections;
288   {
289     MutexLock lock(&listener_->mu_);
290     listener_->is_serving_ = false;
291     connections = std::move(listener_->connections_);
292   }
293   // Send GOAWAYs on the transports so that they disconnected when existing RPCs
294   // finish.
295   for (auto& connection : connections) {
296     connection.first->SendGoAway();
297   }
298 }
299
300 //
301 // Chttp2ServerListener::ActiveConnection::HandshakingState
302 //
303
304 grpc_millis GetConnectionDeadline(const grpc_channel_args* args) {
305   int timeout_ms =
306       grpc_channel_args_find_integer(args, GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS,
307                                      {120 * GPR_MS_PER_SEC, 1, INT_MAX});
308   return ExecCtx::Get()->Now() + timeout_ms;
309 }
310
311 Chttp2ServerListener::ActiveConnection::HandshakingState::HandshakingState(
312     RefCountedPtr<ActiveConnection> connection_ref,
313     grpc_pollset* accepting_pollset, grpc_tcp_server_acceptor* acceptor,
314     grpc_channel_args* args, grpc_resource_user* channel_resource_user)
315     : connection_(std::move(connection_ref)),
316       accepting_pollset_(accepting_pollset),
317       acceptor_(acceptor),
318       handshake_mgr_(MakeRefCounted<HandshakeManager>()),
319       deadline_(GetConnectionDeadline(args)),
320       interested_parties_(grpc_pollset_set_create()),
321       channel_resource_user_(channel_resource_user) {
322   grpc_pollset_set_add_pollset(interested_parties_, accepting_pollset_);
323   CoreConfiguration::Get().handshaker_registry().AddHandshakers(
324       HANDSHAKER_SERVER, args, interested_parties_, handshake_mgr_.get());
325 }
326
327 Chttp2ServerListener::ActiveConnection::HandshakingState::~HandshakingState() {
328   grpc_pollset_set_del_pollset(interested_parties_, accepting_pollset_);
329   grpc_pollset_set_destroy(interested_parties_);
330   if (channel_resource_user_ != nullptr) {
331     grpc_resource_user_unref(channel_resource_user_);
332   }
333   gpr_free(acceptor_);
334 }
335
336 void Chttp2ServerListener::ActiveConnection::HandshakingState::Orphan() {
337   {
338     MutexLock lock(&connection_->mu_);
339     if (handshake_mgr_ != nullptr) {
340       handshake_mgr_->Shutdown(
341           GRPC_ERROR_CREATE_FROM_STATIC_STRING("Listener stopped serving."));
342     }
343   }
344   Unref();
345 }
346
347 void Chttp2ServerListener::ActiveConnection::HandshakingState::Start(
348     grpc_endpoint* endpoint, grpc_channel_args* args) {
349   Ref().release();  // Held by OnHandshakeDone
350   RefCountedPtr<HandshakeManager> handshake_mgr;
351   {
352     MutexLock lock(&connection_->mu_);
353     if (handshake_mgr_ == nullptr) return;
354     handshake_mgr = handshake_mgr_;
355   }
356   handshake_mgr->DoHandshake(endpoint, args, deadline_, acceptor_,
357                              OnHandshakeDone, this);
358 }
359
360 void Chttp2ServerListener::ActiveConnection::HandshakingState::OnTimeout(
361     void* arg, grpc_error_handle error) {
362   HandshakingState* self = static_cast<HandshakingState*>(arg);
363   // Note that we may be called with GRPC_ERROR_NONE when the timer fires
364   // or with an error indicating that the timer system is being shut down.
365   if (error != GRPC_ERROR_CANCELLED) {
366     grpc_transport_op* op = grpc_make_transport_op(nullptr);
367     op->disconnect_with_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
368         "Did not receive HTTP/2 settings before handshake timeout");
369     grpc_chttp2_transport* transport = nullptr;
370     {
371       MutexLock lock(&self->connection_->mu_);
372       transport = self->connection_->transport_;
373     }
374     grpc_transport_perform_op(&transport->base, op);
375   }
376   self->Unref();
377 }
378
379 void Chttp2ServerListener::ActiveConnection::HandshakingState::
380     OnReceiveSettings(void* arg, grpc_error_handle /* error */) {
381   HandshakingState* self = static_cast<HandshakingState*>(arg);
382   grpc_timer_cancel(&self->timer_);
383   self->Unref();
384 }
385
386 void Chttp2ServerListener::ActiveConnection::HandshakingState::OnHandshakeDone(
387     void* arg, grpc_error_handle error) {
388   auto* args = static_cast<HandshakerArgs*>(arg);
389   HandshakingState* self = static_cast<HandshakingState*>(args->user_data);
390   OrphanablePtr<HandshakingState> handshaking_state_ref;
391   RefCountedPtr<HandshakeManager> handshake_mgr;
392   bool cleanup_connection = false;
393   {
394     MutexLock connection_lock(&self->connection_->mu_);
395     if (error != GRPC_ERROR_NONE || self->connection_->shutdown_) {
396       std::string error_str = grpc_error_std_string(error);
397       gpr_log(GPR_DEBUG, "Handshaking failed: %s", error_str.c_str());
398       cleanup_connection = true;
399       if (error == GRPC_ERROR_NONE && args->endpoint != nullptr) {
400         // We were shut down or stopped serving after handshaking completed
401         // successfully, so destroy the endpoint here.
402         // TODO(ctiller): It is currently necessary to shutdown endpoints
403         // before destroying them, even if we know that there are no
404         // pending read/write callbacks.  This should be fixed, at which
405         // point this can be removed.
406         grpc_endpoint_shutdown(args->endpoint, GRPC_ERROR_NONE);
407         grpc_endpoint_destroy(args->endpoint);
408         grpc_channel_args_destroy(args->args);
409         grpc_slice_buffer_destroy_internal(args->read_buffer);
410         gpr_free(args->read_buffer);
411       }
412     } else {
413       // If the handshaking succeeded but there is no endpoint, then the
414       // handshaker may have handed off the connection to some external
415       // code, so we can just clean up here without creating a transport.
416       if (args->endpoint != nullptr) {
417         grpc_transport* transport = grpc_create_chttp2_transport(
418             args->args, args->endpoint, false,
419             grpc_resource_user_create(
420                 self->connection_->listener_->resource_quota_,
421                 absl::StrCat(grpc_endpoint_get_peer(args->endpoint),
422                              ":chttp2_server_transport")));
423         grpc_error_handle channel_init_err =
424             self->connection_->listener_->server_->SetupTransport(
425                 transport, self->accepting_pollset_, args->args,
426                 grpc_chttp2_transport_get_socket_node(transport),
427                 self->channel_resource_user_, GRPC_RESOURCE_QUOTA_CHANNEL_SIZE);
428         self->channel_resource_user_ = nullptr;
429         if (channel_init_err == GRPC_ERROR_NONE) {
430           // Use notify_on_receive_settings callback to enforce the
431           // handshake deadline.
432           // Note: The reinterpret_cast<>s here are safe, because
433           // grpc_chttp2_transport is a C-style extension of
434           // grpc_transport, so this is morally equivalent of a
435           // static_cast<> to a derived class.
436           // TODO(roth): Change to static_cast<> when we C++-ify the
437           // transport API.
438           self->connection_->transport_ =
439               reinterpret_cast<grpc_chttp2_transport*>(transport);
440           GRPC_CHTTP2_REF_TRANSPORT(self->connection_->transport_,
441                                     "ActiveConnection");  // Held by connection_
442           self->Ref().release();  // Held by OnReceiveSettings().
443           GRPC_CLOSURE_INIT(&self->on_receive_settings_, OnReceiveSettings,
444                             self, grpc_schedule_on_exec_ctx);
445           // If the listener has been configured with a config fetcher, we need
446           // to watch on the transport being closed so that we can an updated
447           // list of active connections.
448           grpc_closure* on_close = nullptr;
449           if (self->connection_->listener_->config_fetcher_watcher_ !=
450               nullptr) {
451             // Refs helds by OnClose()
452             self->connection_->Ref().release();
453             on_close = &self->connection_->on_close_;
454           } else {
455             // Remove the connection from the connections_ map since OnClose()
456             // will not be invoked when a config fetcher is set.
457             cleanup_connection = true;
458           }
459           grpc_chttp2_transport_start_reading(transport, args->read_buffer,
460                                               &self->on_receive_settings_,
461                                               on_close);
462           grpc_channel_args_destroy(args->args);
463           self->Ref().release();  // Held by OnTimeout().
464           GRPC_CLOSURE_INIT(&self->on_timeout_, OnTimeout, self,
465                             grpc_schedule_on_exec_ctx);
466           grpc_timer_init(&self->timer_, self->deadline_, &self->on_timeout_);
467         } else {
468           // Failed to create channel from transport. Clean up.
469           gpr_log(GPR_ERROR, "Failed to create channel: %s",
470                   grpc_error_std_string(channel_init_err).c_str());
471           GRPC_ERROR_UNREF(channel_init_err);
472           grpc_transport_destroy(transport);
473           grpc_slice_buffer_destroy_internal(args->read_buffer);
474           gpr_free(args->read_buffer);
475           cleanup_connection = true;
476           grpc_channel_args_destroy(args->args);
477         }
478       } else {
479         cleanup_connection = true;
480       }
481     }
482     // Since the handshake manager is done, the connection no longer needs to
483     // shutdown the handshake when the listener needs to stop serving.
484     // Avoid calling the destructor of HandshakeManager and HandshakingState
485     // from within the critical region.
486     handshake_mgr = std::move(self->handshake_mgr_);
487     handshaking_state_ref = std::move(self->connection_->handshaking_state_);
488   }
489   gpr_free(self->acceptor_);
490   self->acceptor_ = nullptr;
491   OrphanablePtr<ActiveConnection> connection;
492   if (self->channel_resource_user_ != nullptr) {
493     grpc_resource_user_free(self->channel_resource_user_,
494                             GRPC_RESOURCE_QUOTA_CHANNEL_SIZE);
495   }
496   if (cleanup_connection) {
497     MutexLock listener_lock(&self->connection_->listener_->mu_);
498     auto it = self->connection_->listener_->connections_.find(
499         self->connection_.get());
500     if (it != self->connection_->listener_->connections_.end()) {
501       connection = std::move(it->second);
502       self->connection_->listener_->connections_.erase(it);
503     }
504   }
505   self->Unref();
506 }
507
508 //
509 // Chttp2ServerListener::ActiveConnection
510 //
511
512 Chttp2ServerListener::ActiveConnection::ActiveConnection(
513     grpc_pollset* accepting_pollset, grpc_tcp_server_acceptor* acceptor,
514     grpc_channel_args* args, grpc_resource_user* channel_resource_user)
515     : handshaking_state_(MakeOrphanable<HandshakingState>(
516           Ref(), accepting_pollset, acceptor, args, channel_resource_user)) {
517   GRPC_CLOSURE_INIT(&on_close_, ActiveConnection::OnClose, this,
518                     grpc_schedule_on_exec_ctx);
519 }
520
521 Chttp2ServerListener::ActiveConnection::~ActiveConnection() {
522   if (transport_ != nullptr) {
523     GRPC_CHTTP2_UNREF_TRANSPORT(transport_, "ActiveConnection");
524   }
525 }
526
527 void Chttp2ServerListener::ActiveConnection::Orphan() {
528   OrphanablePtr<HandshakingState> handshaking_state;
529   {
530     MutexLock lock(&mu_);
531     shutdown_ = true;
532     // Reset handshaking_state_ since we have been orphaned by the listener
533     // signaling that the listener has stopped serving.
534     handshaking_state = std::move(handshaking_state_);
535   }
536   Unref();
537 }
538
539 void Chttp2ServerListener::ActiveConnection::SendGoAway() {
540   grpc_chttp2_transport* transport = nullptr;
541   {
542     MutexLock lock(&mu_);
543     transport = transport_;
544   }
545   if (transport != nullptr) {
546     grpc_transport_op* op = grpc_make_transport_op(nullptr);
547     op->goaway_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
548         "Server is stopping to serve requests.");
549     grpc_transport_perform_op(&transport->base, op);
550   }
551 }
552
553 void Chttp2ServerListener::ActiveConnection::Start(
554     RefCountedPtr<Chttp2ServerListener> listener, grpc_endpoint* endpoint,
555     grpc_channel_args* args) {
556   RefCountedPtr<HandshakingState> handshaking_state_ref;
557   listener_ = std::move(listener);
558   {
559     MutexLock lock(&mu_);
560     if (shutdown_) return;
561     // Hold a ref to HandshakingState to allow starting the handshake outside
562     // the critical region.
563     handshaking_state_ref = handshaking_state_->Ref();
564   }
565   handshaking_state_ref->Start(endpoint, args);
566 }
567
568 void Chttp2ServerListener::ActiveConnection::OnClose(
569     void* arg, grpc_error_handle /* error */) {
570   ActiveConnection* self = static_cast<ActiveConnection*>(arg);
571   OrphanablePtr<ActiveConnection> connection;
572   {
573     MutexLock listener_lock(&self->listener_->mu_);
574     MutexLock connection_lock(&self->mu_);
575     // The node was already deleted from the connections_ list if the connection
576     // is shutdown.
577     if (!self->shutdown_) {
578       auto it = self->listener_->connections_.find(self);
579       if (it != self->listener_->connections_.end()) {
580         connection = std::move(it->second);
581         self->listener_->connections_.erase(it);
582       }
583     }
584   }
585   self->Unref();
586 }
587
588 //
589 // Chttp2ServerListener
590 //
591
592 grpc_error_handle Chttp2ServerListener::Create(
593     Server* server, grpc_resolved_address* addr, grpc_channel_args* args,
594     Chttp2ServerArgsModifier args_modifier, int* port_num) {
595   Chttp2ServerListener* listener = nullptr;
596   // The bulk of this method is inside of a lambda to make cleanup
597   // easier without using goto.
598   grpc_error_handle error = [&]() {
599     // Create Chttp2ServerListener.
600     listener = new Chttp2ServerListener(
601         server, args, args_modifier,
602         grpc_resource_quota_from_channel_args(args, true));
603     grpc_resource_quota_ref_internal(listener->resource_quota_);
604     error = grpc_tcp_server_create(
605         &listener->tcp_server_shutdown_complete_, args,
606         grpc_slice_allocator_factory_create(listener->resource_quota_),
607         &listener->tcp_server_);
608     if (error != GRPC_ERROR_NONE) return error;
609     if (server->config_fetcher() != nullptr) {
610       listener->resolved_address_ = *addr;
611       // TODO(yashykt): Consider binding so as to be able to return the port
612       // number.
613     } else {
614       error = grpc_tcp_server_add_port(listener->tcp_server_, addr, port_num);
615       if (error != GRPC_ERROR_NONE) return error;
616     }
617     // Create channelz node.
618     if (grpc_channel_args_find_bool(args, GRPC_ARG_ENABLE_CHANNELZ,
619                                     GRPC_ENABLE_CHANNELZ_DEFAULT)) {
620       std::string string_address = grpc_sockaddr_to_uri(addr);
621       listener->channelz_listen_socket_ =
622           MakeRefCounted<channelz::ListenSocketNode>(
623               string_address.c_str(),
624               absl::StrFormat("chttp2 listener %s", string_address.c_str()));
625     }
626     // Register with the server only upon success
627     server->AddListener(OrphanablePtr<Server::ListenerInterface>(listener));
628     return GRPC_ERROR_NONE;
629   }();
630   if (error != GRPC_ERROR_NONE) {
631     if (listener != nullptr) {
632       if (listener->tcp_server_ != nullptr) {
633         // listener is deleted when tcp_server_ is shutdown.
634         grpc_tcp_server_unref(listener->tcp_server_);
635       } else {
636         delete listener;
637       }
638     } else {
639       grpc_channel_args_destroy(args);
640     }
641   }
642   return error;
643 }
644
645 grpc_error_handle Chttp2ServerListener::CreateWithAcceptor(
646     Server* server, const char* name, grpc_channel_args* args,
647     Chttp2ServerArgsModifier args_modifier) {
648   Chttp2ServerListener* listener = new Chttp2ServerListener(
649       server, args, args_modifier,
650       grpc_resource_quota_from_channel_args(args, true));
651   grpc_resource_quota_ref_internal(listener->resource_quota_);
652   grpc_error_handle error = grpc_tcp_server_create(
653       &listener->tcp_server_shutdown_complete_, args,
654       grpc_slice_allocator_factory_create(listener->resource_quota_),
655       &listener->tcp_server_);
656   if (error != GRPC_ERROR_NONE) {
657     delete listener;
658     return error;
659   }
660   // TODO(yangg) channelz
661   TcpServerFdHandler** arg_val =
662       grpc_channel_args_find_pointer<TcpServerFdHandler*>(args, name);
663   *arg_val = grpc_tcp_server_create_fd_handler(listener->tcp_server_);
664   server->AddListener(OrphanablePtr<Server::ListenerInterface>(listener));
665   return GRPC_ERROR_NONE;
666 }
667
668 Chttp2ServerListener::Chttp2ServerListener(
669     Server* server, grpc_channel_args* args,
670     Chttp2ServerArgsModifier args_modifier, grpc_resource_quota* resource_quota)
671     : server_(server),
672       args_modifier_(args_modifier),
673       args_(args),
674       resource_quota_(resource_quota) {
675   GRPC_CLOSURE_INIT(&tcp_server_shutdown_complete_, TcpServerShutdownComplete,
676                     this, grpc_schedule_on_exec_ctx);
677 }
678
679 Chttp2ServerListener::~Chttp2ServerListener() {
680   // Flush queued work before destroying handshaker factory, since that
681   // may do a synchronous unref.
682   ExecCtx::Get()->Flush();
683   if (on_destroy_done_ != nullptr) {
684     ExecCtx::Run(DEBUG_LOCATION, on_destroy_done_, GRPC_ERROR_NONE);
685     ExecCtx::Get()->Flush();
686   }
687   grpc_resource_quota_unref_internal(resource_quota_);
688   grpc_channel_args_destroy(args_);
689 }
690
691 /* Server callback: start listening on our ports */
692 void Chttp2ServerListener::Start(
693     Server* /*server*/, const std::vector<grpc_pollset*>* /* pollsets */) {
694   if (server_->config_fetcher() != nullptr) {
695     grpc_channel_args* args = nullptr;
696     auto watcher = absl::make_unique<ConfigFetcherWatcher>(Ref());
697     config_fetcher_watcher_ = watcher.get();
698     {
699       MutexLock lock(&channel_args_mu_);
700       args = grpc_channel_args_copy(args_);
701     }
702     server_->config_fetcher()->StartWatch(
703         grpc_sockaddr_to_string(&resolved_address_, false), args,
704         std::move(watcher));
705   } else {
706     {
707       MutexLock lock(&mu_);
708       started_ = true;
709       is_serving_ = true;
710     }
711     StartListening();
712   }
713 }
714
715 void Chttp2ServerListener::StartListening() {
716   grpc_tcp_server_start(tcp_server_, &server_->pollsets(), OnAccept, this);
717 }
718
719 void Chttp2ServerListener::SetOnDestroyDone(grpc_closure* on_destroy_done) {
720   MutexLock lock(&mu_);
721   on_destroy_done_ = on_destroy_done;
722 }
723
724 void Chttp2ServerListener::OnAccept(void* arg, grpc_endpoint* tcp,
725                                     grpc_pollset* accepting_pollset,
726                                     grpc_tcp_server_acceptor* acceptor) {
727   Chttp2ServerListener* self = static_cast<Chttp2ServerListener*>(arg);
728   grpc_channel_args* args = nullptr;
729   RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
730       connection_manager;
731   {
732     MutexLock lock(&self->channel_args_mu_);
733     args = grpc_channel_args_copy(self->args_);
734     connection_manager = self->connection_manager_;
735   }
736   auto endpoint_cleanup = [&](grpc_error_handle error) {
737     grpc_endpoint_shutdown(tcp, error);
738     grpc_endpoint_destroy(tcp);
739     gpr_free(acceptor);
740   };
741   if (self->server_->config_fetcher() != nullptr) {
742     if (connection_manager == nullptr) {
743       grpc_error_handle error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
744           "No ConnectionManager configured. Closing connection.");
745       endpoint_cleanup(error);
746       grpc_channel_args_destroy(args);
747       return;
748     }
749     // TODO(yashykt): Maybe combine the following two arg modifiers into a
750     // single one.
751     absl::StatusOr<grpc_channel_args*> args_result =
752         connection_manager->UpdateChannelArgsForConnection(args, tcp);
753     if (!args_result.ok()) {
754       gpr_log(GPR_DEBUG, "Closing connection: %s",
755               args_result.status().ToString().c_str());
756       endpoint_cleanup(
757           GRPC_ERROR_CREATE_FROM_CPP_STRING(args_result.status().ToString()));
758       return;
759     }
760     grpc_error_handle error = GRPC_ERROR_NONE;
761     args = self->args_modifier_(*args_result, &error);
762     if (error != GRPC_ERROR_NONE) {
763       gpr_log(GPR_DEBUG, "Closing connection: %s",
764               grpc_error_std_string(error).c_str());
765       endpoint_cleanup(error);
766       grpc_channel_args_destroy(args);
767       return;
768     }
769   }
770   grpc_resource_user* channel_resource_user = grpc_resource_user_create(
771       self->resource_quota_,
772       absl::StrCat(grpc_endpoint_get_peer(tcp), ":server_channel"));
773   auto connection = MakeOrphanable<ActiveConnection>(
774       accepting_pollset, acceptor, args, channel_resource_user);
775   // We no longer own acceptor
776   acceptor = nullptr;
777   // Hold a ref to connection to allow starting handshake outside the
778   // critical region
779   RefCountedPtr<ActiveConnection> connection_ref = connection->Ref();
780   RefCountedPtr<Chttp2ServerListener> listener_ref;
781   {
782     MutexLock lock(&self->mu_);
783     // Shutdown the the connection if listener's stopped serving.
784     if (!self->shutdown_ && self->is_serving_) {
785       if (!grpc_resource_user_safe_alloc(channel_resource_user,
786                                          GRPC_RESOURCE_QUOTA_CHANNEL_SIZE)) {
787         gpr_log(
788             GPR_INFO,
789             "Memory quota exhausted, rejecting connection, no handshaking.");
790       } else {
791         // This ref needs to be taken in the critical region after having made
792         // sure that the listener has not been Orphaned, so as to avoid
793         // heap-use-after-free issues where `Ref()` is invoked when the ref of
794         // tcp_server_ has already reached 0. (Ref() implementation of
795         // Chttp2ServerListener is grpc_tcp_server_ref().)
796         listener_ref = self->Ref();
797         self->connections_.emplace(connection.get(), std::move(connection));
798       }
799     }
800   }
801   if (connection != nullptr) {
802     endpoint_cleanup(GRPC_ERROR_NONE);
803   } else {
804     connection_ref->Start(std::move(listener_ref), tcp, args);
805   }
806   grpc_channel_args_destroy(args);
807 }
808
809 void Chttp2ServerListener::TcpServerShutdownComplete(void* arg,
810                                                      grpc_error_handle error) {
811   Chttp2ServerListener* self = static_cast<Chttp2ServerListener*>(arg);
812   self->channelz_listen_socket_.reset();
813   GRPC_ERROR_UNREF(error);
814   delete self;
815 }
816
817 /* Server callback: destroy the tcp listener (so we don't generate further
818    callbacks) */
819 void Chttp2ServerListener::Orphan() {
820   // Cancel the watch before shutting down so as to avoid holding a ref to the
821   // listener in the watcher.
822   if (config_fetcher_watcher_ != nullptr) {
823     server_->config_fetcher()->CancelWatch(config_fetcher_watcher_);
824   }
825   std::map<ActiveConnection*, OrphanablePtr<ActiveConnection>> connections;
826   grpc_tcp_server* tcp_server;
827   {
828     MutexLock lock(&mu_);
829     shutdown_ = true;
830     is_serving_ = false;
831     // Orphan the connections so that they can start cleaning up.
832     connections = std::move(connections_);
833     // If the listener is currently set to be serving but has not been started
834     // yet, it means that `grpc_tcp_server_start` is in progress. Wait for the
835     // operation to finish to avoid causing races.
836     while (is_serving_ && !started_) {
837       started_cv_.Wait(&mu_);
838     }
839     tcp_server = tcp_server_;
840   }
841   grpc_tcp_server_shutdown_listeners(tcp_server);
842   grpc_tcp_server_unref(tcp_server);
843 }
844
845 }  // namespace
846
847 //
848 // Chttp2ServerAddPort()
849 //
850
851 grpc_error_handle Chttp2ServerAddPort(Server* server, const char* addr,
852                                       grpc_channel_args* args,
853                                       Chttp2ServerArgsModifier args_modifier,
854                                       int* port_num) {
855   if (strncmp(addr, "external:", 9) == 0) {
856     return grpc_core::Chttp2ServerListener::CreateWithAcceptor(
857         server, addr, args, args_modifier);
858   }
859   *port_num = -1;
860   grpc_resolved_addresses* resolved = nullptr;
861   std::vector<grpc_error_handle> error_list;
862   // Using lambda to avoid use of goto.
863   grpc_error_handle error = [&]() {
864     if (absl::StartsWith(addr, kUnixUriPrefix)) {
865       error = grpc_resolve_unix_domain_address(
866           addr + sizeof(kUnixUriPrefix) - 1, &resolved);
867     } else if (absl::StartsWith(addr, kUnixAbstractUriPrefix)) {
868       error = grpc_resolve_unix_abstract_domain_address(
869           addr + sizeof(kUnixAbstractUriPrefix) - 1, &resolved);
870     } else {
871       error = grpc_blocking_resolve_address(addr, "https", &resolved);
872     }
873     if (error != GRPC_ERROR_NONE) return error;
874     // Create a listener for each resolved address.
875     for (size_t i = 0; i < resolved->naddrs; i++) {
876       // If address has a wildcard port (0), use the same port as a previous
877       // listener.
878       if (*port_num != -1 && grpc_sockaddr_get_port(&resolved->addrs[i]) == 0) {
879         grpc_sockaddr_set_port(&resolved->addrs[i], *port_num);
880       }
881       int port_temp = -1;
882       error = grpc_core::Chttp2ServerListener::Create(
883           server, &resolved->addrs[i], grpc_channel_args_copy(args),
884           args_modifier, &port_temp);
885       if (error != GRPC_ERROR_NONE) {
886         error_list.push_back(error);
887       } else {
888         if (*port_num == -1) {
889           *port_num = port_temp;
890         } else {
891           GPR_ASSERT(*port_num == port_temp);
892         }
893       }
894     }
895     if (error_list.size() == resolved->naddrs) {
896       std::string msg =
897           absl::StrFormat("No address added out of total %" PRIuPTR " resolved",
898                           resolved->naddrs);
899       return GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING(
900           msg.c_str(), error_list.data(), error_list.size());
901     } else if (!error_list.empty()) {
902       std::string msg = absl::StrFormat(
903           "Only %" PRIuPTR " addresses added out of total %" PRIuPTR
904           " resolved",
905           resolved->naddrs - error_list.size(), resolved->naddrs);
906       error = GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING(
907           msg.c_str(), error_list.data(), error_list.size());
908       gpr_log(GPR_INFO, "WARNING: %s", grpc_error_std_string(error).c_str());
909       GRPC_ERROR_UNREF(error);
910       // we managed to bind some addresses: continue without error
911     }
912     return GRPC_ERROR_NONE;
913   }();  // lambda end
914   for (grpc_error_handle error : error_list) {
915     GRPC_ERROR_UNREF(error);
916   }
917   grpc_channel_args_destroy(args);
918   if (resolved != nullptr) {
919     grpc_resolved_addresses_destroy(resolved);
920   }
921   if (error != GRPC_ERROR_NONE) *port_num = 0;
922   return error;
923 }
924
925 }  // namespace grpc_core