3 * Copyright 2015 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <grpc/support/port_platform.h>
21 #include "src/core/ext/transport/chttp2/server/chttp2_server.h"
28 #include "absl/strings/match.h"
29 #include "absl/strings/str_cat.h"
30 #include "absl/strings/str_format.h"
32 #include <grpc/grpc.h>
33 #include <grpc/impl/codegen/grpc_types.h>
34 #include <grpc/support/alloc.h>
35 #include <grpc/support/log.h>
36 #include <grpc/support/sync.h>
38 #include "src/core/ext/filters/http/server/http_server_filter.h"
39 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
40 #include "src/core/ext/transport/chttp2/transport/internal.h"
41 #include "src/core/lib/address_utils/sockaddr_utils.h"
42 #include "src/core/lib/channel/channel_args.h"
43 #include "src/core/lib/channel/handshaker.h"
44 #include "src/core/lib/channel/handshaker_registry.h"
45 #include "src/core/lib/gprpp/ref_counted.h"
46 #include "src/core/lib/gprpp/ref_counted_ptr.h"
47 #include "src/core/lib/iomgr/endpoint.h"
48 #include "src/core/lib/iomgr/resolve_address.h"
49 #include "src/core/lib/iomgr/resource_quota.h"
50 #include "src/core/lib/iomgr/tcp_server.h"
51 #include "src/core/lib/iomgr/unix_sockets_posix.h"
52 #include "src/core/lib/slice/slice_internal.h"
53 #include "src/core/lib/surface/api_trace.h"
54 #include "src/core/lib/surface/server.h"
// URI prefixes that Chttp2ServerAddPort() matches against the address string
// to pick the Unix domain socket / abstract Unix domain socket resolvers
// instead of the generic blocking resolver.
59 const char kUnixUriPrefix[] = "unix:";
60 const char kUnixAbstractUriPrefix[] = "unix-abstract:";
// Server listener backed by a grpc_tcp_server. It accepts incoming endpoints
// (OnAccept), tracks each of them as an ActiveConnection while it is being
// handshaked and served, and cooperates with an optional
// grpc_server_config_fetcher through ConfigFetcherWatcher.
//
// NOTE(review): this copy of the file has lost short lines (access
// specifiers, closing braces, some declarations); the comments below describe
// only what is visible here.
62 class Chttp2ServerListener : public Server::ListenerInterface {
// Factory for a listener bound to a single resolved address.
64 static grpc_error_handle Create(Server* server, grpc_resolved_address* addr,
65 grpc_channel_args* args,
66 Chttp2ServerArgsModifier args_modifier,
// Factory for a listener that is fed through a TcpServerFdHandler stored in
// the channel arg called `name` instead of binding an address itself.
69 static grpc_error_handle CreateWithAcceptor(
70 Server* server, const char* name, grpc_channel_args* args,
71 Chttp2ServerArgsModifier args_modifier);
73 // Do not instantiate directly. Use one of the factory methods above.
74 Chttp2ServerListener(Server* server, grpc_channel_args* args,
75 Chttp2ServerArgsModifier args_modifier);
76 ~Chttp2ServerListener() override;
// Server::ListenerInterface implementation.
78 void Start(Server* server,
79 const std::vector<grpc_pollset*>* pollsets) override;
// Returns the channelz node created in Create() (null when channelz is
// disabled or after the TCP server has shut down).
81 channelz::ListenSocketNode* channelz_listen_socket_node() const override {
82 return channelz_listen_socket_.get();
// Registers the closure that the destructor schedules once the listener is
// being destroyed.
85 void SetOnDestroyDone(grpc_closure* on_destroy_done) override;
87 void Orphan() override;
// Watcher registered with the server's config fetcher. It holds a ref to the
// listener and forwards configuration updates to it.
90 class ConfigFetcherWatcher
91 : public grpc_server_config_fetcher::WatcherInterface {
93 explicit ConfigFetcherWatcher(RefCountedPtr<Chttp2ServerListener> listener)
94 : listener_(std::move(listener)) {}
96 void UpdateConnectionManager(
97 RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
98 connection_manager) override;
100 void StopServing() override;
103 RefCountedPtr<Chttp2ServerListener> listener_;
// One accepted connection, from accept through handshake to transport
// lifetime. Owned by the listener's connections_ map.
106 class ActiveConnection : public InternallyRefCounted<ActiveConnection> {
// State that only exists while the connection is handshaking and waiting for
// the peer's HTTP/2 settings.
108 class HandshakingState : public InternallyRefCounted<HandshakingState> {
110 HandshakingState(RefCountedPtr<ActiveConnection> connection_ref,
111 grpc_pollset* accepting_pollset,
112 grpc_tcp_server_acceptor* acceptor,
113 grpc_channel_args* args);
115 ~HandshakingState() override;
117 void Orphan() override;
119 void Start(grpc_endpoint* endpoint, grpc_channel_args* args);
121 // Needed to be able to grab an external ref in ActiveConnection::Start()
122 using InternallyRefCounted<HandshakingState>::Ref;
// Closure callbacks; `arg` carries the HandshakingState (directly, or via
// HandshakerArgs::user_data for OnHandshakeDone).
125 static void OnTimeout(void* arg, grpc_error_handle error);
126 static void OnReceiveSettings(void* arg, grpc_error_handle /* error */);
127 static void OnHandshakeDone(void* arg, grpc_error_handle error);
128 RefCountedPtr<ActiveConnection> const connection_;
129 grpc_pollset* const accepting_pollset_;
130 grpc_tcp_server_acceptor* const acceptor_;
131 RefCountedPtr<HandshakeManager> handshake_mgr_
132 ABSL_GUARDED_BY(&connection_->mu_);
133 // State for enforcing handshake timeout on receiving HTTP/2 settings.
134 grpc_millis const deadline_;
135 grpc_timer timer_ ABSL_GUARDED_BY(&connection_->mu_);
136 grpc_closure on_timeout_ ABSL_GUARDED_BY(&connection_->mu_);
137 grpc_closure on_receive_settings_ ABSL_GUARDED_BY(&connection_->mu_);
138 grpc_pollset_set* const interested_parties_;
141 ActiveConnection(grpc_pollset* accepting_pollset,
142 grpc_tcp_server_acceptor* acceptor,
143 grpc_channel_args* args);
144 ~ActiveConnection() override;
146 void Orphan() override;
// Stores the listener ref and starts the handshake on `endpoint`.
150 void Start(RefCountedPtr<Chttp2ServerListener> listener,
151 grpc_endpoint* endpoint, grpc_channel_args* args);
153 // Needed to be able to grab an external ref in
154 // Chttp2ServerListener::OnAccept()
155 using InternallyRefCounted<ActiveConnection>::Ref;
// Transport-closed callback; `arg` is the ActiveConnection.
158 static void OnClose(void* arg, grpc_error_handle error);
160 RefCountedPtr<Chttp2ServerListener> listener_;
// Lock ordering: the listener's mu_ is acquired before this mutex.
161 Mutex mu_ ABSL_ACQUIRED_AFTER(&listener_->mu_);
162 // Set by HandshakingState before the handshaking begins and reset when
163 // handshaking is done.
164 OrphanablePtr<HandshakingState> handshaking_state_ ABSL_GUARDED_BY(&mu_);
165 // Set by HandshakingState when handshaking is done and a valid transport is
// created.
167 grpc_chttp2_transport* transport_ ABSL_GUARDED_BY(&mu_) = nullptr;
168 grpc_closure on_close_;
169 bool shutdown_ ABSL_GUARDED_BY(&mu_) = false;
172 // To allow access to RefCounted<> like interface.
173 friend class RefCountedPtr<Chttp2ServerListener>;
175 // Should only be called once so as to start the TCP server.
176 void StartListening();
// grpc_tcp_server accept callback; `arg` is the listener.
178 static void OnAccept(void* arg, grpc_endpoint* tcp,
179 grpc_pollset* accepting_pollset,
180 grpc_tcp_server_acceptor* acceptor);
// Closure run when tcp_server_ has finished shutting down; `arg` is the
// listener.
182 static void TcpServerShutdownComplete(void* arg, grpc_error_handle error);
184 static void DestroyListener(Server* /*server*/, void* arg,
185 grpc_closure* destroy_done);
187 // The interface required by RefCountedPtr<> has been manually implemented
188 // here to take a ref on tcp_server_ instead. Note that, the handshaker needs
189 // tcp_server_ to exist for the lifetime of the handshake since it's needed by
190 // acceptor. Sharing refs between the listener and tcp_server_ is just an
191 // optimization to avoid taking additional refs on the listener, since
192 // TcpServerShutdownComplete already holds a ref to the listener.
193 void IncrementRefCount() { grpc_tcp_server_ref(tcp_server_); }
194 void IncrementRefCount(const DebugLocation& /* location */,
195 const char* /* reason */) {
199 RefCountedPtr<Chttp2ServerListener> Ref() GRPC_MUST_USE_RESULT {
201 return RefCountedPtr<Chttp2ServerListener>(this);
203 RefCountedPtr<Chttp2ServerListener> Ref(const DebugLocation& /* location */,
204 const char* /* reason */)
205 GRPC_MUST_USE_RESULT {
209 void Unref() { grpc_tcp_server_unref(tcp_server_); }
210 void Unref(const DebugLocation& /* location */, const char* /* reason */) {
214 Server* const server_;
215 grpc_tcp_server* tcp_server_;
// Address to bind lazily when a config fetcher is in use (see Create() and
// ConfigFetcherWatcher::UpdateConnectionManager()).
216 grpc_resolved_address resolved_address_;
217 Chttp2ServerArgsModifier const args_modifier_;
// Non-owning pointer to the watcher handed to the config fetcher in Start().
218 ConfigFetcherWatcher* config_fetcher_watcher_ = nullptr;
219 Mutex channel_args_mu_;
220 grpc_channel_args* args_ ABSL_GUARDED_BY(channel_args_mu_);
221 RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
222 connection_manager_ ABSL_GUARDED_BY(channel_args_mu_);
// NOTE(review): the declaration of the `mu_` mutex referenced by the
// annotations below is not visible in this copy.
224 // Signals whether grpc_tcp_server_start() has been called.
225 bool started_ ABSL_GUARDED_BY(mu_) = false;
226 // Signals whether grpc_tcp_server_start() has completed.
227 CondVar started_cv_ ABSL_GUARDED_BY(mu_);
228 // Signals whether new requests/connections are to be accepted.
229 bool is_serving_ ABSL_GUARDED_BY(mu_) = false;
230 // Signals whether the application has triggered shutdown.
231 bool shutdown_ ABSL_GUARDED_BY(mu_) = false;
// Connections currently owned by the listener, keyed by their raw pointer.
232 std::map<ActiveConnection*, OrphanablePtr<ActiveConnection>> connections_
233 ABSL_GUARDED_BY(mu_);
234 grpc_closure tcp_server_shutdown_complete_ ABSL_GUARDED_BY(mu_);
235 grpc_closure* on_destroy_done_ ABSL_GUARDED_BY(mu_) = nullptr;
236 RefCountedPtr<channelz::ListenSocketNode> channelz_listen_socket_;
240 // Chttp2ServerListener::ConfigFetcherWatcher
243 void Chttp2ServerListener::ConfigFetcherWatcher::UpdateConnectionManager(
244 RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
245 connection_manager) {
246 RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
247 connection_manager_to_destroy;
249 MutexLock lock(&listener_->channel_args_mu_);
250 connection_manager_to_destroy = listener_->connection_manager_;
251 listener_->connection_manager_ = std::move(connection_manager);
254 MutexLock lock(&listener_->mu_);
255 if (listener_->shutdown_) {
258 listener_->is_serving_ = true;
259 if (listener_->started_) return;
262 grpc_error_handle error = grpc_tcp_server_add_port(
263 listener_->tcp_server_, &listener_->resolved_address_, &port_temp);
264 if (error != GRPC_ERROR_NONE) {
265 GRPC_ERROR_UNREF(error);
266 gpr_log(GPR_ERROR, "Error adding port to server: %s",
267 grpc_error_std_string(error).c_str());
268 // TODO(yashykt): We wouldn't need to assert here if we bound to the
269 // port earlier during AddPort.
272 listener_->StartListening();
274 MutexLock lock(&listener_->mu_);
275 listener_->started_ = true;
276 listener_->started_cv_.SignalAll();
// Called by the config fetcher when the listener must stop serving: clears
// is_serving_, takes over every connection currently tracked by the listener
// and sends a GOAWAY on each of them.
280 void Chttp2ServerListener::ConfigFetcherWatcher::StopServing() {
281 std::map<ActiveConnection*, OrphanablePtr<ActiveConnection>> connections;
// The map is moved into the local under listener_->mu_.
// NOTE(review): the braces delimiting this critical section are not visible
// in this copy; presumably the lock is released before the loop below.
283 MutexLock lock(&listener_->mu_);
284 listener_->is_serving_ = false;
285 connections = std::move(listener_->connections_);
287 // Send GOAWAYs on the transports so that they are disconnected when existing
// RPCs finish.
289 for (auto& connection : connections) {
290 connection.first->SendGoAway();
295 // Chttp2ServerListener::ActiveConnection::HandshakingState
// Returns the absolute deadline for a connection's handshake: the current
// ExecCtx time plus a timeout taken from the
// GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS channel arg in `args`.
298 grpc_millis GetConnectionDeadline(const grpc_channel_args* args) {
// NOTE(review): the line declaring `timeout_ms` (which receives the result of
// this call) is not visible in this copy. The braced triple is presumably
// {default, min, max}, i.e. a 120 second default — confirm against
// grpc_channel_args_find_integer().
300 grpc_channel_args_find_integer(args, GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS,
301 {120 * GPR_MS_PER_SEC, 1, INT_MAX});
302 return ExecCtx::Get()->Now() + timeout_ms;
// Builds the handshaking state for one accepted connection.
//
// connection_ref:    ref to the owning ActiveConnection, kept in connection_.
// accepting_pollset: pollset the connection was accepted on; added to a newly
//                    created interested_parties_ pollset set.
// acceptor:          acceptor passed by the TCP server.
// args:              channel args used to compute the handshake deadline and
//                    to select the server handshakers.
305 Chttp2ServerListener::ActiveConnection::HandshakingState::HandshakingState(
306 RefCountedPtr<ActiveConnection> connection_ref,
307 grpc_pollset* accepting_pollset, grpc_tcp_server_acceptor* acceptor,
308 grpc_channel_args* args)
309 : connection_(std::move(connection_ref)),
310 accepting_pollset_(accepting_pollset),
// NOTE(review): no initializer for the const member acceptor_ is visible here
// (the embedded numbering skips a line) — confirm against the full file.
312 handshake_mgr_(MakeRefCounted<HandshakeManager>()),
313 deadline_(GetConnectionDeadline(args)),
314 interested_parties_(grpc_pollset_set_create()) {
315 grpc_pollset_set_add_pollset(interested_parties_, accepting_pollset_);
// Populate the handshake manager with the registered server-side handshakers.
316 HandshakerRegistry::AddHandshakers(HANDSHAKER_SERVER, args,
317 interested_parties_, handshake_mgr_.get());
// Undoes the pollset wiring done by the constructor: removes the accepting
// pollset from interested_parties_ and destroys the pollset set.
320 Chttp2ServerListener::ActiveConnection::HandshakingState::~HandshakingState() {
321 grpc_pollset_set_del_pollset(interested_parties_, accepting_pollset_);
322 grpc_pollset_set_destroy(interested_parties_);
// Orphans the handshaking state. If the handshake manager is still set (it is
// moved out by OnHandshakeDone()), shuts the handshake down with a
// "Listener stopped serving." error while holding the connection mutex.
325 void Chttp2ServerListener::ActiveConnection::HandshakingState::Orphan() {
327 MutexLock lock(&connection_->mu_);
328 if (handshake_mgr_ != nullptr) {
329 handshake_mgr_->Shutdown(
330 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Listener stopped serving."));
336 void Chttp2ServerListener::ActiveConnection::HandshakingState::Start(
337 grpc_endpoint* endpoint, grpc_channel_args* args) {
338 Ref().release(); // Held by OnHandshakeDone
339 RefCountedPtr<HandshakeManager> handshake_mgr;
341 MutexLock lock(&connection_->mu_);
342 if (handshake_mgr_ == nullptr) return;
343 handshake_mgr = handshake_mgr_;
345 handshake_mgr->DoHandshake(endpoint, args, deadline_, acceptor_,
346 OnHandshakeDone, this);
// Timer callback for the handshake deadline; `arg` is the HandshakingState.
// Unless the timer was cancelled, disconnects the connection's transport with
// a "Did not receive HTTP/2 settings before handshake timeout" error.
349 void Chttp2ServerListener::ActiveConnection::HandshakingState::OnTimeout(
350 void* arg, grpc_error_handle error) {
351 HandshakingState* self = static_cast<HandshakingState*>(arg);
352 // Note that we may be called with GRPC_ERROR_NONE when the timer fires
353 // or with an error indicating that the timer system is being shut down.
354 if (error != GRPC_ERROR_CANCELLED) {
355 grpc_transport_op* op = grpc_make_transport_op(nullptr);
356 op->disconnect_with_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
357 "Did not receive HTTP/2 settings before handshake timeout");
// Read transport_ under the connection mutex. OnHandshakeDone() sets it before
// it arms this timer, so it is dereferenced below without a null check.
358 grpc_chttp2_transport* transport = nullptr;
360 MutexLock lock(&self->connection_->mu_);
361 transport = self->connection_->transport_;
363 grpc_transport_perform_op(&transport->base, op);
// Callback passed to grpc_chttp2_transport_start_reading() by
// OnHandshakeDone(); `arg` is the HandshakingState. Cancels the
// handshake-deadline timer.
368 void Chttp2ServerListener::ActiveConnection::HandshakingState::
369 OnReceiveSettings(void* arg, grpc_error_handle /* error */) {
370 HandshakingState* self = static_cast<HandshakingState*>(arg);
371 grpc_timer_cancel(&self->timer_);
// Completion callback for HandshakeManager::DoHandshake(); `arg` is the
// HandshakerArgs whose user_data is the HandshakingState.
//
// On failure, or if the connection was shut down meanwhile, it releases the
// handshake resources. On success with an endpoint it creates a chttp2
// transport, registers it with the server, starts reading with
// on_receive_settings_ and arms the handshake-deadline timer. In every case it
// detaches the handshake manager and handshaking state from the connection,
// and afterwards frees the channel resource quota and/or removes the
// connection from the listener's map when the corresponding flag was set.
375 void Chttp2ServerListener::ActiveConnection::HandshakingState::OnHandshakeDone(
376 void* arg, grpc_error_handle error) {
377 auto* args = static_cast<HandshakerArgs*>(arg);
378 HandshakingState* self = static_cast<HandshakingState*>(args->user_data);
// These locals receive ownership inside the critical section so that the
// objects they hold are destroyed only after the connection mutex is released.
379 OrphanablePtr<HandshakingState> handshaking_state_ref;
380 RefCountedPtr<HandshakeManager> handshake_mgr;
381 bool cleanup_connection = false;
382 bool free_resource_quota = false;
383 grpc_resource_user* resource_user =
384 self->connection_->listener_->server_->default_resource_user();
386 MutexLock connection_lock(&self->connection_->mu_);
387 if (error != GRPC_ERROR_NONE || self->connection_->shutdown_) {
388 std::string error_str = grpc_error_std_string(error);
389 gpr_log(GPR_DEBUG, "Handshaking failed: %s", error_str.c_str());
390 cleanup_connection = true;
391 free_resource_quota = true;
392 if (error == GRPC_ERROR_NONE && args->endpoint != nullptr) {
393 // We were shut down or stopped serving after handshaking completed
394 // successfully, so destroy the endpoint here.
395 // TODO(ctiller): It is currently necessary to shutdown endpoints
396 // before destroying them, even if we know that there are no
397 // pending read/write callbacks. This should be fixed, at which
398 // point this can be removed.
399 grpc_endpoint_shutdown(args->endpoint, GRPC_ERROR_NONE);
400 grpc_endpoint_destroy(args->endpoint);
401 grpc_channel_args_destroy(args->args);
402 grpc_slice_buffer_destroy_internal(args->read_buffer);
403 gpr_free(args->read_buffer);
406 // If the handshaking succeeded but there is no endpoint, then the
407 // handshaker may have handed off the connection to some external
408 // code, so we can just clean up here without creating a transport.
409 if (args->endpoint != nullptr) {
410 grpc_transport* transport = grpc_create_chttp2_transport(
411 args->args, args->endpoint, false, resource_user);
412 grpc_error_handle channel_init_err =
413 self->connection_->listener_->server_->SetupTransport(
414 transport, self->accepting_pollset_, args->args,
415 grpc_chttp2_transport_get_socket_node(transport),
417 if (channel_init_err == GRPC_ERROR_NONE) {
418 // Use notify_on_receive_settings callback to enforce the
419 // handshake deadline.
420 // Note: The reinterpret_cast<>s here are safe, because
421 // grpc_chttp2_transport is a C-style extension of
422 // grpc_transport, so this is morally equivalent of a
423 // static_cast<> to a derived class.
424 // TODO(roth): Change to static_cast<> when we C++-ify the
// transport API.
426 self->connection_->transport_ =
427 reinterpret_cast<grpc_chttp2_transport*>(transport);
428 GRPC_CHTTP2_REF_TRANSPORT(self->connection_->transport_,
429 "ActiveConnection"); // Held by connection_
430 self->Ref().release(); // Held by OnReceiveSettings().
431 GRPC_CLOSURE_INIT(&self->on_receive_settings_, OnReceiveSettings,
432 self, grpc_schedule_on_exec_ctx);
433 // If the listener has been configured with a config fetcher, we need
434 // to watch on the transport being closed so that we can keep an updated
435 // list of active connections.
436 grpc_closure* on_close = nullptr;
437 if (self->connection_->listener_->config_fetcher_watcher_ !=
439 // Ref held by OnClose()
440 self->connection_->Ref().release();
441 on_close = &self->connection_->on_close_;
443 // Remove the connection from the connections_ map since OnClose()
444 // will not be invoked when a config fetcher is set.
445 cleanup_connection = true;
447 grpc_chttp2_transport_start_reading(transport, args->read_buffer,
448 &self->on_receive_settings_,
450 grpc_channel_args_destroy(args->args);
451 self->Ref().release(); // Held by OnTimeout().
452 GRPC_CLOSURE_INIT(&self->on_timeout_, OnTimeout, self,
453 grpc_schedule_on_exec_ctx);
454 grpc_timer_init(&self->timer_, self->deadline_, &self->on_timeout_);
456 // Failed to create channel from transport. Clean up.
457 gpr_log(GPR_ERROR, "Failed to create channel: %s",
458 grpc_error_std_string(channel_init_err).c_str());
459 GRPC_ERROR_UNREF(channel_init_err);
460 grpc_transport_destroy(transport);
461 grpc_slice_buffer_destroy_internal(args->read_buffer);
462 gpr_free(args->read_buffer);
463 cleanup_connection = true;
464 free_resource_quota = true;
465 grpc_channel_args_destroy(args->args);
468 cleanup_connection = true;
469 free_resource_quota = true;
472 // Since the handshake manager is done, the connection no longer needs to
473 // shutdown the handshake when the listener needs to stop serving.
474 // Avoid calling the destructor of HandshakeManager and HandshakingState
475 // from within the critical region.
476 handshake_mgr = std::move(self->handshake_mgr_);
477 handshaking_state_ref = std::move(self->connection_->handshaking_state_);
479 gpr_free(self->acceptor_);
// Holds the map entry (if any) so that the connection is orphaned outside of
// the listener lock taken below.
480 OrphanablePtr<ActiveConnection> connection;
481 if (free_resource_quota && resource_user != nullptr) {
482 grpc_resource_user_free(resource_user, GRPC_RESOURCE_QUOTA_CHANNEL_SIZE);
484 if (cleanup_connection) {
485 MutexLock listener_lock(&self->connection_->listener_->mu_);
486 auto it = self->connection_->listener_->connections_.find(
487 self->connection_.get());
488 if (it != self->connection_->listener_->connections_.end()) {
489 connection = std::move(it->second);
490 self->connection_->listener_->connections_.erase(it);
497 // Chttp2ServerListener::ActiveConnection
// Creates the connection together with its HandshakingState (which receives a
// ref to this connection) and initializes on_close_ to invoke OnClose() with
// this connection as its argument.
500 Chttp2ServerListener::ActiveConnection::ActiveConnection(
501 grpc_pollset* accepting_pollset, grpc_tcp_server_acceptor* acceptor,
502 grpc_channel_args* args)
503 : handshaking_state_(MakeOrphanable<HandshakingState>(
504 Ref(), accepting_pollset, acceptor, args)) {
505 GRPC_CLOSURE_INIT(&on_close_, ActiveConnection::OnClose, this,
506 grpc_schedule_on_exec_ctx);
// Releases the "ActiveConnection" transport ref taken in
// HandshakingState::OnHandshakeDone(), if a transport was ever set.
509 Chttp2ServerListener::ActiveConnection::~ActiveConnection() {
510 if (transport_ != nullptr) {
511 GRPC_CHTTP2_UNREF_TRANSPORT(transport_, "ActiveConnection");
// Orphans the connection: under mu_, moves handshaking_state_ into a local.
// NOTE(review): some statements of this function are not visible in this copy
// (the embedded numbering skips lines); presumably the local is declared
// outside the critical section so that the state is released after unlocking.
515 void Chttp2ServerListener::ActiveConnection::Orphan() {
516 OrphanablePtr<HandshakingState> handshaking_state;
518 MutexLock lock(&mu_);
520 // Reset handshaking_state_ since we have been orphaned by the listener
521 // signaling that the listener has stopped serving.
522 handshaking_state = std::move(handshaking_state_);
// Sends a GOAWAY ("Server is stopping to serve requests.") on the connection's
// transport, if one has been created. No-op while transport_ is still null.
527 void Chttp2ServerListener::ActiveConnection::SendGoAway() {
// Snapshot transport_ under mu_; the transport op is built from the local
// copy.
528 grpc_chttp2_transport* transport = nullptr;
530 MutexLock lock(&mu_);
531 transport = transport_;
533 if (transport != nullptr) {
534 grpc_transport_op* op = grpc_make_transport_op(nullptr);
535 op->goaway_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
536 "Server is stopping to serve requests.");
537 grpc_transport_perform_op(&transport->base, op);
// Starts handshaking on an accepted connection.
//
// listener: ref to the owning listener; stored in listener_.
// endpoint: the accepted endpoint, forwarded to HandshakingState::Start().
// args:     channel args, forwarded to HandshakingState::Start().
//
// Returns without starting the handshake if the connection has already been
// shut down.
541 void Chttp2ServerListener::ActiveConnection::Start(
542 RefCountedPtr<Chttp2ServerListener> listener, grpc_endpoint* endpoint,
543 grpc_channel_args* args) {
544 RefCountedPtr<HandshakingState> handshaking_state_ref;
545 listener_ = std::move(listener);
547 MutexLock lock(&mu_);
548 if (shutdown_) return;
549 // Hold a ref to HandshakingState to allow starting the handshake outside
550 // the critical region.
551 handshaking_state_ref = handshaking_state_->Ref();
553 handshaking_state_ref->Start(endpoint, args);
// Transport-closed callback (registered through on_close_); `arg` is the
// ActiveConnection. Removes the connection from the listener's connections_
// map unless the connection has already been shut down.
556 void Chttp2ServerListener::ActiveConnection::OnClose(
557 void* arg, grpc_error_handle /* error */) {
558 ActiveConnection* self = static_cast<ActiveConnection*>(arg);
// Receives the map entry so that the connection is not destroyed while the
// map is being modified.
559 OrphanablePtr<ActiveConnection> connection;
// Listener lock first, then connection lock, matching the ACQUIRED_AFTER
// annotation on ActiveConnection::mu_.
561 MutexLock listener_lock(&self->listener_->mu_);
562 MutexLock connection_lock(&self->mu_);
563 // The node was already deleted from the connections_ list if the connection
// is shutdown.
565 if (!self->shutdown_) {
566 auto it = self->listener_->connections_.find(self);
567 if (it != self->listener_->connections_.end()) {
568 connection = std::move(it->second);
569 self->listener_->connections_.erase(it);
577 // Chttp2ServerListener
// Creates a listener for one resolved address and registers it with `server`.
//
// server:        server that takes ownership of the listener on success.
// addr:          address to listen on.
// args:          channel args; handed to the listener, or destroyed on the
//                failure path below.
// args_modifier: stored in the listener and applied to the args of every
//                accepted connection (see OnAccept()).
// port_num:      receives the bound port from grpc_tcp_server_add_port().
//
// Returns GRPC_ERROR_NONE on success, otherwise the error from creating the
// TCP server or adding the port.
580 grpc_error_handle Chttp2ServerListener::Create(
581 Server* server, grpc_resolved_address* addr, grpc_channel_args* args,
582 Chttp2ServerArgsModifier args_modifier, int* port_num) {
583 Chttp2ServerListener* listener = nullptr;
584 // The bulk of this method is inside of a lambda to make cleanup
585 // easier without using goto.
586 grpc_error_handle error = [&]() {
587 // Create Chttp2ServerListener.
588 listener = new Chttp2ServerListener(server, args, args_modifier);
589 error = grpc_tcp_server_create(&listener->tcp_server_shutdown_complete_,
590 args, &listener->tcp_server_);
591 if (error != GRPC_ERROR_NONE) return error;
// With a config fetcher the address is only recorded here; it is bound later
// in ConfigFetcherWatcher::UpdateConnectionManager().
592 if (server->config_fetcher() != nullptr) {
593 listener->resolved_address_ = *addr;
594 // TODO(yashykt): Consider binding so as to be able to return the port
// number.
597 error = grpc_tcp_server_add_port(listener->tcp_server_, addr, port_num);
598 if (error != GRPC_ERROR_NONE) return error;
600 // Create channelz node.
601 if (grpc_channel_args_find_bool(args, GRPC_ARG_ENABLE_CHANNELZ,
602 GRPC_ENABLE_CHANNELZ_DEFAULT)) {
603 std::string string_address = grpc_sockaddr_to_uri(addr);
604 listener->channelz_listen_socket_ =
605 MakeRefCounted<channelz::ListenSocketNode>(
606 string_address.c_str(),
607 absl::StrFormat("chttp2 listener %s", string_address.c_str()));
609 // Register with the server only upon success
610 server->AddListener(OrphanablePtr<Server::ListenerInterface>(listener));
611 return GRPC_ERROR_NONE;
// Failure path: release whatever was created above.
613 if (error != GRPC_ERROR_NONE) {
614 if (listener != nullptr) {
615 if (listener->tcp_server_ != nullptr) {
616 // listener is deleted when tcp_server_ is shutdown.
617 grpc_tcp_server_unref(listener->tcp_server_);
622 grpc_channel_args_destroy(args);
// Creates a listener that is fed connections through a TcpServerFdHandler
// instead of binding an address, and registers it with `server`.
//
// name: key of the pointer channel arg in `args` whose slot receives the
//       newly created fd handler.
//
// Returns GRPC_ERROR_NONE on success.
// NOTE(review): the body of the error branch after grpc_tcp_server_create()
// is not visible in this copy.
628 grpc_error_handle Chttp2ServerListener::CreateWithAcceptor(
629 Server* server, const char* name, grpc_channel_args* args,
630 Chttp2ServerArgsModifier args_modifier) {
631 Chttp2ServerListener* listener =
632 new Chttp2ServerListener(server, args, args_modifier);
633 grpc_error_handle error = grpc_tcp_server_create(
634 &listener->tcp_server_shutdown_complete_, args, &listener->tcp_server_);
635 if (error != GRPC_ERROR_NONE) {
639 // TODO(yangg) channelz
// NOTE(review): arg_val is dereferenced without a null check; this assumes
// the caller always provides the `name` pointer arg — confirm.
640 TcpServerFdHandler** arg_val =
641 grpc_channel_args_find_pointer<TcpServerFdHandler*>(args, name);
642 *arg_val = grpc_tcp_server_create_fd_handler(listener->tcp_server_);
643 server->AddListener(OrphanablePtr<Server::ListenerInterface>(listener));
644 return GRPC_ERROR_NONE;
// Stores the server, the args modifier and the channel args (released with
// grpc_channel_args_destroy() in the destructor), and initializes the closure
// that runs TcpServerShutdownComplete() with this listener as its argument.
647 Chttp2ServerListener::Chttp2ServerListener(
648 Server* server, grpc_channel_args* args,
649 Chttp2ServerArgsModifier args_modifier)
650 : server_(server), args_modifier_(args_modifier), args_(args) {
651 GRPC_CLOSURE_INIT(&tcp_server_shutdown_complete_, TcpServerShutdownComplete,
652 this, grpc_schedule_on_exec_ctx);
// Flushes pending ExecCtx work, runs the on_destroy_done_ closure if one was
// registered through SetOnDestroyDone() (flushing again so that it executes),
// and finally destroys the listener's channel args.
655 Chttp2ServerListener::~Chttp2ServerListener() {
656 // Flush queued work before destroying handshaker factory, since that
657 // may do a synchronous unref.
658 ExecCtx::Get()->Flush();
659 if (on_destroy_done_ != nullptr) {
660 ExecCtx::Run(DEBUG_LOCATION, on_destroy_done_, GRPC_ERROR_NONE);
661 ExecCtx::Get()->Flush();
663 grpc_channel_args_destroy(args_);
// When the server has a config fetcher, creates a ConfigFetcherWatcher holding
// a ref to this listener, remembers its raw pointer in
// config_fetcher_watcher_, and starts a watch for the listener's address with
// a copy of the channel args taken under channel_args_mu_.
// NOTE(review): the remaining StartWatch() arguments and the branch taken
// without a config fetcher are not visible in this copy.
666 /* Server callback: start listening on our ports */
667 void Chttp2ServerListener::Start(
668 Server* /*server*/, const std::vector<grpc_pollset*>* /* pollsets */) {
669 if (server_->config_fetcher() != nullptr) {
670 grpc_channel_args* args = nullptr;
671 auto watcher = absl::make_unique<ConfigFetcherWatcher>(Ref());
672 config_fetcher_watcher_ = watcher.get();
674 MutexLock lock(&channel_args_mu_);
675 args = grpc_channel_args_copy(args_);
677 server_->config_fetcher()->StartWatch(
678 grpc_sockaddr_to_string(&resolved_address_, false), args,
682 MutexLock lock(&mu_);
// Starts the TCP server on the server's pollsets, with OnAccept() as the
// accept callback and this listener as its argument.
690 void Chttp2ServerListener::StartListening() {
691 grpc_tcp_server_start(tcp_server_, &server_->pollsets(), OnAccept, this);
// Records, under mu_, the closure that the destructor schedules when the
// listener is destroyed.
694 void Chttp2ServerListener::SetOnDestroyDone(grpc_closure* on_destroy_done) {
695 MutexLock lock(&mu_);
696 on_destroy_done_ = on_destroy_done;
// Accept callback of the TCP server; `arg` is the listener.
//
// tcp:               the newly accepted endpoint.
// accepting_pollset: pollset the connection was accepted on.
// acceptor:          acceptor supplied by the TCP server.
//
// Builds the per-connection channel args (through the ConnectionManager when a
// config fetcher is configured, then through args_modifier_), creates an
// ActiveConnection, and — if the listener is serving and the resource quota
// allows it — registers the connection and starts its handshake. Otherwise
// the endpoint is shut down and destroyed.
699 void Chttp2ServerListener::OnAccept(void* arg, grpc_endpoint* tcp,
700 grpc_pollset* accepting_pollset,
701 grpc_tcp_server_acceptor* acceptor) {
702 Chttp2ServerListener* self = static_cast<Chttp2ServerListener*>(arg);
703 grpc_channel_args* args = nullptr;
704 RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
// Snapshot the args and the connection manager under channel_args_mu_.
707 MutexLock lock(&self->channel_args_mu_);
708 args = grpc_channel_args_copy(self->args_);
709 connection_manager = self->connection_manager_;
// Helper used on every rejection path below.
711 auto endpoint_cleanup = [&](grpc_error_handle error) {
712 grpc_endpoint_shutdown(tcp, error);
713 grpc_endpoint_destroy(tcp);
716 if (self->server_->config_fetcher() != nullptr) {
717 if (connection_manager == nullptr) {
718 grpc_error_handle error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
719 "No ConnectionManager configured. Closing connection.");
720 endpoint_cleanup(error);
721 grpc_channel_args_destroy(args);
724 // TODO(yashykt): Maybe combine the following two arg modifiers into a
// single one.
726 absl::StatusOr<grpc_channel_args*> args_result =
727 connection_manager->UpdateChannelArgsForConnection(args, tcp);
728 if (!args_result.ok()) {
729 gpr_log(GPR_DEBUG, "Closing connection: %s",
730 args_result.status().ToString().c_str());
731 endpoint_cleanup(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
732 args_result.status().ToString().c_str()));
735 grpc_error_handle error = GRPC_ERROR_NONE;
736 args = self->args_modifier_(*args_result, &error);
737 if (error != GRPC_ERROR_NONE) {
738 gpr_log(GPR_DEBUG, "Closing connection: %s",
739 grpc_error_std_string(error).c_str());
740 endpoint_cleanup(error);
741 grpc_channel_args_destroy(args);
746 MakeOrphanable<ActiveConnection>(accepting_pollset, acceptor, args);
747 // Hold a ref to connection to allow starting handshake outside the
// critical region.
749 RefCountedPtr<ActiveConnection> connection_ref = connection->Ref();
750 RefCountedPtr<Chttp2ServerListener> listener_ref;
752 MutexLock lock(&self->mu_);
753 // Shutdown the connection if listener's stopped serving.
754 if (!self->shutdown_ && self->is_serving_) {
755 grpc_resource_user* resource_user =
756 self->server_->default_resource_user();
757 if (resource_user != nullptr &&
758 !grpc_resource_user_safe_alloc(resource_user,
759 GRPC_RESOURCE_QUOTA_CHANNEL_SIZE)) {
762 "Memory quota exhausted, rejecting connection, no handshaking.");
764 // This ref needs to be taken in the critical region after having made
765 // sure that the listener has not been Orphaned, so as to avoid
766 // heap-use-after-free issues where `Ref()` is invoked when the ref of
767 // tcp_server_ has already reached 0. (Ref() implementation of
768 // Chttp2ServerListener is grpc_tcp_server_ref().)
769 listener_ref = self->Ref();
770 self->connections_.emplace(connection.get(), std::move(connection));
// `connection` is still non-null only if it was not moved into connections_
// above, i.e. the connection was rejected.
774 if (connection != nullptr) {
775 endpoint_cleanup(GRPC_ERROR_NONE);
777 connection_ref->Start(std::move(listener_ref), tcp, args);
779 grpc_channel_args_destroy(args);
// Closure run when the TCP server has finished shutting down; `arg` is the
// listener. Drops the channelz listen-socket node and releases `error`.
// NOTE(review): the remainder of this function is not visible in this copy.
782 void Chttp2ServerListener::TcpServerShutdownComplete(void* arg,
783 grpc_error_handle error) {
784 Chttp2ServerListener* self = static_cast<Chttp2ServerListener*>(arg);
785 self->channelz_listen_socket_.reset();
786 GRPC_ERROR_UNREF(error);
790 /* Server callback: destroy the tcp listener (so we don't generate further cruft) */
// Shuts the listener down: cancels the config-fetcher watch (if any), takes
// over all tracked connections, waits for an in-progress
// grpc_tcp_server_start() to finish, then shuts down the TCP server's
// listeners and drops a ref on the TCP server.
792 void Chttp2ServerListener::Orphan() {
793 // Cancel the watch before shutting down so as to avoid holding a ref to the
794 // listener in the watcher.
795 if (config_fetcher_watcher_ != nullptr) {
796 server_->config_fetcher()->CancelWatch(config_fetcher_watcher_);
// Locals filled in under mu_ and used after the critical section.
798 std::map<ActiveConnection*, OrphanablePtr<ActiveConnection>> connections;
799 grpc_tcp_server* tcp_server;
801 MutexLock lock(&mu_);
// NOTE(review): statements between the lock and the next comment are not
// visible in this copy (the embedded numbering skips lines).
804 // Orphan the connections so that they can start cleaning up.
805 connections = std::move(connections_);
806 // If the listener is currently set to be serving but has not been started
807 // yet, it means that `grpc_tcp_server_start` is in progress. Wait for the
808 // operation to finish to avoid causing races.
809 while (is_serving_ && !started_) {
810 started_cv_.Wait(&mu_);
812 tcp_server = tcp_server_;
814 grpc_tcp_server_shutdown_listeners(tcp_server);
815 grpc_tcp_server_unref(tcp_server);
821 // Chttp2ServerAddPort()
// Adds a listening port to `server` for the address string `addr`.
//
// server:        server that receives the created listeners.
// addr:          address to listen on. An "external:" prefix selects a
//                listener fed by an external acceptor; "unix:" and
//                "unix-abstract:" prefixes select the Unix domain socket
//                resolvers; anything else goes through the blocking resolver.
// args:          channel args; a copy is handed to each created listener and
//                the original is destroyed before returning on the resolver
//                path.
// args_modifier: forwarded to every created listener.
// port_num:      in/out bound port. NOTE(review): compared against -1 as
//                "not yet known", so callers presumably initialize it to -1 —
//                confirm. Set to 0 when an error is returned.
//
// Returns GRPC_ERROR_NONE if at least one resolved address could be bound;
// partial failures are only logged. If no address could be bound, returns an
// error that references the individual failures.
824 grpc_error_handle Chttp2ServerAddPort(Server* server, const char* addr,
825 grpc_channel_args* args,
826 Chttp2ServerArgsModifier args_modifier,
828 if (strncmp(addr, "external:", 9) == 0) {
829 return grpc_core::Chttp2ServerListener::CreateWithAcceptor(
830 server, addr, args, args_modifier);
833 grpc_resolved_addresses* resolved = nullptr;
834 std::vector<grpc_error_handle> error_list;
835 // Using lambda to avoid use of goto.
836 grpc_error_handle error = [&]() {
// Strip the scheme prefix (sizeof includes the terminating NUL, hence the -1)
// before resolving Unix domain addresses.
837 if (absl::StartsWith(addr, kUnixUriPrefix)) {
838 error = grpc_resolve_unix_domain_address(
839 addr + sizeof(kUnixUriPrefix) - 1, &resolved);
840 } else if (absl::StartsWith(addr, kUnixAbstractUriPrefix)) {
841 error = grpc_resolve_unix_abstract_domain_address(
842 addr + sizeof(kUnixAbstractUriPrefix) - 1, &resolved);
844 error = grpc_blocking_resolve_address(addr, "https", &resolved);
846 if (error != GRPC_ERROR_NONE) return error;
847 // Create a listener for each resolved address.
848 for (size_t i = 0; i < resolved->naddrs; i++) {
849 // If address has a wildcard port (0), use the same port as a previous
// listener.
851 if (*port_num != -1 && grpc_sockaddr_get_port(&resolved->addrs[i]) == 0) {
852 grpc_sockaddr_set_port(&resolved->addrs[i], *port_num);
855 error = grpc_core::Chttp2ServerListener::Create(
856 server, &resolved->addrs[i], grpc_channel_args_copy(args),
857 args_modifier, &port_temp);
// Per-address failures are collected rather than aborting the loop.
858 if (error != GRPC_ERROR_NONE) {
859 error_list.push_back(error);
861 if (*port_num == -1) {
862 *port_num = port_temp;
864 GPR_ASSERT(*port_num == port_temp);
// Every address failed: report a composite error.
868 if (error_list.size() == resolved->naddrs) {
870 absl::StrFormat("No address added out of total %" PRIuPTR " resolved",
872 return GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING(
873 msg.c_str(), error_list.data(), error_list.size());
874 } else if (!error_list.empty()) {
875 std::string msg = absl::StrFormat(
876 "Only %" PRIuPTR " addresses added out of total %" PRIuPTR
878 resolved->naddrs - error_list.size(), resolved->naddrs);
879 error = GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING(
880 msg.c_str(), error_list.data(), error_list.size());
881 gpr_log(GPR_INFO, "WARNING: %s", grpc_error_std_string(error).c_str());
882 GRPC_ERROR_UNREF(error);
883 // we managed to bind some addresses: continue without error
885 return GRPC_ERROR_NONE;
// Common cleanup for both the success and the failure path.
887 for (grpc_error_handle error : error_list) {
888 GRPC_ERROR_UNREF(error);
890 grpc_channel_args_destroy(args);
891 if (resolved != nullptr) {
892 grpc_resolved_addresses_destroy(resolved);
894 if (error != GRPC_ERROR_NONE) *port_num = 0;
898 } // namespace grpc_core