3 * Copyright 2015 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <grpc/support/port_platform.h>
21 #include "src/core/ext/transport/chttp2/server/chttp2_server.h"
29 #include "absl/strings/match.h"
30 #include "absl/strings/str_cat.h"
31 #include "absl/strings/str_format.h"
33 #include <grpc/grpc.h>
34 #include <grpc/impl/codegen/grpc_types.h>
35 #include <grpc/support/alloc.h>
36 #include <grpc/support/log.h>
37 #include <grpc/support/sync.h>
39 #include "src/core/ext/filters/http/server/http_server_filter.h"
40 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
41 #include "src/core/ext/transport/chttp2/transport/internal.h"
42 #include "src/core/lib/address_utils/sockaddr_utils.h"
43 #include "src/core/lib/channel/channel_args.h"
44 #include "src/core/lib/channel/handshaker.h"
45 #include "src/core/lib/config/core_configuration.h"
46 #include "src/core/lib/gprpp/ref_counted.h"
47 #include "src/core/lib/gprpp/ref_counted_ptr.h"
48 #include "src/core/lib/iomgr/endpoint.h"
49 #include "src/core/lib/iomgr/resolve_address.h"
50 #include "src/core/lib/iomgr/resource_quota.h"
51 #include "src/core/lib/iomgr/tcp_server.h"
52 #include "src/core/lib/iomgr/unix_sockets_posix.h"
53 #include "src/core/lib/slice/slice_internal.h"
54 #include "src/core/lib/surface/api_trace.h"
55 #include "src/core/lib/surface/server.h"
// URI prefixes that Chttp2ServerAddPort() checks for in order to route an
// address to the Unix domain / abstract Unix domain socket resolvers.
60 const char kUnixUriPrefix[] = "unix:";
61 const char kUnixAbstractUriPrefix[] = "unix-abstract:";
// Listener for the chttp2 server transport. Implements
// Server::ListenerInterface on top of a grpc_tcp_server and tracks the
// connections it has accepted in connections_.
63 class Chttp2ServerListener : public Server::ListenerInterface {
// Factory: creates a listener for the resolved address `addr`.
65 static grpc_error_handle Create(Server* server, grpc_resolved_address* addr,
66 grpc_channel_args* args,
67 Chttp2ServerArgsModifier args_modifier,
// Factory: creates a listener whose TcpServerFdHandler is published through
// the pointer channel arg named `name`.
70 static grpc_error_handle CreateWithAcceptor(
71 Server* server, const char* name, grpc_channel_args* args,
72 Chttp2ServerArgsModifier args_modifier);
74 // Do not instantiate directly. Use one of the factory methods above.
75 Chttp2ServerListener(Server* server, grpc_channel_args* args,
76 Chttp2ServerArgsModifier args_modifier,
77 grpc_resource_quota* resource_quota);
78 ~Chttp2ServerListener() override;
80 void Start(Server* server,
81 const std::vector<grpc_pollset*>* pollsets) override;
83 channelz::ListenSocketNode* channelz_listen_socket_node() const override {
84 return channelz_listen_socket_.get();
87 void SetOnDestroyDone(grpc_closure* on_destroy_done) override;
89 void Orphan() override;
// Watcher handed to the server's config fetcher; applies connection manager
// updates and stop-serving requests to the listener it references.
92 class ConfigFetcherWatcher
93 : public grpc_server_config_fetcher::WatcherInterface {
95 explicit ConfigFetcherWatcher(RefCountedPtr<Chttp2ServerListener> listener)
96 : listener_(std::move(listener)) {}
98 void UpdateConnectionManager(
99 RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
100 connection_manager) override;
102 void StopServing() override;
105 RefCountedPtr<Chttp2ServerListener> listener_;
// One accepted connection. Holds the handshaking state while the handshake is
// in flight and a ref on the transport once one has been created.
108 class ActiveConnection : public InternallyRefCounted<ActiveConnection> {
// Runs the handshake for a single connection and enforces the deadline for
// receiving HTTP/2 settings.
110 class HandshakingState : public InternallyRefCounted<HandshakingState> {
112 HandshakingState(RefCountedPtr<ActiveConnection> connection_ref,
113 grpc_pollset* accepting_pollset,
114 grpc_tcp_server_acceptor* acceptor,
115 grpc_channel_args* args,
116 grpc_resource_user* channel_resource_user);
118 ~HandshakingState() override;
120 void Orphan() override;
122 void Start(grpc_endpoint* endpoint, grpc_channel_args* args);
124 // Needed to be able to grab an external ref in ActiveConnection::Start()
125 using InternallyRefCounted<HandshakingState>::Ref;
128 static void OnTimeout(void* arg, grpc_error_handle error);
129 static void OnReceiveSettings(void* arg, grpc_error_handle /* error */);
130 static void OnHandshakeDone(void* arg, grpc_error_handle error);
131 RefCountedPtr<ActiveConnection> const connection_;
132 grpc_pollset* const accepting_pollset_;
// Freed and reset to nullptr at the end of OnHandshakeDone().
133 grpc_tcp_server_acceptor* acceptor_;
134 RefCountedPtr<HandshakeManager> handshake_mgr_
135 ABSL_GUARDED_BY(&connection_->mu_);
136 // State for enforcing handshake timeout on receiving HTTP/2 settings.
137 grpc_millis const deadline_;
138 grpc_timer timer_ ABSL_GUARDED_BY(&connection_->mu_);
139 grpc_closure on_timeout_ ABSL_GUARDED_BY(&connection_->mu_);
140 grpc_closure on_receive_settings_ ABSL_GUARDED_BY(&connection_->mu_);
141 grpc_pollset_set* const interested_parties_;
// Set to nullptr once it has been passed to Server::SetupTransport().
142 grpc_resource_user* channel_resource_user_;
145 ActiveConnection(grpc_pollset* accepting_pollset,
146 grpc_tcp_server_acceptor* acceptor,
147 grpc_channel_args* args,
148 grpc_resource_user* channel_resource_user);
149 ~ActiveConnection() override;
151 void Orphan() override;
155 void Start(RefCountedPtr<Chttp2ServerListener> listener,
156 grpc_endpoint* endpoint, grpc_channel_args* args);
158 // Needed to be able to grab an external ref in
159 // Chttp2ServerListener::OnAccept()
160 using InternallyRefCounted<ActiveConnection>::Ref;
163 static void OnClose(void* arg, grpc_error_handle error);
// Assigned in Start().
165 RefCountedPtr<Chttp2ServerListener> listener_;
166 Mutex mu_ ABSL_ACQUIRED_AFTER(&listener_->mu_);
167 // Set by HandshakingState before the handshaking begins and reset when
168 // handshaking is done.
169 OrphanablePtr<HandshakingState> handshaking_state_ ABSL_GUARDED_BY(&mu_);
170 // Set by HandshakingState when handshaking is done and a valid transport is
// created.
172 grpc_chttp2_transport* transport_ ABSL_GUARDED_BY(&mu_) = nullptr;
173 grpc_closure on_close_;
174 bool shutdown_ ABSL_GUARDED_BY(&mu_) = false;
177 // To allow access to RefCounted<> like interface.
178 friend class RefCountedPtr<Chttp2ServerListener>;
180 // Should only be called once so as to start the TCP server.
181 void StartListening();
183 static void OnAccept(void* arg, grpc_endpoint* tcp,
184 grpc_pollset* accepting_pollset,
185 grpc_tcp_server_acceptor* acceptor);
187 static void TcpServerShutdownComplete(void* arg, grpc_error_handle error);
189 static void DestroyListener(Server* /*server*/, void* arg,
190 grpc_closure* destroy_done);
192 // The interface required by RefCountedPtr<> has been manually implemented
193 // here to take a ref on tcp_server_ instead. Note that, the handshaker needs
194 // tcp_server_ to exist for the lifetime of the handshake since it's needed by
195 // acceptor. Sharing refs between the listener and tcp_server_ is just an
196 // optimization to avoid taking additional refs on the listener, since
197 // TcpServerShutdownComplete already holds a ref to the listener.
198 void IncrementRefCount() { grpc_tcp_server_ref(tcp_server_); }
199 void IncrementRefCount(const DebugLocation& /* location */,
200 const char* /* reason */) {
204 RefCountedPtr<Chttp2ServerListener> Ref() GRPC_MUST_USE_RESULT {
206 return RefCountedPtr<Chttp2ServerListener>(this);
208 RefCountedPtr<Chttp2ServerListener> Ref(const DebugLocation& /* location */,
209 const char* /* reason */)
210 GRPC_MUST_USE_RESULT {
214 void Unref() { grpc_tcp_server_unref(tcp_server_); }
215 void Unref(const DebugLocation& /* location */, const char* /* reason */) {
219 Server* const server_;
220 grpc_tcp_server* tcp_server_;
// Address saved by Create() when a config fetcher is present; the port is
// added later, from ConfigFetcherWatcher::UpdateConnectionManager().
221 grpc_resolved_address resolved_address_;
222 Chttp2ServerArgsModifier const args_modifier_;
// Non-owning pointer to the watcher created in Start().
223 ConfigFetcherWatcher* config_fetcher_watcher_ = nullptr;
224 Mutex channel_args_mu_;
225 grpc_channel_args* args_ ABSL_GUARDED_BY(channel_args_mu_);
226 RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
227 connection_manager_ ABSL_GUARDED_BY(channel_args_mu_);
229 // Signals whether grpc_tcp_server_start() has been called.
230 bool started_ ABSL_GUARDED_BY(mu_) = false;
231 // Signals whether grpc_tcp_server_start() has completed.
232 CondVar started_cv_ ABSL_GUARDED_BY(mu_);
233 // Signals whether new requests/connections are to be accepted.
234 bool is_serving_ ABSL_GUARDED_BY(mu_) = false;
235 // Signals whether the application has triggered shutdown.
236 bool shutdown_ ABSL_GUARDED_BY(mu_) = false;
// Connections accepted by this listener, keyed by their raw pointer.
237 std::map<ActiveConnection*, OrphanablePtr<ActiveConnection>> connections_
238 ABSL_GUARDED_BY(mu_);
239 grpc_closure tcp_server_shutdown_complete_ ABSL_GUARDED_BY(mu_);
// Scheduled from the destructor when non-null.
240 grpc_closure* on_destroy_done_ ABSL_GUARDED_BY(mu_) = nullptr;
241 RefCountedPtr<channelz::ListenSocketNode> channelz_listen_socket_;
242 grpc_resource_quota* resource_quota_;
246 // Chttp2ServerListener::ConfigFetcherWatcher
249 void Chttp2ServerListener::ConfigFetcherWatcher::UpdateConnectionManager(
250 RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
251 connection_manager) {
252 RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
253 connection_manager_to_destroy;
255 MutexLock lock(&listener_->channel_args_mu_);
256 connection_manager_to_destroy = listener_->connection_manager_;
257 listener_->connection_manager_ = std::move(connection_manager);
260 MutexLock lock(&listener_->mu_);
261 if (listener_->shutdown_) {
264 listener_->is_serving_ = true;
265 if (listener_->started_) return;
268 grpc_error_handle error = grpc_tcp_server_add_port(
269 listener_->tcp_server_, &listener_->resolved_address_, &port_temp);
270 if (error != GRPC_ERROR_NONE) {
271 GRPC_ERROR_UNREF(error);
272 gpr_log(GPR_ERROR, "Error adding port to server: %s",
273 grpc_error_std_string(error).c_str());
274 // TODO(yashykt): We wouldn't need to assert here if we bound to the
275 // port earlier during AddPort.
278 listener_->StartListening();
280 MutexLock lock(&listener_->mu_);
281 listener_->started_ = true;
282 listener_->started_cv_.SignalAll();
// Marks the listener as no longer serving, takes over its current set of
// connections, and sends a GOAWAY on each of them.
286 void Chttp2ServerListener::ConfigFetcherWatcher::StopServing() {
287 std::map<ActiveConnection*, OrphanablePtr<ActiveConnection>> connections;
// The map is moved out under the listener lock; the GOAWAYs are sent from the
// local copy.
289 MutexLock lock(&listener_->mu_);
290 listener_->is_serving_ = false;
291 connections = std::move(listener_->connections_);
293 // Send GOAWAYs on the transports so that they are disconnected when existing
// RPCs finish.
295 for (auto& connection : connections) {
296 connection.first->SendGoAway();
301 // Chttp2ServerListener::ActiveConnection::HandshakingState
// Returns the absolute handshake deadline for a new connection: the current
// ExecCtx time plus the GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS channel arg,
// looked up with integer options {120 * GPR_MS_PER_SEC, 1, INT_MAX}.
304 grpc_millis GetConnectionDeadline(const grpc_channel_args* args) {
306 grpc_channel_args_find_integer(args, GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS,
307 {120 * GPR_MS_PER_SEC, 1, INT_MAX});
308 return ExecCtx::Get()->Now() + timeout_ms;
// Sets up the state needed to handshake one connection: a fresh
// HandshakeManager, the handshake deadline derived from `args`, and a pollset
// set containing the accepting pollset. The server-side handshakers from the
// core configuration are then added to the manager.
311 Chttp2ServerListener::ActiveConnection::HandshakingState::HandshakingState(
312 RefCountedPtr<ActiveConnection> connection_ref,
313 grpc_pollset* accepting_pollset, grpc_tcp_server_acceptor* acceptor,
314 grpc_channel_args* args, grpc_resource_user* channel_resource_user)
315 : connection_(std::move(connection_ref)),
316 accepting_pollset_(accepting_pollset),
318 handshake_mgr_(MakeRefCounted<HandshakeManager>()),
319 deadline_(GetConnectionDeadline(args)),
320 interested_parties_(grpc_pollset_set_create()),
321 channel_resource_user_(channel_resource_user) {
322 grpc_pollset_set_add_pollset(interested_parties_, accepting_pollset_);
323 CoreConfiguration::Get().handshaker_registry().AddHandshakers(
324 HANDSHAKER_SERVER, args, interested_parties_, handshake_mgr_.get());
// Undoes the constructor's pollset-set setup and releases the channel resource
// user if this object still holds one.
327 Chttp2ServerListener::ActiveConnection::HandshakingState::~HandshakingState() {
328 grpc_pollset_set_del_pollset(interested_parties_, accepting_pollset_);
329 grpc_pollset_set_destroy(interested_parties_);
// channel_resource_user_ is nulled out in OnHandshakeDone() once it has been
// passed to Server::SetupTransport().
330 if (channel_resource_user_ != nullptr) {
331 grpc_resource_user_unref(channel_resource_user_);
// Shuts down the handshake, if a handshake manager is still present, with a
// "Listener stopped serving." error.
336 void Chttp2ServerListener::ActiveConnection::HandshakingState::Orphan() {
338 MutexLock lock(&connection_->mu_);
// handshake_mgr_ is moved out by OnHandshakeDone(), so it may be null here.
339 if (handshake_mgr_ != nullptr) {
340 handshake_mgr_->Shutdown(
341 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Listener stopped serving."));
// Starts the handshake on `endpoint` with OnHandshakeDone as the completion
// callback. The handshake manager is copied under the connection lock and the
// handshake itself is started from the local copy.
347 void Chttp2ServerListener::ActiveConnection::HandshakingState::Start(
348 grpc_endpoint* endpoint, grpc_channel_args* args) {
349 Ref().release(); // Held by OnHandshakeDone
350 RefCountedPtr<HandshakeManager> handshake_mgr;
352 MutexLock lock(&connection_->mu_);
// NOTE(review): this early return happens after the ref above was released to
// OnHandshakeDone, which will not run on this path - confirm the ref is
// reclaimed elsewhere.
353 if (handshake_mgr_ == nullptr) return;
354 handshake_mgr = handshake_mgr_;
356 handshake_mgr->DoHandshake(endpoint, args, deadline_, acceptor_,
357 OnHandshakeDone, this);
// Timer callback for the HTTP/2 settings deadline. Unless the timer was
// cancelled, disconnects the connection's transport with a timeout error.
360 void Chttp2ServerListener::ActiveConnection::HandshakingState::OnTimeout(
361 void* arg, grpc_error_handle error) {
362 HandshakingState* self = static_cast<HandshakingState*>(arg);
363 // Note that we may be called with GRPC_ERROR_NONE when the timer fires
364 // or with an error indicating that the timer system is being shut down.
365 if (error != GRPC_ERROR_CANCELLED) {
366 grpc_transport_op* op = grpc_make_transport_op(nullptr);
367 op->disconnect_with_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
368 "Did not receive HTTP/2 settings before handshake timeout");
369 grpc_chttp2_transport* transport = nullptr;
// Read the transport pointer under the connection lock; the op is performed
// on the local copy.
371 MutexLock lock(&self->connection_->mu_);
372 transport = self->connection_->transport_;
374 grpc_transport_perform_op(&transport->base, op);
// Callback passed to grpc_chttp2_transport_start_reading() in
// OnHandshakeDone(); cancels the handshake deadline timer.
379 void Chttp2ServerListener::ActiveConnection::HandshakingState::
380 OnReceiveSettings(void* arg, grpc_error_handle /* error */) {
381 HandshakingState* self = static_cast<HandshakingState*>(arg);
382 grpc_timer_cancel(&self->timer_);
// Handshake completion callback. `arg` is the HandshakerArgs whose user_data
// is the HandshakingState.
//
// On failure or connection shutdown it releases the handshake resources. On
// success with an endpoint it creates the chttp2 transport, hands it to the
// server, starts reading, and arms the settings-deadline timer. In every case
// it then detaches the handshake manager and handshaking state from the
// connection and, when cleanup_connection is set, removes the connection from
// the listener's connections_ map.
386 void Chttp2ServerListener::ActiveConnection::HandshakingState::OnHandshakeDone(
387 void* arg, grpc_error_handle error) {
388 auto* args = static_cast<HandshakerArgs*>(arg);
389 HandshakingState* self = static_cast<HandshakingState*>(args->user_data);
// These locals take over the objects moved out of the connection further
// down; see the comment there about keeping destructors out of the critical
// region.
390 OrphanablePtr<HandshakingState> handshaking_state_ref;
391 RefCountedPtr<HandshakeManager> handshake_mgr;
392 bool cleanup_connection = false;
394 MutexLock connection_lock(&self->connection_->mu_);
395 if (error != GRPC_ERROR_NONE || self->connection_->shutdown_) {
396 std::string error_str = grpc_error_std_string(error);
397 gpr_log(GPR_DEBUG, "Handshaking failed: %s", error_str.c_str());
398 cleanup_connection = true;
399 if (error == GRPC_ERROR_NONE && args->endpoint != nullptr) {
400 // We were shut down or stopped serving after handshaking completed
401 // successfully, so destroy the endpoint here.
402 // TODO(ctiller): It is currently necessary to shutdown endpoints
403 // before destroying them, even if we know that there are no
404 // pending read/write callbacks. This should be fixed, at which
405 // point this can be removed.
406 grpc_endpoint_shutdown(args->endpoint, GRPC_ERROR_NONE);
407 grpc_endpoint_destroy(args->endpoint);
408 grpc_channel_args_destroy(args->args);
409 grpc_slice_buffer_destroy_internal(args->read_buffer);
410 gpr_free(args->read_buffer);
413 // If the handshaking succeeded but there is no endpoint, then the
414 // handshaker may have handed off the connection to some external
415 // code, so we can just clean up here without creating a transport.
416 if (args->endpoint != nullptr) {
417 grpc_transport* transport = grpc_create_chttp2_transport(
418 args->args, args->endpoint, false,
419 grpc_resource_user_create(
420 self->connection_->listener_->resource_quota_,
421 absl::StrCat(grpc_endpoint_get_peer(args->endpoint),
422 ":chttp2_server_transport")));
423 grpc_error_handle channel_init_err =
424 self->connection_->listener_->server_->SetupTransport(
425 transport, self->accepting_pollset_, args->args,
426 grpc_chttp2_transport_get_socket_node(transport),
427 self->channel_resource_user_, GRPC_RESOURCE_QUOTA_CHANNEL_SIZE);
// Cleared after being passed to SetupTransport() so that the code below and
// the destructor no longer free/unref it.
428 self->channel_resource_user_ = nullptr;
429 if (channel_init_err == GRPC_ERROR_NONE) {
430 // Use notify_on_receive_settings callback to enforce the
431 // handshake deadline.
432 // Note: The reinterpret_cast<>s here are safe, because
433 // grpc_chttp2_transport is a C-style extension of
434 // grpc_transport, so this is morally equivalent of a
435 // static_cast<> to a derived class.
436 // TODO(roth): Change to static_cast<> when we C++-ify the
// transport API.
438 self->connection_->transport_ =
439 reinterpret_cast<grpc_chttp2_transport*>(transport);
440 GRPC_CHTTP2_REF_TRANSPORT(self->connection_->transport_,
441 "ActiveConnection"); // Held by connection_
442 self->Ref().release(); // Held by OnReceiveSettings().
443 GRPC_CLOSURE_INIT(&self->on_receive_settings_, OnReceiveSettings,
444 self, grpc_schedule_on_exec_ctx);
445 // If the listener has been configured with a config fetcher, we need
446 // to watch on the transport being closed so that we can maintain an
447 // updated list of active connections.
448 grpc_closure* on_close = nullptr;
449 if (self->connection_->listener_->config_fetcher_watcher_ !=
451 // Ref held by OnClose()
452 self->connection_->Ref().release();
453 on_close = &self->connection_->on_close_;
455 // Remove the connection from the connections_ map since OnClose()
456 // will not be invoked when a config fetcher is not set.
457 cleanup_connection = true;
459 grpc_chttp2_transport_start_reading(transport, args->read_buffer,
460 &self->on_receive_settings_,
462 grpc_channel_args_destroy(args->args);
463 self->Ref().release(); // Held by OnTimeout().
464 GRPC_CLOSURE_INIT(&self->on_timeout_, OnTimeout, self,
465 grpc_schedule_on_exec_ctx);
466 grpc_timer_init(&self->timer_, self->deadline_, &self->on_timeout_);
468 // Failed to create channel from transport. Clean up.
469 gpr_log(GPR_ERROR, "Failed to create channel: %s",
470 grpc_error_std_string(channel_init_err).c_str());
471 GRPC_ERROR_UNREF(channel_init_err);
472 grpc_transport_destroy(transport);
473 grpc_slice_buffer_destroy_internal(args->read_buffer);
474 gpr_free(args->read_buffer);
475 cleanup_connection = true;
476 grpc_channel_args_destroy(args->args);
479 cleanup_connection = true;
482 // Since the handshake manager is done, the connection no longer needs to
483 // shutdown the handshake when the listener needs to stop serving.
484 // Avoid calling the destructor of HandshakeManager and HandshakingState
485 // from within the critical region.
486 handshake_mgr = std::move(self->handshake_mgr_);
487 handshaking_state_ref = std::move(self->connection_->handshaking_state_);
489 gpr_free(self->acceptor_);
490 self->acceptor_ = nullptr;
// Receives the map entry, if one is removed below.
491 OrphanablePtr<ActiveConnection> connection;
// Still non-null only when the resource user was not handed to
// SetupTransport() above; return the channel-size reservation to the quota.
492 if (self->channel_resource_user_ != nullptr) {
493 grpc_resource_user_free(self->channel_resource_user_,
494 GRPC_RESOURCE_QUOTA_CHANNEL_SIZE);
496 if (cleanup_connection) {
497 MutexLock listener_lock(&self->connection_->listener_->mu_);
498 auto it = self->connection_->listener_->connections_.find(
499 self->connection_.get());
500 if (it != self->connection_->listener_->connections_.end()) {
501 connection = std::move(it->second);
502 self->connection_->listener_->connections_.erase(it);
509 // Chttp2ServerListener::ActiveConnection
// Creates the connection together with its HandshakingState (which receives a
// ref to this connection) and initializes the on_close_ closure to call
// OnClose() with this connection.
512 Chttp2ServerListener::ActiveConnection::ActiveConnection(
513 grpc_pollset* accepting_pollset, grpc_tcp_server_acceptor* acceptor,
514 grpc_channel_args* args, grpc_resource_user* channel_resource_user)
515 : handshaking_state_(MakeOrphanable<HandshakingState>(
516 Ref(), accepting_pollset, acceptor, args, channel_resource_user)) {
517 GRPC_CLOSURE_INIT(&on_close_, ActiveConnection::OnClose, this,
518 grpc_schedule_on_exec_ctx);
// Drops the "ActiveConnection" transport ref taken in
// HandshakingState::OnHandshakeDone(), if a transport was ever set.
521 Chttp2ServerListener::ActiveConnection::~ActiveConnection() {
522 if (transport_ != nullptr) {
523 GRPC_CHTTP2_UNREF_TRANSPORT(transport_, "ActiveConnection");
// Called when the listener gives up this connection. Moves handshaking_state_
// into a local while holding mu_.
527 void Chttp2ServerListener::ActiveConnection::Orphan() {
528 OrphanablePtr<HandshakingState> handshaking_state;
530 MutexLock lock(&mu_);
532 // Reset handshaking_state_ since we have been orphaned by the listener
533 // signaling that the listener has stopped serving.
534 handshaking_state = std::move(handshaking_state_);
// Sends a GOAWAY on this connection's transport, if one has been created.
539 void Chttp2ServerListener::ActiveConnection::SendGoAway() {
540 grpc_chttp2_transport* transport = nullptr;
// Copy the transport pointer under mu_; the transport op is issued on the
// local copy.
542 MutexLock lock(&mu_);
543 transport = transport_;
545 if (transport != nullptr) {
546 grpc_transport_op* op = grpc_make_transport_op(nullptr);
547 op->goaway_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
548 "Server is stopping to serve requests.");
549 grpc_transport_perform_op(&transport->base, op);
// Records the owning listener and starts the handshake on `endpoint`, unless
// the connection has already been shut down.
553 void Chttp2ServerListener::ActiveConnection::Start(
554 RefCountedPtr<Chttp2ServerListener> listener, grpc_endpoint* endpoint,
555 grpc_channel_args* args) {
556 RefCountedPtr<HandshakingState> handshaking_state_ref;
557 listener_ = std::move(listener);
559 MutexLock lock(&mu_);
560 if (shutdown_) return;
561 // Hold a ref to HandshakingState to allow starting the handshake outside
562 // the critical region.
563 handshaking_state_ref = handshaking_state_->Ref();
565 handshaking_state_ref->Start(endpoint, args);
// Callback behind the on_close_ closure (see the constructor). Removes this
// connection from the listener's connections_ map unless the connection is
// already marked shut down.
568 void Chttp2ServerListener::ActiveConnection::OnClose(
569 void* arg, grpc_error_handle /* error */) {
570 ActiveConnection* self = static_cast<ActiveConnection*>(arg);
// Receives the map entry, if one is removed below.
571 OrphanablePtr<ActiveConnection> connection;
// Lock order: listener first, then connection, matching the
// ABSL_ACQUIRED_AFTER annotation on ActiveConnection::mu_.
573 MutexLock listener_lock(&self->listener_->mu_);
574 MutexLock connection_lock(&self->mu_);
575 // The node was already deleted from the connections_ list if the connection
// is shutdown.
577 if (!self->shutdown_) {
578 auto it = self->listener_->connections_.find(self);
579 if (it != self->listener_->connections_.end()) {
580 connection = std::move(it->second);
581 self->listener_->connections_.erase(it);
589 // Chttp2ServerListener
// Creates a Chttp2ServerListener for `addr` and registers it with `server`.
//
// When the server has a config fetcher the address is only recorded in
// resolved_address_; the visible code does not add the port in that case.
// Otherwise the port is added right away and reported through `port_num`.
// The listener is handed to the server only if every step succeeded.
592 grpc_error_handle Chttp2ServerListener::Create(
593 Server* server, grpc_resolved_address* addr, grpc_channel_args* args,
594 Chttp2ServerArgsModifier args_modifier, int* port_num) {
595 Chttp2ServerListener* listener = nullptr;
596 // The bulk of this method is inside of a lambda to make cleanup
597 // easier without using goto.
598 grpc_error_handle error = [&]() {
599 // Create Chttp2ServerListener.
600 listener = new Chttp2ServerListener(
601 server, args, args_modifier,
602 grpc_resource_quota_from_channel_args(args, true));
603 grpc_resource_quota_ref_internal(listener->resource_quota_);
604 error = grpc_tcp_server_create(
605 &listener->tcp_server_shutdown_complete_, args,
606 grpc_slice_allocator_factory_create(listener->resource_quota_),
607 &listener->tcp_server_);
608 if (error != GRPC_ERROR_NONE) return error;
609 if (server->config_fetcher() != nullptr) {
610 listener->resolved_address_ = *addr;
611 // TODO(yashykt): Consider binding so as to be able to return the port
// number.
614 error = grpc_tcp_server_add_port(listener->tcp_server_, addr, port_num);
615 if (error != GRPC_ERROR_NONE) return error;
617 // Create channelz node.
618 if (grpc_channel_args_find_bool(args, GRPC_ARG_ENABLE_CHANNELZ,
619 GRPC_ENABLE_CHANNELZ_DEFAULT)) {
620 std::string string_address = grpc_sockaddr_to_uri(addr);
621 listener->channelz_listen_socket_ =
622 MakeRefCounted<channelz::ListenSocketNode>(
623 string_address.c_str(),
624 absl::StrFormat("chttp2 listener %s", string_address.c_str()));
626 // Register with the server only upon success
627 server->AddListener(OrphanablePtr<Server::ListenerInterface>(listener));
628 return GRPC_ERROR_NONE;
// Error path: release whatever the lambda managed to create.
630 if (error != GRPC_ERROR_NONE) {
631 if (listener != nullptr) {
632 if (listener->tcp_server_ != nullptr) {
633 // listener is deleted when tcp_server_ is shutdown.
634 grpc_tcp_server_unref(listener->tcp_server_);
639 grpc_channel_args_destroy(args);
// Creates a listener with no port of its own. A TcpServerFdHandler for the
// new TCP server is stored through the pointer channel arg named `name`, and
// the listener is then registered with `server`.
645 grpc_error_handle Chttp2ServerListener::CreateWithAcceptor(
646 Server* server, const char* name, grpc_channel_args* args,
647 Chttp2ServerArgsModifier args_modifier) {
648 Chttp2ServerListener* listener = new Chttp2ServerListener(
649 server, args, args_modifier,
650 grpc_resource_quota_from_channel_args(args, true));
651 grpc_resource_quota_ref_internal(listener->resource_quota_);
652 grpc_error_handle error = grpc_tcp_server_create(
653 &listener->tcp_server_shutdown_complete_, args,
654 grpc_slice_allocator_factory_create(listener->resource_quota_),
655 &listener->tcp_server_);
656 if (error != GRPC_ERROR_NONE) {
660 // TODO(yangg) channelz
// NOTE(review): arg_val is dereferenced below without a visible null check;
// presumably the caller guarantees that `name` is present in `args` - confirm.
661 TcpServerFdHandler** arg_val =
662 grpc_channel_args_find_pointer<TcpServerFdHandler*>(args, name);
663 *arg_val = grpc_tcp_server_create_fd_handler(listener->tcp_server_);
664 server->AddListener(OrphanablePtr<Server::ListenerInterface>(listener));
665 return GRPC_ERROR_NONE;
// Stores the construction arguments and initializes the closure that runs
// TcpServerShutdownComplete() with this listener as its argument.
668 Chttp2ServerListener::Chttp2ServerListener(
669 Server* server, grpc_channel_args* args,
670 Chttp2ServerArgsModifier args_modifier, grpc_resource_quota* resource_quota)
672 args_modifier_(args_modifier),
674 resource_quota_(resource_quota) {
675 GRPC_CLOSURE_INIT(&tcp_server_shutdown_complete_, TcpServerShutdownComplete,
676 this, grpc_schedule_on_exec_ctx);
// Flushes pending ExecCtx work, runs the on_destroy_done_ closure if one was
// set, and releases the resource quota ref and the channel args.
679 Chttp2ServerListener::~Chttp2ServerListener() {
680 // Flush queued work before destroying handshaker factory, since that
681 // may do a synchronous unref.
682 ExecCtx::Get()->Flush();
683 if (on_destroy_done_ != nullptr) {
684 ExecCtx::Run(DEBUG_LOCATION, on_destroy_done_, GRPC_ERROR_NONE);
685 ExecCtx::Get()->Flush();
687 grpc_resource_quota_unref_internal(resource_quota_);
688 grpc_channel_args_destroy(args_);
691 /* Server callback: start listening on our ports */
// When the server has a config fetcher, this creates a ConfigFetcherWatcher
// holding a ref to this listener and starts a watch for resolved_address_
// with a copy of the channel args.
692 void Chttp2ServerListener::Start(
693 Server* /*server*/, const std::vector<grpc_pollset*>* /* pollsets */) {
694 if (server_->config_fetcher() != nullptr) {
695 grpc_channel_args* args = nullptr;
696 auto watcher = absl::make_unique<ConfigFetcherWatcher>(Ref());
// Keep a raw pointer so that Orphan() can cancel the watch.
697 config_fetcher_watcher_ = watcher.get();
// args_ is guarded by channel_args_mu_; copy it under the lock.
699 MutexLock lock(&channel_args_mu_);
700 args = grpc_channel_args_copy(args_);
702 server_->config_fetcher()->StartWatch(
703 grpc_sockaddr_to_string(&resolved_address_, false), args,
707 MutexLock lock(&mu_);
// Starts the TCP server on the server's pollsets, with OnAccept as the
// callback and this listener as its argument.
715 void Chttp2ServerListener::StartListening() {
716 grpc_tcp_server_start(tcp_server_, &server_->pollsets(), OnAccept, this);
// Records, under mu_, the closure that the destructor schedules once the
// listener is destroyed.
719 void Chttp2ServerListener::SetOnDestroyDone(grpc_closure* on_destroy_done) {
720 MutexLock lock(&mu_);
721 on_destroy_done_ = on_destroy_done;
// Accept callback passed to grpc_tcp_server_start(). Prepares per-connection
// channel args (via the connection manager when a config fetcher is in use,
// then via args_modifier_), creates an ActiveConnection and, if the listener
// is still serving, registers it and starts its handshake. On rejection paths
// the endpoint is shut down and destroyed.
724 void Chttp2ServerListener::OnAccept(void* arg, grpc_endpoint* tcp,
725 grpc_pollset* accepting_pollset,
726 grpc_tcp_server_acceptor* acceptor) {
727 Chttp2ServerListener* self = static_cast<Chttp2ServerListener*>(arg);
728 grpc_channel_args* args = nullptr;
729 RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
// Snapshot the channel args and the connection manager under
// channel_args_mu_.
732 MutexLock lock(&self->channel_args_mu_);
733 args = grpc_channel_args_copy(self->args_);
734 connection_manager = self->connection_manager_;
// Helper used by every rejection path to tear down the accepted endpoint.
736 auto endpoint_cleanup = [&](grpc_error_handle error) {
737 grpc_endpoint_shutdown(tcp, error);
738 grpc_endpoint_destroy(tcp);
741 if (self->server_->config_fetcher() != nullptr) {
742 if (connection_manager == nullptr) {
743 grpc_error_handle error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
744 "No ConnectionManager configured. Closing connection.");
745 endpoint_cleanup(error);
746 grpc_channel_args_destroy(args);
749 // TODO(yashykt): Maybe combine the following two arg modifiers into a
// single one.
751 absl::StatusOr<grpc_channel_args*> args_result =
752 connection_manager->UpdateChannelArgsForConnection(args, tcp);
753 if (!args_result.ok()) {
754 gpr_log(GPR_DEBUG, "Closing connection: %s",
755 args_result.status().ToString().c_str());
757 GRPC_ERROR_CREATE_FROM_CPP_STRING(args_result.status().ToString()));
760 grpc_error_handle error = GRPC_ERROR_NONE;
761 args = self->args_modifier_(*args_result, &error);
762 if (error != GRPC_ERROR_NONE) {
763 gpr_log(GPR_DEBUG, "Closing connection: %s",
764 grpc_error_std_string(error).c_str());
765 endpoint_cleanup(error);
766 grpc_channel_args_destroy(args);
770 grpc_resource_user* channel_resource_user = grpc_resource_user_create(
771 self->resource_quota_,
772 absl::StrCat(grpc_endpoint_get_peer(tcp), ":server_channel"));
773 auto connection = MakeOrphanable<ActiveConnection>(
774 accepting_pollset, acceptor, args, channel_resource_user);
775 // We no longer own acceptor
777 // Hold a ref to connection to allow starting handshake outside the
// critical region.
779 RefCountedPtr<ActiveConnection> connection_ref = connection->Ref();
780 RefCountedPtr<Chttp2ServerListener> listener_ref;
782 MutexLock lock(&self->mu_);
783 // Shut down the connection if the listener has stopped serving.
784 if (!self->shutdown_ && self->is_serving_) {
785 if (!grpc_resource_user_safe_alloc(channel_resource_user,
786 GRPC_RESOURCE_QUOTA_CHANNEL_SIZE)) {
789 "Memory quota exhausted, rejecting connection, no handshaking.");
791 // This ref needs to be taken in the critical region after having made
792 // sure that the listener has not been Orphaned, so as to avoid
793 // heap-use-after-free issues where `Ref()` is invoked when the ref of
794 // tcp_server_ has already reached 0. (Ref() implementation of
795 // Chttp2ServerListener is grpc_tcp_server_ref().)
796 listener_ref = self->Ref();
797 self->connections_.emplace(connection.get(), std::move(connection));
// `connection` is still non-null only if it was not moved into connections_
// above, i.e. the connection was not registered.
801 if (connection != nullptr) {
802 endpoint_cleanup(GRPC_ERROR_NONE);
804 connection_ref->Start(std::move(listener_ref), tcp, args);
806 grpc_channel_args_destroy(args);
// Callback behind tcp_server_shutdown_complete_. Drops the channelz listen
// socket node and releases the error handed to the callback.
809 void Chttp2ServerListener::TcpServerShutdownComplete(void* arg,
810 grpc_error_handle error) {
811 Chttp2ServerListener* self = static_cast<Chttp2ServerListener*>(arg);
812 self->channelz_listen_socket_.reset();
813 GRPC_ERROR_UNREF(error);
817 /* Server callback: destroy the tcp listener (so we don't generate further callbacks) */
// Cancels the config fetcher watch (if any), takes over the connection map,
// waits for an in-progress grpc_tcp_server_start() to finish, then shuts down
// the TCP server's listeners and drops the listener's ref on it.
819 void Chttp2ServerListener::Orphan() {
820 // Cancel the watch before shutting down so as to avoid holding a ref to the
821 // listener in the watcher.
822 if (config_fetcher_watcher_ != nullptr) {
823 server_->config_fetcher()->CancelWatch(config_fetcher_watcher_);
825 std::map<ActiveConnection*, OrphanablePtr<ActiveConnection>> connections;
826 grpc_tcp_server* tcp_server;
828 MutexLock lock(&mu_);
831 // Orphan the connections so that they can start cleaning up.
832 connections = std::move(connections_);
833 // If the listener is currently set to be serving but has not been started
834 // yet, it means that `grpc_tcp_server_start` is in progress. Wait for the
835 // operation to finish to avoid causing races.
836 while (is_serving_ && !started_) {
837 started_cv_.Wait(&mu_);
// Copy the pointer under the lock; the shutdown and unref below use the local.
839 tcp_server = tcp_server_;
841 grpc_tcp_server_shutdown_listeners(tcp_server);
842 grpc_tcp_server_unref(tcp_server);
848 // Chttp2ServerAddPort()
// Adds a listening port to `server` for the address string `addr`.
//
// "external:" addresses are delegated to
// Chttp2ServerListener::CreateWithAcceptor(). Any other address is resolved
// (Unix, abstract Unix, or blocking DNS resolution), and a listener is
// created per resolved address. Failing for some addresses is only logged;
// failing for all of them yields an error that references each individual
// failure. Takes ownership of `args` on the non-external path (destroyed
// before returning); *port_num is set to 0 on error.
851 grpc_error_handle Chttp2ServerAddPort(Server* server, const char* addr,
852 grpc_channel_args* args,
853 Chttp2ServerArgsModifier args_modifier,
855 if (strncmp(addr, "external:", 9) == 0) {
856 return grpc_core::Chttp2ServerListener::CreateWithAcceptor(
857 server, addr, args, args_modifier);
860 grpc_resolved_addresses* resolved = nullptr;
// Per-address failures; each entry is unreffed at the end of this function.
861 std::vector<grpc_error_handle> error_list;
862 // Using lambda to avoid use of goto.
863 grpc_error_handle error = [&]() {
864 if (absl::StartsWith(addr, kUnixUriPrefix)) {
865 error = grpc_resolve_unix_domain_address(
866 addr + sizeof(kUnixUriPrefix) - 1, &resolved);
867 } else if (absl::StartsWith(addr, kUnixAbstractUriPrefix)) {
868 error = grpc_resolve_unix_abstract_domain_address(
869 addr + sizeof(kUnixAbstractUriPrefix) - 1, &resolved);
871 error = grpc_blocking_resolve_address(addr, "https", &resolved);
873 if (error != GRPC_ERROR_NONE) return error;
874 // Create a listener for each resolved address.
875 for (size_t i = 0; i < resolved->naddrs; i++) {
876 // If address has a wildcard port (0), use the same port as a previous
// listener.
878 if (*port_num != -1 && grpc_sockaddr_get_port(&resolved->addrs[i]) == 0) {
879 grpc_sockaddr_set_port(&resolved->addrs[i], *port_num);
882 error = grpc_core::Chttp2ServerListener::Create(
883 server, &resolved->addrs[i], grpc_channel_args_copy(args),
884 args_modifier, &port_temp);
885 if (error != GRPC_ERROR_NONE) {
886 error_list.push_back(error);
888 if (*port_num == -1) {
889 *port_num = port_temp;
891 GPR_ASSERT(*port_num == port_temp);
// Every address failed: return a composite error.
895 if (error_list.size() == resolved->naddrs) {
897 absl::StrFormat("No address added out of total %" PRIuPTR " resolved",
899 return GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING(
900 msg.c_str(), error_list.data(), error_list.size());
901 } else if (!error_list.empty()) {
902 std::string msg = absl::StrFormat(
903 "Only %" PRIuPTR " addresses added out of total %" PRIuPTR
905 resolved->naddrs - error_list.size(), resolved->naddrs);
906 error = GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING(
907 msg.c_str(), error_list.data(), error_list.size());
908 gpr_log(GPR_INFO, "WARNING: %s", grpc_error_std_string(error).c_str());
909 GRPC_ERROR_UNREF(error);
910 // we managed to bind some addresses: continue without error
912 return GRPC_ERROR_NONE;
// Common cleanup for both the success and the failure path.
914 for (grpc_error_handle error : error_list) {
915 GRPC_ERROR_UNREF(error);
917 grpc_channel_args_destroy(args);
918 if (resolved != nullptr) {
919 grpc_resolved_addresses_destroy(resolved);
921 if (error != GRPC_ERROR_NONE) *port_num = 0;
925 } // namespace grpc_core