3 * Copyright 2015 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <grpc/support/port_platform.h>
21 #include "src/core/ext/transport/chttp2/server/chttp2_server.h"
28 #include "absl/strings/match.h"
29 #include "absl/strings/str_cat.h"
30 #include "absl/strings/str_format.h"
32 #include <grpc/grpc.h>
33 #include <grpc/impl/codegen/grpc_types.h>
34 #include <grpc/support/alloc.h>
35 #include <grpc/support/log.h>
36 #include <grpc/support/sync.h>
38 #include "src/core/ext/filters/http/server/http_server_filter.h"
39 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
40 #include "src/core/ext/transport/chttp2/transport/internal.h"
41 #include "src/core/lib/address_utils/sockaddr_utils.h"
42 #include "src/core/lib/channel/channel_args.h"
43 #include "src/core/lib/channel/handshaker.h"
44 #include "src/core/lib/channel/handshaker_registry.h"
45 #include "src/core/lib/gprpp/ref_counted.h"
46 #include "src/core/lib/gprpp/ref_counted_ptr.h"
47 #include "src/core/lib/iomgr/endpoint.h"
48 #include "src/core/lib/iomgr/resolve_address.h"
49 #include "src/core/lib/iomgr/resource_quota.h"
50 #include "src/core/lib/iomgr/tcp_server.h"
51 #include "src/core/lib/iomgr/unix_sockets_posix.h"
52 #include "src/core/lib/slice/slice_internal.h"
53 #include "src/core/lib/surface/api_trace.h"
54 #include "src/core/lib/surface/server.h"
// URI prefixes that Chttp2ServerAddPort() tests an address against in order to
// route it to the unix-domain or abstract-unix-domain resolver; the prefix is
// skipped (sizeof(prefix) - 1 characters) before the remainder is resolved.
59 const char kUnixUriPrefix[] = "unix:";
60 const char kUnixAbstractUriPrefix[] = "unix-abstract:";
// Server::ListenerInterface implementation for chttp2. It wraps a
// grpc_tcp_server (tcp_server_), keeps the connections it has accepted in
// connections_, and is created only through the static Create() /
// CreateWithAcceptor() factories.
62 class Chttp2ServerListener : public Server::ListenerInterface {
64 static grpc_error_handle Create(Server* server, grpc_resolved_address* addr,
65 grpc_channel_args* args,
66 Chttp2ServerArgsModifier args_modifier,
69 static grpc_error_handle CreateWithAcceptor(
70 Server* server, const char* name, grpc_channel_args* args,
71 Chttp2ServerArgsModifier args_modifier);
73 // Do not instantiate directly. Use one of the factory methods above.
74 Chttp2ServerListener(Server* server, grpc_channel_args* args,
75 Chttp2ServerArgsModifier args_modifier);
76 ~Chttp2ServerListener() override;
78 void Start(Server* server,
79 const std::vector<grpc_pollset*>* pollsets) override;
81 channelz::ListenSocketNode* channelz_listen_socket_node() const override {
82 return channelz_listen_socket_.get();
85 void SetOnDestroyDone(grpc_closure* on_destroy_done) override;
87 void Orphan() override;
// Watcher handed to the server's config fetcher in Start(). It holds a ref to
// the listener and applies connection-manager updates / stop-serving requests
// to it.
90 class ConfigFetcherWatcher
91 : public grpc_server_config_fetcher::WatcherInterface {
93 explicit ConfigFetcherWatcher(RefCountedPtr<Chttp2ServerListener> listener)
94 : listener_(std::move(listener)) {}
96 void UpdateConnectionManager(
97 RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
98 connection_manager) override;
100 void StopServing() override;
103 RefCountedPtr<Chttp2ServerListener> listener_;
// State for one accepted connection: owns the HandshakingState while the
// handshake is in flight and holds a ref on the transport once one exists.
106 class ActiveConnection : public InternallyRefCounted<ActiveConnection> {
// Drives the handshake for one connection and enforces the deadline for
// receiving HTTP/2 settings (see timer_ / on_timeout_ below).
108 class HandshakingState : public InternallyRefCounted<HandshakingState> {
110 HandshakingState(RefCountedPtr<ActiveConnection> connection_ref,
111 grpc_pollset* accepting_pollset,
112 grpc_tcp_server_acceptor* acceptor,
113 grpc_channel_args* args);
115 ~HandshakingState() override;
117 void Orphan() override;
119 void Start(grpc_endpoint* endpoint, grpc_channel_args* args);
121 // Needed to be able to grab an external ref in ActiveConnection::Start()
122 using InternallyRefCounted<HandshakingState>::Ref;
125 static void OnTimeout(void* arg, grpc_error_handle error);
126 static void OnReceiveSettings(void* arg, grpc_error_handle /* error */);
127 static void OnHandshakeDone(void* arg, grpc_error_handle error);
128 RefCountedPtr<ActiveConnection> const connection_;
129 grpc_pollset* const accepting_pollset_;
// Freed with gpr_free() and nulled in OnHandshakeDone().
130 grpc_tcp_server_acceptor* acceptor_;
131 RefCountedPtr<HandshakeManager> handshake_mgr_
132 ABSL_GUARDED_BY(&connection_->mu_);
133 // State for enforcing handshake timeout on receiving HTTP/2 settings.
134 grpc_millis const deadline_;
135 grpc_timer timer_ ABSL_GUARDED_BY(&connection_->mu_);
136 grpc_closure on_timeout_ ABSL_GUARDED_BY(&connection_->mu_);
137 grpc_closure on_receive_settings_ ABSL_GUARDED_BY(&connection_->mu_);
138 grpc_pollset_set* const interested_parties_;
141 ActiveConnection(grpc_pollset* accepting_pollset,
142 grpc_tcp_server_acceptor* acceptor,
143 grpc_channel_args* args);
144 ~ActiveConnection() override;
146 void Orphan() override;
150 void Start(RefCountedPtr<Chttp2ServerListener> listener,
151 grpc_endpoint* endpoint, grpc_channel_args* args);
153 // Needed to be able to grab an external ref in
154 // Chttp2ServerListener::OnAccept()
155 using InternallyRefCounted<ActiveConnection>::Ref;
158 static void OnClose(void* arg, grpc_error_handle error);
160 RefCountedPtr<Chttp2ServerListener> listener_;
// Lock order: the listener's mu_ is taken before this mutex (see OnClose()).
161 Mutex mu_ ABSL_ACQUIRED_AFTER(&listener_->mu_);
162 // Set by HandshakingState before the handshaking begins and reset when
163 // handshaking is done.
164 OrphanablePtr<HandshakingState> handshaking_state_ ABSL_GUARDED_BY(&mu_);
165 // Set by HandshakingState when handshaking is done and a valid transport is
// created.
167 grpc_chttp2_transport* transport_ ABSL_GUARDED_BY(&mu_) = nullptr;
168 grpc_closure on_close_;
169 bool shutdown_ ABSL_GUARDED_BY(&mu_) = false;
172 // To allow access to RefCounted<> like interface.
173 friend class RefCountedPtr<Chttp2ServerListener>;
175 // Should only be called once so as to start the TCP server.
176 void StartListening();
178 static void OnAccept(void* arg, grpc_endpoint* tcp,
179 grpc_pollset* accepting_pollset,
180 grpc_tcp_server_acceptor* acceptor);
182 static void TcpServerShutdownComplete(void* arg, grpc_error_handle error);
184 static void DestroyListener(Server* /*server*/, void* arg,
185 grpc_closure* destroy_done);
187 // The interface required by RefCountedPtr<> has been manually implemented
188 // here to take a ref on tcp_server_ instead. Note that, the handshaker needs
189 // tcp_server_ to exist for the lifetime of the handshake since it's needed by
190 // acceptor. Sharing refs between the listener and tcp_server_ is just an
191 // optimization to avoid taking additional refs on the listener, since
192 // TcpServerShutdownComplete already holds a ref to the listener.
193 void IncrementRefCount() { grpc_tcp_server_ref(tcp_server_); }
194 void IncrementRefCount(const DebugLocation& /* location */,
195 const char* /* reason */) {
199 RefCountedPtr<Chttp2ServerListener> Ref() GRPC_MUST_USE_RESULT {
201 return RefCountedPtr<Chttp2ServerListener>(this);
203 RefCountedPtr<Chttp2ServerListener> Ref(const DebugLocation& /* location */,
204 const char* /* reason */)
205 GRPC_MUST_USE_RESULT {
209 void Unref() { grpc_tcp_server_unref(tcp_server_); }
210 void Unref(const DebugLocation& /* location */, const char* /* reason */) {
214 Server* const server_;
// NOTE(review): there is no in-class initializer here and the constructor's
// initializer list does not set this member, yet Create() compares it against
// nullptr on its error path -- confirm grpc_tcp_server_create() always writes
// its out-parameter, or initialize this to nullptr.
215 grpc_tcp_server* tcp_server_;
// Address recorded by Create() when the server has a config fetcher; the port
// is then added later, in ConfigFetcherWatcher::UpdateConnectionManager().
216 grpc_resolved_address resolved_address_;
217 Chttp2ServerArgsModifier const args_modifier_;
218 ConfigFetcherWatcher* config_fetcher_watcher_ = nullptr;
219 Mutex channel_args_mu_;
220 grpc_channel_args* args_ ABSL_GUARDED_BY(channel_args_mu_);
221 RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
222 connection_manager_ ABSL_GUARDED_BY(channel_args_mu_);
224 // Signals whether grpc_tcp_server_start() has been called.
225 bool started_ ABSL_GUARDED_BY(mu_) = false;
226 // Signals whether grpc_tcp_server_start() has completed.
227 CondVar started_cv_ ABSL_GUARDED_BY(mu_);
228 // Signals whether new requests/connections are to be accepted.
229 bool is_serving_ ABSL_GUARDED_BY(mu_) = false;
230 // Signals whether the application has triggered shutdown.
231 bool shutdown_ ABSL_GUARDED_BY(mu_) = false;
// Connections accepted by OnAccept(), keyed by their own address.
232 std::map<ActiveConnection*, OrphanablePtr<ActiveConnection>> connections_
233 ABSL_GUARDED_BY(mu_);
234 grpc_closure tcp_server_shutdown_complete_ ABSL_GUARDED_BY(mu_);
235 grpc_closure* on_destroy_done_ ABSL_GUARDED_BY(mu_) = nullptr;
236 RefCountedPtr<channelz::ListenSocketNode> channelz_listen_socket_;
240 // Chttp2ServerListener::ConfigFetcherWatcher
243 void Chttp2ServerListener::ConfigFetcherWatcher::UpdateConnectionManager(
244 RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
245 connection_manager) {
246 RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
247 connection_manager_to_destroy;
249 MutexLock lock(&listener_->channel_args_mu_);
250 connection_manager_to_destroy = listener_->connection_manager_;
251 listener_->connection_manager_ = std::move(connection_manager);
254 MutexLock lock(&listener_->mu_);
255 if (listener_->shutdown_) {
258 listener_->is_serving_ = true;
259 if (listener_->started_) return;
262 grpc_error_handle error = grpc_tcp_server_add_port(
263 listener_->tcp_server_, &listener_->resolved_address_, &port_temp);
264 if (error != GRPC_ERROR_NONE) {
265 GRPC_ERROR_UNREF(error);
266 gpr_log(GPR_ERROR, "Error adding port to server: %s",
267 grpc_error_std_string(error).c_str());
268 // TODO(yashykt): We wouldn't need to assert here if we bound to the
269 // port earlier during AddPort.
272 listener_->StartListening();
274 MutexLock lock(&listener_->mu_);
275 listener_->started_ = true;
276 listener_->started_cv_.SignalAll();
// Marks the listener as not serving, takes over every tracked connection from
// listener_->connections_ while holding listener_->mu_, and asks each of those
// connections to send a GOAWAY.
280 void Chttp2ServerListener::ConfigFetcherWatcher::StopServing() {
281 std::map<ActiveConnection*, OrphanablePtr<ActiveConnection>> connections;
283 MutexLock lock(&listener_->mu_);
284 listener_->is_serving_ = false;
// Moving the map out leaves listener_->connections_ empty; the local copy now
// owns the connections.
285 connections = std::move(listener_->connections_);
287 // Send GOAWAYs on the transports so that they get disconnected when existing
// RPCs finish.
289 for (auto& connection : connections) {
290 connection.first->SendGoAway();
295 // Chttp2ServerListener::ActiveConnection::HandshakingState
// Returns the absolute deadline for a connection's handshake: the current
// ExecCtx time plus a timeout looked up from the
// GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS channel arg.
// NOTE(review): the options {120 * GPR_MS_PER_SEC, 1, INT_MAX} are presumably
// {default, min, max} -- confirm against grpc_channel_args_find_integer().
298 grpc_millis GetConnectionDeadline(const grpc_channel_args* args) {
300 grpc_channel_args_find_integer(args, GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS,
301 {120 * GPR_MS_PER_SEC, 1, INT_MAX});
302 return ExecCtx::Get()->Now() + timeout_ms;
// Builds the handshaking state for one connection: stores the connection ref
// and accepting pollset, creates a HandshakeManager, computes the handshake
// deadline from `args`, and creates a pollset_set (interested_parties_) that
// contains the accepting pollset. The server-side handshakers are then
// registered on the manager.
305 Chttp2ServerListener::ActiveConnection::HandshakingState::HandshakingState(
306 RefCountedPtr<ActiveConnection> connection_ref,
307 grpc_pollset* accepting_pollset, grpc_tcp_server_acceptor* acceptor,
308 grpc_channel_args* args)
309 : connection_(std::move(connection_ref)),
310 accepting_pollset_(accepting_pollset),
312 handshake_mgr_(MakeRefCounted<HandshakeManager>()),
313 deadline_(GetConnectionDeadline(args)),
314 interested_parties_(grpc_pollset_set_create()) {
315 grpc_pollset_set_add_pollset(interested_parties_, accepting_pollset_);
316 HandshakerRegistry::AddHandshakers(HANDSHAKER_SERVER, args,
317 interested_parties_, handshake_mgr_.get());
// Undoes the constructor's pollset_set setup: removes the accepting pollset
// from interested_parties_ and destroys the set.
320 Chttp2ServerListener::ActiveConnection::HandshakingState::~HandshakingState() {
321 grpc_pollset_set_del_pollset(interested_parties_, accepting_pollset_);
322 grpc_pollset_set_destroy(interested_parties_);
// While holding connection_->mu_, shuts down the handshake manager (if it is
// still present) with a "Listener stopped serving." error.
326 void Chttp2ServerListener::ActiveConnection::HandshakingState::Orphan() {
328 MutexLock lock(&connection_->mu_);
// handshake_mgr_ is moved out in OnHandshakeDone(), so it may already be null.
329 if (handshake_mgr_ != nullptr) {
330 handshake_mgr_->Shutdown(
331 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Listener stopped serving."));
// Starts the handshake on `endpoint`. A ref on this object is released into
// the pending OnHandshakeDone() callback, the handshake manager is copied out
// under connection_->mu_, and DoHandshake() is invoked through that copy with
// this object as the callback's user data.
337 void Chttp2ServerListener::ActiveConnection::HandshakingState::Start(
338 grpc_endpoint* endpoint, grpc_channel_args* args) {
339 Ref().release(); // Held by OnHandshakeDone
340 RefCountedPtr<HandshakeManager> handshake_mgr;
342 MutexLock lock(&connection_->mu_);
// NOTE(review): on this early return the ref taken above is not released in
// this function and OnHandshakeDone() is never scheduled -- confirm that
// handshake_mgr_ cannot be null before Start() runs.
343 if (handshake_mgr_ == nullptr) return;
344 handshake_mgr = handshake_mgr_;
346 handshake_mgr->DoHandshake(endpoint, args, deadline_, acceptor_,
347 OnHandshakeDone, this);
// Timer callback armed in OnHandshakeDone(). Unless the timer was cancelled,
// disconnects the connection's transport with a "did not receive HTTP/2
// settings" error.
350 void Chttp2ServerListener::ActiveConnection::HandshakingState::OnTimeout(
351 void* arg, grpc_error_handle error) {
352 HandshakingState* self = static_cast<HandshakingState*>(arg);
353 // Note that we may be called with GRPC_ERROR_NONE when the timer fires
354 // or with an error indicating that the timer system is being shut down.
355 if (error != GRPC_ERROR_CANCELLED) {
356 grpc_transport_op* op = grpc_make_transport_op(nullptr);
357 op->disconnect_with_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
358 "Did not receive HTTP/2 settings before handshake timeout");
359 grpc_chttp2_transport* transport = nullptr;
// transport_ is guarded by the connection's mu_, so read it under the lock.
// OnHandshakeDone() assigns it before it arms this timer.
361 MutexLock lock(&self->connection_->mu_);
362 transport = self->connection_->transport_;
364 grpc_transport_perform_op(&transport->base, op);
// Closure passed to grpc_chttp2_transport_start_reading() in
// OnHandshakeDone(). It cancels the handshake-deadline timer.
369 void Chttp2ServerListener::ActiveConnection::HandshakingState::
370 OnReceiveSettings(void* arg, grpc_error_handle /* error */) {
371 HandshakingState* self = static_cast<HandshakingState*>(arg);
372 grpc_timer_cancel(&self->timer_);
// Callback passed to HandshakeManager::DoHandshake() by Start(). `arg` is the
// HandshakerArgs whose user_data is the HandshakingState.
//
// - On error, or if the connection was shut down, the handshake resources are
//   released (the endpoint too, when the handshake itself had succeeded).
// - On success with an endpoint, a chttp2 transport is created and handed to
//   the server via SetupTransport(); reading is started and the settings
//   deadline timer is armed.
// - Afterwards the handshake manager and handshaking state are moved into
//   locals, the acceptor is freed, resource quota is returned when
//   free_resource_quota is set, and the connection is removed from the
//   listener's map when cleanup_connection is set.
376 void Chttp2ServerListener::ActiveConnection::HandshakingState::OnHandshakeDone(
377 void* arg, grpc_error_handle error) {
378 auto* args = static_cast<HandshakerArgs*>(arg);
379 HandshakingState* self = static_cast<HandshakingState*>(args->user_data);
380 OrphanablePtr<HandshakingState> handshaking_state_ref;
381 RefCountedPtr<HandshakeManager> handshake_mgr;
382 bool cleanup_connection = false;
383 bool free_resource_quota = false;
384 grpc_resource_user* resource_user =
385 self->connection_->listener_->server_->default_resource_user();
387 MutexLock connection_lock(&self->connection_->mu_);
388 if (error != GRPC_ERROR_NONE || self->connection_->shutdown_) {
389 std::string error_str = grpc_error_std_string(error);
390 gpr_log(GPR_DEBUG, "Handshaking failed: %s", error_str.c_str());
391 cleanup_connection = true;
392 free_resource_quota = true;
393 if (error == GRPC_ERROR_NONE && args->endpoint != nullptr) {
394 // We were shut down or stopped serving after handshaking completed
395 // successfully, so destroy the endpoint here.
396 // TODO(ctiller): It is currently necessary to shutdown endpoints
397 // before destroying them, even if we know that there are no
398 // pending read/write callbacks. This should be fixed, at which
399 // point this can be removed.
400 grpc_endpoint_shutdown(args->endpoint, GRPC_ERROR_NONE);
401 grpc_endpoint_destroy(args->endpoint);
402 grpc_channel_args_destroy(args->args);
403 grpc_slice_buffer_destroy_internal(args->read_buffer);
404 gpr_free(args->read_buffer);
407 // If the handshaking succeeded but there is no endpoint, then the
408 // handshaker may have handed off the connection to some external
409 // code, so we can just clean up here without creating a transport.
410 if (args->endpoint != nullptr) {
411 grpc_transport* transport = grpc_create_chttp2_transport(
412 args->args, args->endpoint, false, resource_user);
413 grpc_error_handle channel_init_err =
414 self->connection_->listener_->server_->SetupTransport(
415 transport, self->accepting_pollset_, args->args,
416 grpc_chttp2_transport_get_socket_node(transport),
418 if (channel_init_err == GRPC_ERROR_NONE) {
419 // Use notify_on_receive_settings callback to enforce the
420 // handshake deadline.
421 // Note: The reinterpret_cast<>s here are safe, because
422 // grpc_chttp2_transport is a C-style extension of
423 // grpc_transport, so this is morally equivalent of a
424 // static_cast<> to a derived class.
425 // TODO(roth): Change to static_cast<> when we C++-ify the
// transport API.
427 self->connection_->transport_ =
428 reinterpret_cast<grpc_chttp2_transport*>(transport);
429 GRPC_CHTTP2_REF_TRANSPORT(self->connection_->transport_,
430 "ActiveConnection"); // Held by connection_
431 self->Ref().release(); // Held by OnReceiveSettings().
432 GRPC_CLOSURE_INIT(&self->on_receive_settings_, OnReceiveSettings,
433 self, grpc_schedule_on_exec_ctx);
434 // If the listener has been configured with a config fetcher, we need
435 // to watch on the transport being closed so that we can keep an updated
436 // list of active connections.
437 grpc_closure* on_close = nullptr;
438 if (self->connection_->listener_->config_fetcher_watcher_ !=
440 // Ref held by OnClose()
441 self->connection_->Ref().release();
442 on_close = &self->connection_->on_close_;
444 // Remove the connection from the connections_ map since OnClose()
445 // will not be invoked when no config fetcher is set.
446 cleanup_connection = true;
448 grpc_chttp2_transport_start_reading(transport, args->read_buffer,
449 &self->on_receive_settings_,
451 grpc_channel_args_destroy(args->args);
452 self->Ref().release(); // Held by OnTimeout().
453 GRPC_CLOSURE_INIT(&self->on_timeout_, OnTimeout, self,
454 grpc_schedule_on_exec_ctx);
455 grpc_timer_init(&self->timer_, self->deadline_, &self->on_timeout_);
457 // Failed to create channel from transport. Clean up.
// The error is logged before it is unreffed so the message is still readable.
458 gpr_log(GPR_ERROR, "Failed to create channel: %s",
459 grpc_error_std_string(channel_init_err).c_str());
460 GRPC_ERROR_UNREF(channel_init_err);
461 grpc_transport_destroy(transport);
462 grpc_slice_buffer_destroy_internal(args->read_buffer);
463 gpr_free(args->read_buffer);
464 cleanup_connection = true;
465 free_resource_quota = true;
466 grpc_channel_args_destroy(args->args);
469 cleanup_connection = true;
470 free_resource_quota = true;
473 // Since the handshake manager is done, the connection no longer needs to
474 // shutdown the handshake when the listener needs to stop serving.
475 // Avoid calling the destructor of HandshakeManager and HandshakingState
476 // from within the critical region.
477 handshake_mgr = std::move(self->handshake_mgr_);
478 handshaking_state_ref = std::move(self->connection_->handshaking_state_);
480 gpr_free(self->acceptor_);
481 self->acceptor_ = nullptr;
482 OrphanablePtr<ActiveConnection> connection;
483 if (free_resource_quota && resource_user != nullptr) {
484 grpc_resource_user_free(resource_user, GRPC_RESOURCE_QUOTA_CHANNEL_SIZE);
486 if (cleanup_connection) {
// Take the map entry into the local `connection` so that the map no longer
// owns it once it is erased.
487 MutexLock listener_lock(&self->connection_->listener_->mu_);
488 auto it = self->connection_->listener_->connections_.find(
489 self->connection_.get());
490 if (it != self->connection_->listener_->connections_.end()) {
491 connection = std::move(it->second);
492 self->connection_->listener_->connections_.erase(it);
499 // Chttp2ServerListener::ActiveConnection
// Creates the connection together with its HandshakingState, which is given a
// ref to this connection, and initializes on_close_ to run OnClose() with this
// object as its argument.
502 Chttp2ServerListener::ActiveConnection::ActiveConnection(
503 grpc_pollset* accepting_pollset, grpc_tcp_server_acceptor* acceptor,
504 grpc_channel_args* args)
505 : handshaking_state_(MakeOrphanable<HandshakingState>(
506 Ref(), accepting_pollset, acceptor, args)) {
507 GRPC_CLOSURE_INIT(&on_close_, ActiveConnection::OnClose, this,
508 grpc_schedule_on_exec_ctx);
// Drops the "ActiveConnection" transport ref taken in
// HandshakingState::OnHandshakeDone(), if a transport was ever set.
511 Chttp2ServerListener::ActiveConnection::~ActiveConnection() {
512 if (transport_ != nullptr) {
513 GRPC_CHTTP2_UNREF_TRANSPORT(transport_, "ActiveConnection");
// Called when the owning OrphanablePtr releases this connection. Under mu_,
// handshaking_state_ is moved into a local OrphanablePtr.
517 void Chttp2ServerListener::ActiveConnection::Orphan() {
518 OrphanablePtr<HandshakingState> handshaking_state;
520 MutexLock lock(&mu_);
522 // Reset handshaking_state_ since we have been orphaned by the listener
523 // signaling that the listener has stopped serving.
524 handshaking_state = std::move(handshaking_state_);
// Sends a GOAWAY on this connection's transport, if it has one. transport_ is
// read under mu_ into a local, and the transport op is performed through that
// local copy.
529 void Chttp2ServerListener::ActiveConnection::SendGoAway() {
530 grpc_chttp2_transport* transport = nullptr;
532 MutexLock lock(&mu_);
533 transport = transport_;
// transport_ is null until the handshake has produced a transport.
535 if (transport != nullptr) {
536 grpc_transport_op* op = grpc_make_transport_op(nullptr);
537 op->goaway_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
538 "Server is stopping to serve requests.");
539 grpc_transport_perform_op(&transport->base, op);
// Stores the listener ref and begins the handshake on `endpoint`. If the
// connection has already been shut down it returns without starting; otherwise
// a ref to the handshaking state is taken under mu_ and the handshake is
// started through that ref.
543 void Chttp2ServerListener::ActiveConnection::Start(
544 RefCountedPtr<Chttp2ServerListener> listener, grpc_endpoint* endpoint,
545 grpc_channel_args* args) {
546 RefCountedPtr<HandshakingState> handshaking_state_ref;
547 listener_ = std::move(listener);
549 MutexLock lock(&mu_);
550 if (shutdown_) return;
551 // Hold a ref to HandshakingState to allow starting the handshake outside
552 // the critical region.
553 handshaking_state_ref = handshaking_state_->Ref();
555 handshaking_state_ref->Start(endpoint, args);
// Closure (on_close_) that OnHandshakeDone() hands to the transport when a
// config fetcher is configured. Unless the connection is already shut down, it
// removes this connection's entry from the listener's connections_ map.
558 void Chttp2ServerListener::ActiveConnection::OnClose(
559 void* arg, grpc_error_handle /* error */) {
560 ActiveConnection* self = static_cast<ActiveConnection*>(arg);
561 OrphanablePtr<ActiveConnection> connection;
// Lock order: listener mutex first, then the connection mutex, matching the
// ABSL_ACQUIRED_AFTER annotation on ActiveConnection::mu_.
563 MutexLock listener_lock(&self->listener_->mu_);
564 MutexLock connection_lock(&self->mu_);
565 // The node was already deleted from the connections_ list if the connection
// is shutdown.
567 if (!self->shutdown_) {
568 auto it = self->listener_->connections_.find(self);
569 if (it != self->listener_->connections_.end()) {
570 connection = std::move(it->second);
571 self->listener_->connections_.erase(it);
579 // Chttp2ServerListener
// Factory: creates a listener for the resolved address `addr` together with
// its grpc_tcp_server. When the server has a config fetcher the address is
// recorded in resolved_address_; the visible grpc_tcp_server_add_port() call
// reports the bound port through `port_num`. A channelz listen-socket node is
// created when GRPC_ARG_ENABLE_CHANNELZ is enabled, and on success the
// listener is registered with `server`.
582 grpc_error_handle Chttp2ServerListener::Create(
583 Server* server, grpc_resolved_address* addr, grpc_channel_args* args,
584 Chttp2ServerArgsModifier args_modifier, int* port_num) {
585 Chttp2ServerListener* listener = nullptr;
586 // The bulk of this method is inside of a lambda to make cleanup
587 // easier without using goto.
// NOTE(review): the lambda captures by reference and assigns to `error`, the
// very variable its return value initializes -- consider a lambda-local
// variable instead.
588 grpc_error_handle error = [&]() {
589 // Create Chttp2ServerListener.
590 listener = new Chttp2ServerListener(server, args, args_modifier);
591 error = grpc_tcp_server_create(&listener->tcp_server_shutdown_complete_,
592 args, &listener->tcp_server_);
593 if (error != GRPC_ERROR_NONE) return error;
594 if (server->config_fetcher() != nullptr) {
595 listener->resolved_address_ = *addr;
596 // TODO(yashykt): Consider binding so as to be able to return the port
// number.
599 error = grpc_tcp_server_add_port(listener->tcp_server_, addr, port_num);
600 if (error != GRPC_ERROR_NONE) return error;
602 // Create channelz node.
603 if (grpc_channel_args_find_bool(args, GRPC_ARG_ENABLE_CHANNELZ,
604 GRPC_ENABLE_CHANNELZ_DEFAULT)) {
605 std::string string_address = grpc_sockaddr_to_uri(addr);
606 listener->channelz_listen_socket_ =
607 MakeRefCounted<channelz::ListenSocketNode>(
608 string_address.c_str(),
609 absl::StrFormat("chttp2 listener %s", string_address.c_str()));
611 // Register with the server only upon success
612 server->AddListener(OrphanablePtr<Server::ListenerInterface>(listener));
613 return GRPC_ERROR_NONE;
// Error cleanup: what must be released depends on how far construction got.
615 if (error != GRPC_ERROR_NONE) {
616 if (listener != nullptr) {
617 if (listener->tcp_server_ != nullptr) {
618 // listener is deleted when tcp_server_ is shutdown.
619 grpc_tcp_server_unref(listener->tcp_server_);
624 grpc_channel_args_destroy(args);
// Factory used by Chttp2ServerAddPort() for "external:" addresses. It creates
// the listener and its grpc_tcp_server, stores a TcpServerFdHandler for that
// server into the pointer channel arg named `name`, and registers the listener
// with `server`.
630 grpc_error_handle Chttp2ServerListener::CreateWithAcceptor(
631 Server* server, const char* name, grpc_channel_args* args,
632 Chttp2ServerArgsModifier args_modifier) {
633 Chttp2ServerListener* listener =
634 new Chttp2ServerListener(server, args, args_modifier);
635 grpc_error_handle error = grpc_tcp_server_create(
636 &listener->tcp_server_shutdown_complete_, args, &listener->tcp_server_);
637 if (error != GRPC_ERROR_NONE) {
641 // TODO(yangg) channelz
// NOTE(review): arg_val is dereferenced below without a null check --
// presumably callers always supply a pointer arg named `name`; confirm.
642 TcpServerFdHandler** arg_val =
643 grpc_channel_args_find_pointer<TcpServerFdHandler*>(args, name);
644 *arg_val = grpc_tcp_server_create_fd_handler(listener->tcp_server_);
645 server->AddListener(OrphanablePtr<Server::ListenerInterface>(listener));
646 return GRPC_ERROR_NONE;
// Stores the server, the args modifier and the channel args (args_ is later
// destroyed by the destructor), and initializes the closure that runs
// TcpServerShutdownComplete() with this listener as its argument.
649 Chttp2ServerListener::Chttp2ServerListener(
650 Server* server, grpc_channel_args* args,
651 Chttp2ServerArgsModifier args_modifier)
652 : server_(server), args_modifier_(args_modifier), args_(args) {
653 GRPC_CLOSURE_INIT(&tcp_server_shutdown_complete_, TcpServerShutdownComplete,
654 this, grpc_schedule_on_exec_ctx);
// Flushes the ExecCtx, schedules and flushes the on_destroy_done_ closure if
// one was set through SetOnDestroyDone(), then destroys the channel args.
657 Chttp2ServerListener::~Chttp2ServerListener() {
658 // Flush queued work before destroying handshaker factory, since that
659 // may do a synchronous unref.
660 ExecCtx::Get()->Flush();
661 if (on_destroy_done_ != nullptr) {
662 ExecCtx::Run(DEBUG_LOCATION, on_destroy_done_, GRPC_ERROR_NONE);
663 ExecCtx::Get()->Flush();
665 grpc_channel_args_destroy(args_);
// When the server has a config fetcher, a ConfigFetcherWatcher holding a ref
// to this listener is created and a watch is started on the fetcher with the
// string form of resolved_address_ and a copy of args_ (copied under
// channel_args_mu_). Both parameters are unused.
668 /* Server callback: start listening on our ports */
669 void Chttp2ServerListener::Start(
670 Server* /*server*/, const std::vector<grpc_pollset*>* /* pollsets */) {
671 if (server_->config_fetcher() != nullptr) {
672 grpc_channel_args* args = nullptr;
673 auto watcher = absl::make_unique<ConfigFetcherWatcher>(Ref());
// Keep a raw pointer to the watcher; Orphan() passes it to CancelWatch().
674 config_fetcher_watcher_ = watcher.get();
676 MutexLock lock(&channel_args_mu_);
677 args = grpc_channel_args_copy(args_);
679 server_->config_fetcher()->StartWatch(
680 grpc_sockaddr_to_string(&resolved_address_, false), args,
684 MutexLock lock(&mu_);
// Starts the TCP server on the server's pollsets, with OnAccept() as the
// accept callback and this listener as its argument.
692 void Chttp2ServerListener::StartListening() {
693 grpc_tcp_server_start(tcp_server_, &server_->pollsets(), OnAccept, this);
// Records, under mu_, the closure that the destructor schedules once the
// listener is being destroyed.
696 void Chttp2ServerListener::SetOnDestroyDone(grpc_closure* on_destroy_done) {
697 MutexLock lock(&mu_);
698 on_destroy_done_ = on_destroy_done;
// Accept callback registered with grpc_tcp_server_start(); `arg` is the
// listener. It copies the channel args and the connection manager under
// channel_args_mu_, applies the connection manager (when a config fetcher is
// configured) and args_modifier_ to the args, and creates an ActiveConnection.
// If the listener is serving and resource quota allows, the connection is
// added to connections_ and started with a listener ref. Rejected connections
// have the endpoint shut down and destroyed.
701 void Chttp2ServerListener::OnAccept(void* arg, grpc_endpoint* tcp,
702 grpc_pollset* accepting_pollset,
703 grpc_tcp_server_acceptor* acceptor) {
704 Chttp2ServerListener* self = static_cast<Chttp2ServerListener*>(arg);
705 grpc_channel_args* args = nullptr;
706 RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
709 MutexLock lock(&self->channel_args_mu_);
710 args = grpc_channel_args_copy(self->args_);
711 connection_manager = self->connection_manager_;
// Shared cleanup for every rejection path below.
713 auto endpoint_cleanup = [&](grpc_error_handle error) {
714 grpc_endpoint_shutdown(tcp, error);
715 grpc_endpoint_destroy(tcp);
718 if (self->server_->config_fetcher() != nullptr) {
719 if (connection_manager == nullptr) {
720 grpc_error_handle error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
721 "No ConnectionManager configured. Closing connection.");
722 endpoint_cleanup(error);
723 grpc_channel_args_destroy(args);
726 // TODO(yashykt): Maybe combine the following two arg modifiers into a
// single one.
728 absl::StatusOr<grpc_channel_args*> args_result =
729 connection_manager->UpdateChannelArgsForConnection(args, tcp);
730 if (!args_result.ok()) {
731 gpr_log(GPR_DEBUG, "Closing connection: %s",
732 args_result.status().ToString().c_str());
733 endpoint_cleanup(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
734 args_result.status().ToString().c_str()));
737 grpc_error_handle error = GRPC_ERROR_NONE;
738 args = self->args_modifier_(*args_result, &error);
739 if (error != GRPC_ERROR_NONE) {
// The error is logged before endpoint_cleanup() takes it.
740 gpr_log(GPR_DEBUG, "Closing connection: %s",
741 grpc_error_std_string(error).c_str());
742 endpoint_cleanup(error);
743 grpc_channel_args_destroy(args);
748 MakeOrphanable<ActiveConnection>(accepting_pollset, acceptor, args);
749 // We no longer own acceptor
751 // Hold a ref to connection to allow starting handshake outside the
// critical region.
753 RefCountedPtr<ActiveConnection> connection_ref = connection->Ref();
754 RefCountedPtr<Chttp2ServerListener> listener_ref;
756 MutexLock lock(&self->mu_);
757 // Shut down the connection if the listener has stopped serving.
758 if (!self->shutdown_ && self->is_serving_) {
759 grpc_resource_user* resource_user =
760 self->server_->default_resource_user();
761 if (resource_user != nullptr &&
762 !grpc_resource_user_safe_alloc(resource_user,
763 GRPC_RESOURCE_QUOTA_CHANNEL_SIZE)) {
766 "Memory quota exhausted, rejecting connection, no handshaking.");
768 // This ref needs to be taken in the critical region after having made
769 // sure that the listener has not been Orphaned, so as to avoid
770 // heap-use-after-free issues where `Ref()` is invoked when the ref of
771 // tcp_server_ has already reached 0. (Ref() implementation of
772 // Chttp2ServerListener is grpc_tcp_server_ref().)
773 listener_ref = self->Ref();
774 self->connections_.emplace(connection.get(), std::move(connection));
// `connection` is still non-null only if it was not moved into connections_
// above, i.e. the connection was rejected.
778 if (connection != nullptr) {
779 endpoint_cleanup(GRPC_ERROR_NONE);
781 connection_ref->Start(std::move(listener_ref), tcp, args);
783 grpc_channel_args_destroy(args);
// Closure given to grpc_tcp_server_create(); `arg` is the listener. It drops
// the channelz listen-socket node and releases `error`.
786 void Chttp2ServerListener::TcpServerShutdownComplete(void* arg,
787 grpc_error_handle error) {
788 Chttp2ServerListener* self = static_cast<Chttp2ServerListener*>(arg);
789 self->channelz_listen_socket_.reset();
790 GRPC_ERROR_UNREF(error);
794 /* Server callback: destroy the tcp listener (so we don't generate further callbacks). */
// Cancels the config-fetcher watch (if one was registered), takes over the
// connections map under mu_, waits for any in-progress start to finish, and
// then shuts down the TCP server's listeners and drops a ref on the TCP
// server.
796 void Chttp2ServerListener::Orphan() {
797 // Cancel the watch before shutting down so as to avoid holding a ref to the
798 // listener in the watcher.
799 if (config_fetcher_watcher_ != nullptr) {
800 server_->config_fetcher()->CancelWatch(config_fetcher_watcher_);
802 std::map<ActiveConnection*, OrphanablePtr<ActiveConnection>> connections;
803 grpc_tcp_server* tcp_server;
805 MutexLock lock(&mu_);
808 // Orphan the connections so that they can start cleaning up.
809 connections = std::move(connections_);
810 // If the listener is currently set to be serving but has not been started
811 // yet, it means that `grpc_tcp_server_start` is in progress. Wait for the
812 // operation to finish to avoid causing races.
// started_cv_ is signalled by ConfigFetcherWatcher::UpdateConnectionManager()
// once it has set started_.
813 while (is_serving_ && !started_) {
814 started_cv_.Wait(&mu_);
816 tcp_server = tcp_server_;
818 grpc_tcp_server_shutdown_listeners(tcp_server);
819 grpc_tcp_server_unref(tcp_server);
825 // Chttp2ServerAddPort()
// Adds listening port(s) for `addr` to `server`.
//
// Addresses starting with "external:" are delegated to
// Chttp2ServerListener::CreateWithAcceptor(). Any other address is resolved
// (unix-domain, abstract unix-domain, or grpc_blocking_resolve_address() with
// "https" as its second argument) and one listener is created per resolved
// address, with all listeners required to agree on the port reported through
// `port_num`.
//
// Returns an error when resolution fails or when no resolved address could be
// added. If only some addresses fail, the failure is logged and
// GRPC_ERROR_NONE is returned. `args` is destroyed before returning, and
// *port_num is set to 0 on error.
828 grpc_error_handle Chttp2ServerAddPort(Server* server, const char* addr,
829 grpc_channel_args* args,
830 Chttp2ServerArgsModifier args_modifier,
832 if (strncmp(addr, "external:", 9) == 0) {
833 return grpc_core::Chttp2ServerListener::CreateWithAcceptor(
834 server, addr, args, args_modifier);
837 grpc_resolved_addresses* resolved = nullptr;
838 std::vector<grpc_error_handle> error_list;
839 // Using lambda to avoid use of goto.
// NOTE(review): the lambda captures by reference and assigns to `error`, the
// very variable its return value initializes -- consider a lambda-local
// variable instead.
840 grpc_error_handle error = [&]() {
841 if (absl::StartsWith(addr, kUnixUriPrefix)) {
842 error = grpc_resolve_unix_domain_address(
843 addr + sizeof(kUnixUriPrefix) - 1, &resolved);
844 } else if (absl::StartsWith(addr, kUnixAbstractUriPrefix)) {
845 error = grpc_resolve_unix_abstract_domain_address(
846 addr + sizeof(kUnixAbstractUriPrefix) - 1, &resolved);
848 error = grpc_blocking_resolve_address(addr, "https", &resolved);
850 if (error != GRPC_ERROR_NONE) return error;
851 // Create a listener for each resolved address.
852 for (size_t i = 0; i < resolved->naddrs; i++) {
853 // If address has a wildcard port (0), use the same port as a previous
// listener.
855 if (*port_num != -1 && grpc_sockaddr_get_port(&resolved->addrs[i]) == 0) {
856 grpc_sockaddr_set_port(&resolved->addrs[i], *port_num);
// Each listener receives its own copy of the channel args.
859 error = grpc_core::Chttp2ServerListener::Create(
860 server, &resolved->addrs[i], grpc_channel_args_copy(args),
861 args_modifier, &port_temp);
862 if (error != GRPC_ERROR_NONE) {
863 error_list.push_back(error);
865 if (*port_num == -1) {
866 *port_num = port_temp;
868 GPR_ASSERT(*port_num == port_temp);
// Every address failed: report a composite error referencing each failure.
872 if (error_list.size() == resolved->naddrs) {
874 absl::StrFormat("No address added out of total %" PRIuPTR " resolved",
876 return GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING(
877 msg.c_str(), error_list.data(), error_list.size());
878 } else if (!error_list.empty()) {
879 std::string msg = absl::StrFormat(
880 "Only %" PRIuPTR " addresses added out of total %" PRIuPTR
882 resolved->naddrs - error_list.size(), resolved->naddrs);
883 error = GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING(
884 msg.c_str(), error_list.data(), error_list.size());
// The composite error is logged first and only then released.
885 gpr_log(GPR_INFO, "WARNING: %s", grpc_error_std_string(error).c_str());
886 GRPC_ERROR_UNREF(error);
887 // we managed to bind some addresses: continue without error
889 return GRPC_ERROR_NONE;
// Release the per-address errors that were collected in error_list.
891 for (grpc_error_handle error : error_list) {
892 GRPC_ERROR_UNREF(error);
894 grpc_channel_args_destroy(args);
895 if (resolved != nullptr) {
896 grpc_resolved_addresses_destroy(resolved);
898 if (error != GRPC_ERROR_NONE) *port_num = 0;
902 } // namespace grpc_core