3 * Copyright 2015 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <grpc/support/port_platform.h>
21 #include "src/core/ext/transport/chttp2/server/chttp2_server.h"
28 #include "absl/strings/match.h"
29 #include "absl/strings/str_cat.h"
30 #include "absl/strings/str_format.h"
32 #include <grpc/grpc.h>
33 #include <grpc/impl/codegen/grpc_types.h>
34 #include <grpc/support/alloc.h>
35 #include <grpc/support/log.h>
36 #include <grpc/support/sync.h>
38 #include "src/core/ext/filters/http/server/http_server_filter.h"
39 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
40 #include "src/core/ext/transport/chttp2/transport/internal.h"
41 #include "src/core/lib/channel/channel_args.h"
42 #include "src/core/lib/channel/handshaker.h"
43 #include "src/core/lib/channel/handshaker_registry.h"
44 #include "src/core/lib/gprpp/ref_counted.h"
45 #include "src/core/lib/gprpp/ref_counted_ptr.h"
46 #include "src/core/lib/iomgr/endpoint.h"
47 #include "src/core/lib/iomgr/resolve_address.h"
48 #include "src/core/lib/iomgr/resource_quota.h"
49 #include "src/core/lib/iomgr/sockaddr_utils.h"
50 #include "src/core/lib/iomgr/tcp_server.h"
51 #include "src/core/lib/iomgr/unix_sockets_posix.h"
52 #include "src/core/lib/slice/slice_internal.h"
53 #include "src/core/lib/surface/api_trace.h"
54 #include "src/core/lib/surface/server.h"
59 const char kUnixUriPrefix[] = "unix:";
60 const char kUnixAbstractUriPrefix[] = "unix-abstract:";
62 class Chttp2ServerListener : public Server::ListenerInterface {
64 static grpc_error* Create(Server* server, grpc_resolved_address* addr,
65 grpc_channel_args* args,
66 Chttp2ServerArgsModifier args_modifier,
69 static grpc_error* CreateWithAcceptor(Server* server, const char* name,
70 grpc_channel_args* args,
71 Chttp2ServerArgsModifier args_modifier);
73 // Do not instantiate directly. Use one of the factory methods above.
74 Chttp2ServerListener(Server* server, grpc_channel_args* args,
75 Chttp2ServerArgsModifier args_modifier);
76 ~Chttp2ServerListener() override;
78 void Start(Server* server,
79 const std::vector<grpc_pollset*>* pollsets) override;
81 channelz::ListenSocketNode* channelz_listen_socket_node() const override {
82 return channelz_listen_socket_.get();
85 void SetOnDestroyDone(grpc_closure* on_destroy_done) override;
87 void Orphan() override;
90 class ConfigFetcherWatcher
91 : public grpc_server_config_fetcher::WatcherInterface {
93 explicit ConfigFetcherWatcher(RefCountedPtr<Chttp2ServerListener> listener)
94 : listener_(std::move(listener)) {}
96 void UpdateConnectionManager(
97 RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
98 connection_manager) override;
100 void StopServing() override;
103 RefCountedPtr<Chttp2ServerListener> listener_;
106 class ActiveConnection : public InternallyRefCounted<ActiveConnection> {
108 class HandshakingState : public InternallyRefCounted<HandshakingState> {
110 HandshakingState(RefCountedPtr<ActiveConnection> connection_ref,
111 grpc_pollset* accepting_pollset,
112 grpc_tcp_server_acceptor* acceptor,
113 grpc_channel_args* args);
115 ~HandshakingState() override;
117 void Orphan() override;
119 void Start(grpc_endpoint* endpoint, grpc_channel_args* args);
121 // Needed to be able to grab an external ref in ActiveConnection::Start()
122 using InternallyRefCounted<HandshakingState>::Ref;
125 static void OnTimeout(void* arg, grpc_error* error);
126 static void OnReceiveSettings(void* arg, grpc_error* /* error */);
127 static void OnHandshakeDone(void* arg, grpc_error* error);
128 RefCountedPtr<ActiveConnection> const connection_;
129 grpc_pollset* const accepting_pollset_;
130 grpc_tcp_server_acceptor* const acceptor_;
131 RefCountedPtr<HandshakeManager> handshake_mgr_
132 ABSL_GUARDED_BY(&connection_->mu_);
133 // State for enforcing handshake timeout on receiving HTTP/2 settings.
134 grpc_millis const deadline_;
135 grpc_timer timer_ ABSL_GUARDED_BY(&connection_->mu_);
136 grpc_closure on_timeout_ ABSL_GUARDED_BY(&connection_->mu_);
137 grpc_closure on_receive_settings_ ABSL_GUARDED_BY(&connection_->mu_);
138 grpc_pollset_set* const interested_parties_;
141 ActiveConnection(grpc_pollset* accepting_pollset,
142 grpc_tcp_server_acceptor* acceptor,
143 grpc_channel_args* args);
144 ~ActiveConnection() override;
146 void Orphan() override;
150 void Start(RefCountedPtr<Chttp2ServerListener> listener,
151 grpc_endpoint* endpoint, grpc_channel_args* args);
153 // Needed to be able to grab an external ref in
154 // Chttp2ServerListener::OnAccept()
155 using InternallyRefCounted<ActiveConnection>::Ref;
158 static void OnClose(void* arg, grpc_error* error);
160 RefCountedPtr<Chttp2ServerListener> listener_;
161 Mutex mu_ ABSL_ACQUIRED_AFTER(&listener_->mu_);
162 // Set by HandshakingState before the handshaking begins and reset when
163 // handshaking is done.
164 OrphanablePtr<HandshakingState> handshaking_state_ ABSL_GUARDED_BY(&mu_);
165 // Set by HandshakingState when handshaking is done and a valid transport is
167 grpc_chttp2_transport* transport_ ABSL_GUARDED_BY(&mu_) = nullptr;
168 grpc_closure on_close_;
169 bool shutdown_ ABSL_GUARDED_BY(&mu_) = false;
172 // To allow access to RefCounted<> like interface.
173 friend class RefCountedPtr<Chttp2ServerListener>;
175 // Should only be called once so as to start the TCP server.
176 void StartListening();
178 static void OnAccept(void* arg, grpc_endpoint* tcp,
179 grpc_pollset* accepting_pollset,
180 grpc_tcp_server_acceptor* acceptor);
182 static void TcpServerShutdownComplete(void* arg, grpc_error* error);
184 static void DestroyListener(Server* /*server*/, void* arg,
185 grpc_closure* destroy_done);
187 // The interface required by RefCountedPtr<> has been manually implemented
188 // here to take a ref on tcp_server_ instead. Note that, the handshaker needs
189 // tcp_server_ to exist for the lifetime of the handshake since it's needed by
190 // acceptor. Sharing refs between the listener and tcp_server_ is just an
191 // optimization to avoid taking additional refs on the listener, since
192 // TcpServerShutdownComplete already holds a ref to the listener.
193 void IncrementRefCount() { grpc_tcp_server_ref(tcp_server_); }
194 void IncrementRefCount(const DebugLocation& /* location */,
195 const char* /* reason */) {
199 RefCountedPtr<Chttp2ServerListener> Ref() GRPC_MUST_USE_RESULT {
201 return RefCountedPtr<Chttp2ServerListener>(this);
203 RefCountedPtr<Chttp2ServerListener> Ref(const DebugLocation& /* location */,
204 const char* /* reason */)
205 GRPC_MUST_USE_RESULT {
209 void Unref() { grpc_tcp_server_unref(tcp_server_); }
210 void Unref(const DebugLocation& /* location */, const char* /* reason */) {
214 Server* const server_;
215 grpc_tcp_server* tcp_server_;
216 grpc_resolved_address resolved_address_;
217 Chttp2ServerArgsModifier const args_modifier_;
218 ConfigFetcherWatcher* config_fetcher_watcher_ = nullptr;
219 Mutex channel_args_mu_;
220 grpc_channel_args* args_ ABSL_GUARDED_BY(channel_args_mu_);
221 RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
222 connection_manager_ ABSL_GUARDED_BY(channel_args_mu_);
224 // Signals whether grpc_tcp_server_start() has been called.
225 bool started_ ABSL_GUARDED_BY(mu_) = false;
226 // Signals whether grpc_tcp_server_start() has completed.
227 CondVar started_cv_ ABSL_GUARDED_BY(mu_);
228 // Signals whether new requests/connections are to be accepted.
229 bool is_serving_ ABSL_GUARDED_BY(mu_) = false;
230 // Signals whether the application has triggered shutdown.
231 bool shutdown_ ABSL_GUARDED_BY(mu_) = false;
232 std::map<ActiveConnection*, OrphanablePtr<ActiveConnection>> connections_
233 ABSL_GUARDED_BY(mu_);
234 grpc_closure tcp_server_shutdown_complete_ ABSL_GUARDED_BY(mu_);
235 grpc_closure* on_destroy_done_ ABSL_GUARDED_BY(mu_) = nullptr;
236 RefCountedPtr<channelz::ListenSocketNode> channelz_listen_socket_;
240 // Chttp2ServerListener::ConfigFetcherWatcher
243 void Chttp2ServerListener::ConfigFetcherWatcher::UpdateConnectionManager(
244 RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
245 connection_manager) {
246 RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
247 connection_manager_to_destroy;
249 MutexLock lock(&listener_->channel_args_mu_);
250 connection_manager_to_destroy = listener_->connection_manager_;
251 listener_->connection_manager_ = std::move(connection_manager);
254 MutexLock lock(&listener_->mu_);
255 if (listener_->shutdown_) {
258 listener_->is_serving_ = true;
259 if (listener_->started_) return;
262 grpc_error* error = grpc_tcp_server_add_port(
263 listener_->tcp_server_, &listener_->resolved_address_, &port_temp);
264 if (error != GRPC_ERROR_NONE) {
265 GRPC_ERROR_UNREF(error);
266 gpr_log(GPR_ERROR, "Error adding port to server: %s",
267 grpc_error_string(error));
268 // TODO(yashykt): We wouldn't need to assert here if we bound to the
269 // port earlier during AddPort.
272 listener_->StartListening();
274 MutexLock lock(&listener_->mu_);
275 listener_->started_ = true;
276 listener_->started_cv_.SignalAll();
280 void Chttp2ServerListener::ConfigFetcherWatcher::StopServing() {
281 std::map<ActiveConnection*, OrphanablePtr<ActiveConnection>> connections;
283 MutexLock lock(&listener_->mu_);
284 listener_->is_serving_ = false;
285 connections = std::move(listener_->connections_);
287 // Send GOAWAYs on the transports so that they disconnected when existing RPCs
289 for (auto& connection : connections) {
290 connection.first->SendGoAway();
295 // Chttp2ServerListener::ActiveConnection::HandshakingState
298 grpc_millis GetConnectionDeadline(const grpc_channel_args* args) {
300 grpc_channel_args_find_integer(args, GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS,
301 {120 * GPR_MS_PER_SEC, 1, INT_MAX});
302 return ExecCtx::Get()->Now() + timeout_ms;
305 Chttp2ServerListener::ActiveConnection::HandshakingState::HandshakingState(
306 RefCountedPtr<ActiveConnection> connection_ref,
307 grpc_pollset* accepting_pollset, grpc_tcp_server_acceptor* acceptor,
308 grpc_channel_args* args)
309 : connection_(std::move(connection_ref)),
310 accepting_pollset_(accepting_pollset),
312 handshake_mgr_(MakeRefCounted<HandshakeManager>()),
313 deadline_(GetConnectionDeadline(args)),
314 interested_parties_(grpc_pollset_set_create()) {
315 grpc_pollset_set_add_pollset(interested_parties_, accepting_pollset_);
316 HandshakerRegistry::AddHandshakers(HANDSHAKER_SERVER, args,
317 interested_parties_, handshake_mgr_.get());
320 Chttp2ServerListener::ActiveConnection::HandshakingState::~HandshakingState() {
321 grpc_pollset_set_del_pollset(interested_parties_, accepting_pollset_);
322 grpc_pollset_set_destroy(interested_parties_);
325 void Chttp2ServerListener::ActiveConnection::HandshakingState::Orphan() {
327 MutexLock lock(&connection_->mu_);
328 if (handshake_mgr_ != nullptr) {
329 handshake_mgr_->Shutdown(
330 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Listener stopped serving."));
336 void Chttp2ServerListener::ActiveConnection::HandshakingState::Start(
337 grpc_endpoint* endpoint, grpc_channel_args* args) {
338 Ref().release(); // Held by OnHandshakeDone
339 RefCountedPtr<HandshakeManager> handshake_mgr;
341 MutexLock lock(&connection_->mu_);
342 if (handshake_mgr_ == nullptr) return;
343 handshake_mgr = handshake_mgr_;
345 handshake_mgr->DoHandshake(endpoint, args, deadline_, acceptor_,
346 OnHandshakeDone, this);
349 void Chttp2ServerListener::ActiveConnection::HandshakingState::OnTimeout(
350 void* arg, grpc_error* error) {
351 HandshakingState* self = static_cast<HandshakingState*>(arg);
352 // Note that we may be called with GRPC_ERROR_NONE when the timer fires
353 // or with an error indicating that the timer system is being shut down.
354 if (error != GRPC_ERROR_CANCELLED) {
355 grpc_transport_op* op = grpc_make_transport_op(nullptr);
356 op->disconnect_with_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
357 "Did not receive HTTP/2 settings before handshake timeout");
358 grpc_chttp2_transport* transport = nullptr;
360 MutexLock lock(&self->connection_->mu_);
361 transport = self->connection_->transport_;
363 grpc_transport_perform_op(&transport->base, op);
368 void Chttp2ServerListener::ActiveConnection::HandshakingState::
369 OnReceiveSettings(void* arg, grpc_error* /* error */) {
370 HandshakingState* self = static_cast<HandshakingState*>(arg);
371 grpc_timer_cancel(&self->timer_);
375 void Chttp2ServerListener::ActiveConnection::HandshakingState::OnHandshakeDone(
376 void* arg, grpc_error* error) {
377 auto* args = static_cast<HandshakerArgs*>(arg);
378 HandshakingState* self = static_cast<HandshakingState*>(args->user_data);
379 OrphanablePtr<HandshakingState> handshaking_state_ref;
380 RefCountedPtr<HandshakeManager> handshake_mgr;
381 bool cleanup_connection = false;
382 bool free_resource_quota = false;
383 grpc_resource_user* resource_user =
384 self->connection_->listener_->server_->default_resource_user();
386 MutexLock connection_lock(&self->connection_->mu_);
387 if (error != GRPC_ERROR_NONE || self->connection_->shutdown_) {
388 const char* error_str = grpc_error_string(error);
389 gpr_log(GPR_DEBUG, "Handshaking failed: %s", error_str);
390 cleanup_connection = true;
391 free_resource_quota = true;
392 if (error == GRPC_ERROR_NONE && args->endpoint != nullptr) {
393 // We were shut down or stopped serving after handshaking completed
394 // successfully, so destroy the endpoint here.
395 // TODO(ctiller): It is currently necessary to shutdown endpoints
396 // before destroying them, even if we know that there are no
397 // pending read/write callbacks. This should be fixed, at which
398 // point this can be removed.
399 grpc_endpoint_shutdown(args->endpoint, GRPC_ERROR_NONE);
400 grpc_endpoint_destroy(args->endpoint);
401 grpc_channel_args_destroy(args->args);
402 grpc_slice_buffer_destroy_internal(args->read_buffer);
403 gpr_free(args->read_buffer);
406 // If the handshaking succeeded but there is no endpoint, then the
407 // handshaker may have handed off the connection to some external
408 // code, so we can just clean up here without creating a transport.
409 if (args->endpoint != nullptr) {
410 grpc_transport* transport = grpc_create_chttp2_transport(
411 args->args, args->endpoint, false, resource_user);
412 grpc_error* channel_init_err =
413 self->connection_->listener_->server_->SetupTransport(
414 transport, self->accepting_pollset_, args->args,
415 grpc_chttp2_transport_get_socket_node(transport),
417 if (channel_init_err == GRPC_ERROR_NONE) {
418 // Use notify_on_receive_settings callback to enforce the
419 // handshake deadline.
420 // Note: The reinterpret_cast<>s here are safe, because
421 // grpc_chttp2_transport is a C-style extension of
422 // grpc_transport, so this is morally equivalent of a
423 // static_cast<> to a derived class.
424 // TODO(roth): Change to static_cast<> when we C++-ify the
426 self->connection_->transport_ =
427 reinterpret_cast<grpc_chttp2_transport*>(transport);
428 GRPC_CHTTP2_REF_TRANSPORT(self->connection_->transport_,
429 "ActiveConnection"); // Held by connection_
430 self->Ref().release(); // Held by OnReceiveSettings().
431 GRPC_CLOSURE_INIT(&self->on_receive_settings_, OnReceiveSettings,
432 self, grpc_schedule_on_exec_ctx);
433 // If the listener has been configured with a config fetcher, we need
434 // to watch on the transport being closed so that we can an updated
435 // list of active connections.
436 grpc_closure* on_close = nullptr;
437 if (self->connection_->listener_->config_fetcher_watcher_ !=
439 // Refs helds by OnClose()
440 self->connection_->Ref().release();
441 on_close = &self->connection_->on_close_;
443 // Remove the connection from the connections_ map since OnClose()
444 // will not be invoked when a config fetcher is set.
445 cleanup_connection = true;
447 grpc_chttp2_transport_start_reading(transport, args->read_buffer,
448 &self->on_receive_settings_,
450 grpc_channel_args_destroy(args->args);
451 self->Ref().release(); // Held by OnTimeout().
452 GRPC_CLOSURE_INIT(&self->on_timeout_, OnTimeout, self,
453 grpc_schedule_on_exec_ctx);
454 grpc_timer_init(&self->timer_, self->deadline_, &self->on_timeout_);
456 // Failed to create channel from transport. Clean up.
457 gpr_log(GPR_ERROR, "Failed to create channel: %s",
458 grpc_error_string(channel_init_err));
459 GRPC_ERROR_UNREF(channel_init_err);
460 grpc_transport_destroy(transport);
461 grpc_slice_buffer_destroy_internal(args->read_buffer);
462 gpr_free(args->read_buffer);
463 cleanup_connection = true;
464 free_resource_quota = true;
465 grpc_channel_args_destroy(args->args);
468 cleanup_connection = true;
469 free_resource_quota = true;
472 // Since the handshake manager is done, the connection no longer needs to
473 // shutdown the handshake when the listener needs to stop serving.
474 // Avoid calling the destructor of HandshakeManager and HandshakingState
475 // from within the critical region.
476 handshake_mgr = std::move(self->handshake_mgr_);
477 handshaking_state_ref = std::move(self->connection_->handshaking_state_);
479 gpr_free(self->acceptor_);
480 OrphanablePtr<ActiveConnection> connection;
481 if (free_resource_quota && resource_user != nullptr) {
482 grpc_resource_user_free(resource_user, GRPC_RESOURCE_QUOTA_CHANNEL_SIZE);
484 if (cleanup_connection) {
485 MutexLock listener_lock(&self->connection_->listener_->mu_);
486 auto it = self->connection_->listener_->connections_.find(
487 self->connection_.get());
488 if (it != self->connection_->listener_->connections_.end()) {
489 connection = std::move(it->second);
490 self->connection_->listener_->connections_.erase(it);
497 // Chttp2ServerListener::ActiveConnection
500 Chttp2ServerListener::ActiveConnection::ActiveConnection(
501 grpc_pollset* accepting_pollset, grpc_tcp_server_acceptor* acceptor,
502 grpc_channel_args* args)
503 : handshaking_state_(MakeOrphanable<HandshakingState>(
504 Ref(), accepting_pollset, acceptor, args)) {
505 GRPC_CLOSURE_INIT(&on_close_, ActiveConnection::OnClose, this,
506 grpc_schedule_on_exec_ctx);
509 Chttp2ServerListener::ActiveConnection::~ActiveConnection() {
510 if (transport_ != nullptr) {
511 GRPC_CHTTP2_UNREF_TRANSPORT(transport_, "ActiveConnection");
515 void Chttp2ServerListener::ActiveConnection::Orphan() {
516 OrphanablePtr<HandshakingState> handshaking_state;
518 MutexLock lock(&mu_);
520 // Reset handshaking_state_ since we have been orphaned by the listener
521 // signaling that the listener has stopped serving.
522 handshaking_state = std::move(handshaking_state_);
527 void Chttp2ServerListener::ActiveConnection::SendGoAway() {
528 grpc_chttp2_transport* transport = nullptr;
530 MutexLock lock(&mu_);
531 transport = transport_;
533 if (transport != nullptr) {
534 grpc_transport_op* op = grpc_make_transport_op(nullptr);
535 op->goaway_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
536 "Server is stopping to serve requests.");
537 grpc_transport_perform_op(&transport->base, op);
541 void Chttp2ServerListener::ActiveConnection::Start(
542 RefCountedPtr<Chttp2ServerListener> listener, grpc_endpoint* endpoint,
543 grpc_channel_args* args) {
544 RefCountedPtr<HandshakingState> handshaking_state_ref;
545 listener_ = std::move(listener);
547 MutexLock lock(&mu_);
548 if (shutdown_) return;
549 // Hold a ref to HandshakingState to allow starting the handshake outside
550 // the critical region.
551 handshaking_state_ref = handshaking_state_->Ref();
553 handshaking_state_ref->Start(endpoint, args);
556 void Chttp2ServerListener::ActiveConnection::OnClose(void* arg,
557 grpc_error* /* error */) {
558 ActiveConnection* self = static_cast<ActiveConnection*>(arg);
559 OrphanablePtr<ActiveConnection> connection;
561 MutexLock listener_lock(&self->listener_->mu_);
562 MutexLock connection_lock(&self->mu_);
563 // The node was already deleted from the connections_ list if the connection
565 if (!self->shutdown_) {
566 auto it = self->listener_->connections_.find(self);
567 if (it != self->listener_->connections_.end()) {
568 connection = std::move(it->second);
569 self->listener_->connections_.erase(it);
577 // Chttp2ServerListener
580 grpc_error* Chttp2ServerListener::Create(Server* server,
581 grpc_resolved_address* addr,
582 grpc_channel_args* args,
583 Chttp2ServerArgsModifier args_modifier,
585 Chttp2ServerListener* listener = nullptr;
586 // The bulk of this method is inside of a lambda to make cleanup
587 // easier without using goto.
588 grpc_error* error = [&]() {
589 // Create Chttp2ServerListener.
590 listener = new Chttp2ServerListener(server, args, args_modifier);
591 error = grpc_tcp_server_create(&listener->tcp_server_shutdown_complete_,
592 args, &listener->tcp_server_);
593 if (error != GRPC_ERROR_NONE) return error;
594 if (server->config_fetcher() != nullptr) {
595 listener->resolved_address_ = *addr;
596 // TODO(yashykt): Consider binding so as to be able to return the port
599 error = grpc_tcp_server_add_port(listener->tcp_server_, addr, port_num);
600 if (error != GRPC_ERROR_NONE) return error;
602 // Create channelz node.
603 if (grpc_channel_args_find_bool(args, GRPC_ARG_ENABLE_CHANNELZ,
604 GRPC_ENABLE_CHANNELZ_DEFAULT)) {
605 std::string string_address = grpc_sockaddr_to_uri(addr);
606 listener->channelz_listen_socket_ =
607 MakeRefCounted<channelz::ListenSocketNode>(
608 string_address.c_str(),
609 absl::StrFormat("chttp2 listener %s", string_address.c_str()));
611 // Register with the server only upon success
612 server->AddListener(OrphanablePtr<Server::ListenerInterface>(listener));
613 return GRPC_ERROR_NONE;
615 if (error != GRPC_ERROR_NONE) {
616 if (listener != nullptr) {
617 if (listener->tcp_server_ != nullptr) {
618 // listener is deleted when tcp_server_ is shutdown.
619 grpc_tcp_server_unref(listener->tcp_server_);
624 grpc_channel_args_destroy(args);
630 grpc_error* Chttp2ServerListener::CreateWithAcceptor(
631 Server* server, const char* name, grpc_channel_args* args,
632 Chttp2ServerArgsModifier args_modifier) {
633 Chttp2ServerListener* listener =
634 new Chttp2ServerListener(server, args, args_modifier);
635 grpc_error* error = grpc_tcp_server_create(
636 &listener->tcp_server_shutdown_complete_, args, &listener->tcp_server_);
637 if (error != GRPC_ERROR_NONE) {
641 // TODO(yangg) channelz
642 TcpServerFdHandler** arg_val =
643 grpc_channel_args_find_pointer<TcpServerFdHandler*>(args, name);
644 *arg_val = grpc_tcp_server_create_fd_handler(listener->tcp_server_);
645 server->AddListener(OrphanablePtr<Server::ListenerInterface>(listener));
646 return GRPC_ERROR_NONE;
649 Chttp2ServerListener::Chttp2ServerListener(
650 Server* server, grpc_channel_args* args,
651 Chttp2ServerArgsModifier args_modifier)
652 : server_(server), args_modifier_(args_modifier), args_(args) {
653 GRPC_CLOSURE_INIT(&tcp_server_shutdown_complete_, TcpServerShutdownComplete,
654 this, grpc_schedule_on_exec_ctx);
657 Chttp2ServerListener::~Chttp2ServerListener() {
658 // Flush queued work before destroying handshaker factory, since that
659 // may do a synchronous unref.
660 ExecCtx::Get()->Flush();
661 if (on_destroy_done_ != nullptr) {
662 ExecCtx::Run(DEBUG_LOCATION, on_destroy_done_, GRPC_ERROR_NONE);
663 ExecCtx::Get()->Flush();
665 grpc_channel_args_destroy(args_);
668 /* Server callback: start listening on our ports */
669 void Chttp2ServerListener::Start(
670 Server* /*server*/, const std::vector<grpc_pollset*>* /* pollsets */) {
671 if (server_->config_fetcher() != nullptr) {
672 grpc_channel_args* args = nullptr;
673 auto watcher = absl::make_unique<ConfigFetcherWatcher>(Ref());
674 config_fetcher_watcher_ = watcher.get();
676 MutexLock lock(&channel_args_mu_);
677 args = grpc_channel_args_copy(args_);
679 server_->config_fetcher()->StartWatch(
680 grpc_sockaddr_to_string(&resolved_address_, false), args,
684 MutexLock lock(&mu_);
692 void Chttp2ServerListener::StartListening() {
693 grpc_tcp_server_start(tcp_server_, &server_->pollsets(), OnAccept, this);
696 void Chttp2ServerListener::SetOnDestroyDone(grpc_closure* on_destroy_done) {
697 MutexLock lock(&mu_);
698 on_destroy_done_ = on_destroy_done;
701 void Chttp2ServerListener::OnAccept(void* arg, grpc_endpoint* tcp,
702 grpc_pollset* accepting_pollset,
703 grpc_tcp_server_acceptor* acceptor) {
704 Chttp2ServerListener* self = static_cast<Chttp2ServerListener*>(arg);
705 grpc_channel_args* args = nullptr;
706 RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
709 MutexLock lock(&self->channel_args_mu_);
710 args = grpc_channel_args_copy(self->args_);
711 connection_manager = self->connection_manager_;
713 auto endpoint_cleanup = [&](grpc_error* error) {
714 grpc_endpoint_shutdown(tcp, error);
715 grpc_endpoint_destroy(tcp);
718 if (self->server_->config_fetcher() != nullptr) {
719 if (connection_manager == nullptr) {
720 grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
721 "No ConnectionManager configured. Closing connection.");
722 endpoint_cleanup(error);
723 grpc_channel_args_destroy(args);
726 // TODO(yashykt): Maybe combine the following two arg modifiers into a
728 absl::StatusOr<grpc_channel_args*> args_result =
729 connection_manager->UpdateChannelArgsForConnection(args, tcp);
730 if (!args_result.ok()) {
731 gpr_log(GPR_DEBUG, "Closing connection: %s",
732 args_result.status().ToString().c_str());
733 endpoint_cleanup(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
734 args_result.status().ToString().c_str()));
737 grpc_error* error = GRPC_ERROR_NONE;
738 args = self->args_modifier_(*args_result, &error);
739 if (error != GRPC_ERROR_NONE) {
740 gpr_log(GPR_DEBUG, "Closing connection: %s", grpc_error_string(error));
741 endpoint_cleanup(error);
742 grpc_channel_args_destroy(args);
747 MakeOrphanable<ActiveConnection>(accepting_pollset, acceptor, args);
748 // Hold a ref to connection to allow starting handshake outside the
750 RefCountedPtr<ActiveConnection> connection_ref = connection->Ref();
751 RefCountedPtr<Chttp2ServerListener> listener_ref;
753 MutexLock lock(&self->mu_);
754 // Shutdown the the connection if listener's stopped serving.
755 if (!self->shutdown_ && self->is_serving_) {
756 grpc_resource_user* resource_user =
757 self->server_->default_resource_user();
758 if (resource_user != nullptr &&
759 !grpc_resource_user_safe_alloc(resource_user,
760 GRPC_RESOURCE_QUOTA_CHANNEL_SIZE)) {
763 "Memory quota exhausted, rejecting connection, no handshaking.");
765 // This ref needs to be taken in the critical region after having made
766 // sure that the listener has not been Orphaned, so as to avoid
767 // heap-use-after-free issues where `Ref()` is invoked when the ref of
768 // tcp_server_ has already reached 0. (Ref() implementation of
769 // Chttp2ServerListener is grpc_tcp_server_ref().)
770 listener_ref = self->Ref();
771 self->connections_.emplace(connection.get(), std::move(connection));
775 if (connection != nullptr) {
776 endpoint_cleanup(GRPC_ERROR_NONE);
778 connection_ref->Start(std::move(listener_ref), tcp, args);
780 grpc_channel_args_destroy(args);
783 void Chttp2ServerListener::TcpServerShutdownComplete(void* arg,
785 Chttp2ServerListener* self = static_cast<Chttp2ServerListener*>(arg);
786 self->channelz_listen_socket_.reset();
787 GRPC_ERROR_UNREF(error);
791 /* Server callback: destroy the tcp listener (so we don't generate further
793 void Chttp2ServerListener::Orphan() {
794 // Cancel the watch before shutting down so as to avoid holding a ref to the
795 // listener in the watcher.
796 if (config_fetcher_watcher_ != nullptr) {
797 server_->config_fetcher()->CancelWatch(config_fetcher_watcher_);
799 std::map<ActiveConnection*, OrphanablePtr<ActiveConnection>> connections;
800 grpc_tcp_server* tcp_server;
802 MutexLock lock(&mu_);
805 // Orphan the connections so that they can start cleaning up.
806 connections = std::move(connections_);
807 // If the listener is currently set to be serving but has not been started
808 // yet, it means that `grpc_tcp_server_start` is in progress. Wait for the
809 // operation to finish to avoid causing races.
810 while (is_serving_ && !started_) {
811 started_cv_.Wait(&mu_);
813 tcp_server = tcp_server_;
815 grpc_tcp_server_shutdown_listeners(tcp_server);
816 grpc_tcp_server_unref(tcp_server);
822 // Chttp2ServerAddPort()
825 grpc_error* Chttp2ServerAddPort(Server* server, const char* addr,
826 grpc_channel_args* args,
827 Chttp2ServerArgsModifier args_modifier,
829 if (strncmp(addr, "external:", 9) == 0) {
830 return grpc_core::Chttp2ServerListener::CreateWithAcceptor(
831 server, addr, args, args_modifier);
834 grpc_resolved_addresses* resolved = nullptr;
835 std::vector<grpc_error*> error_list;
836 // Using lambda to avoid use of goto.
837 grpc_error* error = [&]() {
838 if (absl::StartsWith(addr, kUnixUriPrefix)) {
839 error = grpc_resolve_unix_domain_address(
840 addr + sizeof(kUnixUriPrefix) - 1, &resolved);
841 } else if (absl::StartsWith(addr, kUnixAbstractUriPrefix)) {
842 error = grpc_resolve_unix_abstract_domain_address(
843 addr + sizeof(kUnixAbstractUriPrefix) - 1, &resolved);
845 error = grpc_blocking_resolve_address(addr, "https", &resolved);
847 if (error != GRPC_ERROR_NONE) return error;
848 // Create a listener for each resolved address.
849 for (size_t i = 0; i < resolved->naddrs; i++) {
850 // If address has a wildcard port (0), use the same port as a previous
852 if (*port_num != -1 && grpc_sockaddr_get_port(&resolved->addrs[i]) == 0) {
853 grpc_sockaddr_set_port(&resolved->addrs[i], *port_num);
856 error = grpc_core::Chttp2ServerListener::Create(
857 server, &resolved->addrs[i], grpc_channel_args_copy(args),
858 args_modifier, &port_temp);
859 if (error != GRPC_ERROR_NONE) {
860 error_list.push_back(error);
862 if (*port_num == -1) {
863 *port_num = port_temp;
865 GPR_ASSERT(*port_num == port_temp);
869 if (error_list.size() == resolved->naddrs) {
871 absl::StrFormat("No address added out of total %" PRIuPTR " resolved",
873 return GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING(
874 msg.c_str(), error_list.data(), error_list.size());
875 } else if (!error_list.empty()) {
876 std::string msg = absl::StrFormat(
877 "Only %" PRIuPTR " addresses added out of total %" PRIuPTR
879 resolved->naddrs - error_list.size(), resolved->naddrs);
880 error = GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING(
881 msg.c_str(), error_list.data(), error_list.size());
882 gpr_log(GPR_INFO, "WARNING: %s", grpc_error_string(error));
883 GRPC_ERROR_UNREF(error);
884 // we managed to bind some addresses: continue without error
886 return GRPC_ERROR_NONE;
888 for (grpc_error* error : error_list) {
889 GRPC_ERROR_UNREF(error);
891 grpc_channel_args_destroy(args);
892 if (resolved != nullptr) {
893 grpc_resolved_addresses_destroy(resolved);
895 if (error != GRPC_ERROR_NONE) *port_num = 0;
899 } // namespace grpc_core