3 * Copyright 2015 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <grpc/support/port_platform.h>
21 #include "src/core/ext/transport/chttp2/server/chttp2_server.h"
28 #include "absl/strings/str_cat.h"
29 #include "absl/strings/str_format.h"
31 #include <grpc/grpc.h>
32 #include <grpc/impl/codegen/grpc_types.h>
33 #include <grpc/support/alloc.h>
34 #include <grpc/support/log.h>
35 #include <grpc/support/sync.h>
37 #include "src/core/ext/filters/http/server/http_server_filter.h"
38 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
39 #include "src/core/ext/transport/chttp2/transport/internal.h"
40 #include "src/core/lib/channel/channel_args.h"
41 #include "src/core/lib/channel/handshaker.h"
42 #include "src/core/lib/channel/handshaker_registry.h"
43 #include "src/core/lib/gprpp/ref_counted.h"
44 #include "src/core/lib/gprpp/ref_counted_ptr.h"
45 #include "src/core/lib/iomgr/endpoint.h"
46 #include "src/core/lib/iomgr/resolve_address.h"
47 #include "src/core/lib/iomgr/resource_quota.h"
48 #include "src/core/lib/iomgr/tcp_server.h"
49 #include "src/core/lib/slice/slice_internal.h"
50 #include "src/core/lib/surface/api_trace.h"
51 #include "src/core/lib/surface/server.h"
// Server listener for the chttp2 transport, implementing
// ServerListenerInterface. Instances are produced by the two static factory
// methods below, which hand the listener to the server through
// grpc_server_add_listener().
// NOTE(review): some lines of this declaration are not visible in this view
// (access specifiers, closing braces, and the declarations of the `mu_` and
// `timer_` members that the method definitions refer to).
56 class Chttp2ServerListener : public ServerListenerInterface {
// Resolves `addr`, adds a TCP server port for each resolved address and
// registers the new listener with `server`. The port that was bound is
// reported through `port_num`.
58 static grpc_error* Create(grpc_server* server, const char* addr,
59 grpc_channel_args* args, int* port_num);
// Creates a listener with no bound port; a TcpServerFdHandler is stored
// through the pointer-valued channel arg looked up under `name`.
61 static grpc_error* CreateWithAcceptor(grpc_server* server, const char* name,
62 grpc_channel_args* args);
64 // Do not instantiate directly. Use one of the factory methods above.
65 Chttp2ServerListener(grpc_server* server, grpc_channel_args* args);
66 ~Chttp2ServerListener();
// Starts the TCP server on `pollsets`, with OnAccept as the accept callback.
68 void Start(grpc_server* server,
69 const std::vector<grpc_pollset*>* pollsets) override;
// Returns the channelz node without transferring ownership; it is only set
// by Create() when channelz is enabled in the channel args.
71 channelz::ListenSocketNode* channelz_listen_socket_node() const override {
72 return channelz_listen_socket_.get();
// Stores the closure that TcpServerShutdownComplete() schedules.
75 void SetOnDestroyDone(grpc_closure* on_destroy_done) override;
// Shuts down the TCP server's listeners and drops a ref on the TCP server.
77 void Orphan() override;
// Per-connection state. One instance is allocated in OnAccept() for each
// accepted endpoint; it runs the handshake and then enforces the deadline
// for receiving HTTP/2 settings.
80 class ConnectionState : public RefCounted<ConnectionState> {
82 ConnectionState(Chttp2ServerListener* listener,
83 grpc_pollset* accepting_pollset,
84 grpc_tcp_server_acceptor* acceptor,
85 RefCountedPtr<HandshakeManager> handshake_mgr,
86 grpc_channel_args* args, grpc_endpoint* endpoint);
// Closure callbacks; in each, `arg` carries the ConnectionState (for
// OnHandshakeDone it is reached through HandshakerArgs::user_data).
91 static void OnTimeout(void* arg, grpc_error* error);
92 static void OnReceiveSettings(void* arg, grpc_error* error);
93 static void OnHandshakeDone(void* arg, grpc_error* error);
// The listener that accepted this connection.
95 Chttp2ServerListener* const listener_;
// Pollset the connection was accepted on; added to interested_parties_.
96 grpc_pollset* const accepting_pollset_;
// Passed to DoHandshake() and released with gpr_free() in OnHandshakeDone().
97 grpc_tcp_server_acceptor* const acceptor_;
// Reset in OnHandshakeDone() once the handshake has finished.
98 RefCountedPtr<HandshakeManager> handshake_mgr_;
99 // State for enforcing handshake timeout on receiving HTTP/2 settings.
100 grpc_chttp2_transport* transport_ = nullptr;
101 grpc_millis deadline_;
103 grpc_closure on_timeout_;
104 grpc_closure on_receive_settings_;
// Created in the constructor and destroyed in the destructor.
105 grpc_pollset_set* const interested_parties_;
// Accept callback handed to grpc_tcp_server_start(); `arg` is the listener.
108 static void OnAccept(void* arg, grpc_endpoint* tcp,
109 grpc_pollset* accepting_pollset,
110 grpc_tcp_server_acceptor* acceptor);
// Returns a new HandshakeManager registered in pending_handshake_mgrs_, or
// nullptr when the listener is shut down.
112 RefCountedPtr<HandshakeManager> CreateHandshakeManager();
// Closure target stored in tcp_server_shutdown_complete_.
114 static void TcpServerShutdownComplete(void* arg, grpc_error* error);
// NOTE(review): no definition of DestroyListener is visible in this view.
116 static void DestroyListener(grpc_server* /*server*/, void* arg,
117 grpc_closure* destroy_done);
119 grpc_server* const server_;
// Destroyed with grpc_channel_args_destroy() in the destructor.
120 grpc_channel_args* const args_;
121 grpc_tcp_server* tcp_server_;
// Starts out true. Read while holding mu_ in CreateHandshakeManager() and
// OnHandshakeDone().
// NOTE(review): the statements that modify it are not visible in this view.
123 bool shutdown_ = true;
124 grpc_closure tcp_server_shutdown_complete_;
125 grpc_closure* on_destroy_done_ = nullptr;
// List head passed to HandshakeManager::AddToPendingMgrList() and
// RemoveFromPendingMgrList().
126 HandshakeManager* pending_handshake_mgrs_ = nullptr;
127 RefCountedPtr<channelz::ListenSocketNode> channelz_listen_socket_;
131 // Chttp2ServerListener::ConnectionState
// Returns the handshake deadline for a new connection: the ExecCtx's current
// time plus `timeout_ms`.
// The visible call looks up GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS in `args`
// with the options {120 * GPR_MS_PER_SEC, 1, INT_MAX} (presumably
// default/min/max -- confirm against grpc_channel_args_find_integer).
// NOTE(review): the line declaring `timeout_ms` is not visible in this view;
// presumably it receives the result of the lookup below -- confirm.
134 grpc_millis GetConnectionDeadline(const grpc_channel_args* args) {
136 grpc_channel_args_find_integer(args, GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS,
137 {120 * GPR_MS_PER_SEC, 1, INT_MAX});
138 return ExecCtx::Get()->Now() + timeout_ms;
// Sets up the state for one accepted connection and immediately starts the
// server-side handshake on `endpoint`.
//   listener          - listener that accepted the connection.
//   accepting_pollset - pollset the connection was accepted on.
//   acceptor          - forwarded to HandshakeManager::DoHandshake().
//   handshake_mgr     - manager that runs the handshake; moved into this
//                       object.
//   args              - channel args used for the deadline, the handshaker
//                       registry and the handshake itself.
//   endpoint          - the accepted endpoint to handshake on.
// NOTE(review): no initializer for `acceptor_` is visible in this view even
// though the member is const and is used below -- confirm it is present in
// the full file.
141 Chttp2ServerListener::ConnectionState::ConnectionState(
142 Chttp2ServerListener* listener, grpc_pollset* accepting_pollset,
143 grpc_tcp_server_acceptor* acceptor,
144 RefCountedPtr<HandshakeManager> handshake_mgr, grpc_channel_args* args,
145 grpc_endpoint* endpoint)
146 : listener_(listener),
147 accepting_pollset_(accepting_pollset),
149 handshake_mgr_(std::move(handshake_mgr)),
150 deadline_(GetConnectionDeadline(args)),
151 interested_parties_(grpc_pollset_set_create()) {
// Handshakers poll through interested_parties_, so the accepting pollset is
// added to it before any handshaker is registered.
152 grpc_pollset_set_add_pollset(interested_parties_, accepting_pollset_);
153 HandshakerRegistry::AddHandshakers(HANDSHAKER_SERVER, args,
154 interested_parties_, handshake_mgr_.get());
// OnHandshakeDone() is invoked with this object as the user data.
155 handshake_mgr_->DoHandshake(endpoint, args, deadline_, acceptor_,
156 OnHandshakeDone, this);
// Releases what the connection state still holds: the transport ref tagged
// "receive settings timeout" (taken in OnHandshakeDone() when a transport was
// created) and the pollset set created in the constructor.
159 Chttp2ServerListener::ConnectionState::~ConnectionState() {
160 if (transport_ != nullptr) {
161 GRPC_CHTTP2_UNREF_TRANSPORT(transport_, "receive settings timeout");
// Undo the add done in the constructor before destroying the set.
163 grpc_pollset_set_del_pollset(interested_parties_, accepting_pollset_);
164 grpc_pollset_set_destroy(interested_parties_);
// Timer callback armed by grpc_timer_init() in OnHandshakeDone(). `arg` is
// the ConnectionState. Unless the timer was cancelled, the transport is
// disconnected with a "did not receive settings" error.
// NOTE(review): the rest of the signature and the release of the ref marked
// "Held by OnTimeout()" are not visible in this view.
167 void Chttp2ServerListener::ConnectionState::OnTimeout(void* arg,
169 ConnectionState* self = static_cast<ConnectionState*>(arg);
170 // Note that we may be called with GRPC_ERROR_NONE when the timer fires
171 // or with an error indicating that the timer system is being shut down.
172 if (error != GRPC_ERROR_CANCELLED) {
173 grpc_transport_op* op = grpc_make_transport_op(nullptr);
174 op->disconnect_with_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
175 "Did not receive HTTP/2 settings before handshake timeout");
176 grpc_transport_perform_op(&self->transport_->base, op);
// Callback passed to grpc_chttp2_transport_start_reading() in
// OnHandshakeDone(). `arg` is the ConnectionState. When invoked without an
// error, the handshake timer is cancelled.
// NOTE(review): the release of the ref marked "Held by OnReceiveSettings()"
// is not visible in this view.
181 void Chttp2ServerListener::ConnectionState::OnReceiveSettings(
182 void* arg, grpc_error* error) {
183 ConnectionState* self = static_cast<ConnectionState*>(arg);
184 if (error == GRPC_ERROR_NONE) {
185 grpc_timer_cancel(&self->timer_);
// Completion callback given to HandshakeManager::DoHandshake(). `arg` is a
// HandshakerArgs* whose user_data is the ConnectionState.
//
// On error, or when the listener is already shut down, the reserved channel
// quota is returned and any successfully handshaken endpoint is destroyed.
// Otherwise, when an endpoint is present, a chttp2 transport is created,
// attached to the server, and both the settings callback and the deadline
// timer are armed.
// NOTE(review): several lines (closing braces, `else` branches, the rest of
// the signature) are not visible in this view, so the exact branch nesting
// below should be confirmed against the full file.
190 void Chttp2ServerListener::ConnectionState::OnHandshakeDone(void* arg,
192 auto* args = static_cast<HandshakerArgs*>(arg);
193 ConnectionState* self = static_cast<ConnectionState*>(args->user_data);
// The listener's state (shutdown_, pending_handshake_mgrs_) is accessed
// while holding its mutex.
195 MutexLock lock(&self->listener_->mu_);
196 grpc_resource_user* resource_user =
197 grpc_server_get_default_resource_user(self->listener_->server_);
198 if (error != GRPC_ERROR_NONE || self->listener_->shutdown_) {
199 const char* error_str = grpc_error_string(error);
200 gpr_log(GPR_DEBUG, "Handshaking failed: %s", error_str);
// NOTE(review): this declaration shadows the identically initialized
// `resource_user` declared just above the `if`; it looks redundant.
201 grpc_resource_user* resource_user =
202 grpc_server_get_default_resource_user(self->listener_->server_);
// Return the channel-size quota reserved in CreateHandshakeManager().
203 if (resource_user != nullptr) {
204 grpc_resource_user_free(resource_user,
205 GRPC_RESOURCE_QUOTA_CHANNEL_SIZE);
207 if (error == GRPC_ERROR_NONE && args->endpoint != nullptr) {
208 // We were shut down after handshaking completed successfully, so
209 // destroy the endpoint here.
210 // TODO(ctiller): It is currently necessary to shutdown endpoints
211 // before destroying them, even if we know that there are no
212 // pending read/write callbacks. This should be fixed, at which
213 // point this can be removed.
214 grpc_endpoint_shutdown(args->endpoint, GRPC_ERROR_NONE);
215 grpc_endpoint_destroy(args->endpoint);
216 grpc_channel_args_destroy(args->args);
217 grpc_slice_buffer_destroy_internal(args->read_buffer);
218 gpr_free(args->read_buffer);
221 // If the handshaking succeeded but there is no endpoint, then the
222 // handshaker may have handed off the connection to some external
223 // code, so we can just clean up here without creating a transport.
224 if (args->endpoint != nullptr) {
225 grpc_transport* transport = grpc_create_chttp2_transport(
226 args->args, args->endpoint, false, resource_user);
227 grpc_server_setup_transport(
228 self->listener_->server_, transport, self->accepting_pollset_,
229 args->args, grpc_chttp2_transport_get_socket_node(transport),
231 // Use notify_on_receive_settings callback to enforce the
232 // handshake deadline.
233 // Note: The reinterpret_cast<>s here are safe, because
234 // grpc_chttp2_transport is a C-style extension of
235 // grpc_transport, so this is morally equivalent of a
236 // static_cast<> to a derived class.
237 // TODO(roth): Change to static_cast<> when we C++-ify the
// transport API.
239 self->transport_ = reinterpret_cast<grpc_chttp2_transport*>(transport);
240 self->Ref().release(); // Held by OnReceiveSettings().
241 GRPC_CLOSURE_INIT(&self->on_receive_settings_, OnReceiveSettings, self,
242 grpc_schedule_on_exec_ctx);
243 grpc_chttp2_transport_start_reading(transport, args->read_buffer,
244 &self->on_receive_settings_);
245 grpc_channel_args_destroy(args->args);
246 self->Ref().release(); // Held by OnTimeout().
// This transport ref is the one dropped in ~ConnectionState().
247 GRPC_CHTTP2_REF_TRANSPORT(
248 reinterpret_cast<grpc_chttp2_transport*>(transport),
249 "receive settings timeout");
250 GRPC_CLOSURE_INIT(&self->on_timeout_, OnTimeout, self,
251 grpc_schedule_on_exec_ctx);
252 grpc_timer_init(&self->timer_, self->deadline_, &self->on_timeout_);
// NOTE(review): the branch that owns this quota release is not visible in
// this view; presumably it is the "no endpoint" case described in the
// comment above -- confirm.
254 if (resource_user != nullptr) {
255 grpc_resource_user_free(resource_user,
256 GRPC_RESOURCE_QUOTA_CHANNEL_SIZE);
// The handshake is finished either way, so take the manager off the
// listener's pending list.
260 self->handshake_mgr_->RemoveFromPendingMgrList(
261 &self->listener_->pending_handshake_mgrs_);
263 self->handshake_mgr_.reset();
264 gpr_free(self->acceptor_);
// Drops the TCP server ref taken in CreateHandshakeManager().
265 grpc_tcp_server_unref(self->listener_->tcp_server_);
270 // Chttp2ServerListener
// Factory for a listener bound to `addr`.
//
// Resolves `addr`, creates the listener and its TCP server, and tries to add
// a port for every resolved address. If no address could be added, an error
// that references all collected per-address errors is returned. If only some
// could be added, a warning is logged and setup continues. On success the
// listener is registered with `server` and GRPC_ERROR_NONE is returned by the
// lambda.
//   server   - server that receives the listener on success.
//   addr     - address string to resolve.
//   args     - channel args handed to the listener and the TCP server.
//   port_num - in/out: a value of -1 is replaced by the first bound port;
//              afterwards every bound port is asserted to equal it.
// NOTE(review): parts of this function (the rest of the signature, the
// declaration of `port_temp`, closing braces and the final return) are not
// visible in this view.
273 grpc_error* Chttp2ServerListener::Create(grpc_server* server, const char* addr,
274 grpc_channel_args* args,
// Per-address failures from grpc_tcp_server_add_port(); each entry is
// unreffed at the end of the function.
276 std::vector<grpc_error*> error_list;
277 grpc_resolved_addresses* resolved = nullptr;
278 Chttp2ServerListener* listener = nullptr;
279 // The bulk of this method is inside of a lambda to make cleanup
280 // easier without using goto.
281 grpc_error* error = [&]() {
283 /* resolve address */
// "https" is the second argument to the resolver (presumably the default
// port to use when `addr` does not carry one -- confirm).
284 grpc_error* error = grpc_blocking_resolve_address(addr, "https", &resolved);
285 if (error != GRPC_ERROR_NONE) return error;
286 // Create Chttp2ServerListener.
287 listener = new Chttp2ServerListener(server, args);
288 error = grpc_tcp_server_create(&listener->tcp_server_shutdown_complete_,
289 args, &listener->tcp_server_);
290 if (error != GRPC_ERROR_NONE) return error;
291 for (size_t i = 0; i < resolved->naddrs; i++) {
293 error = grpc_tcp_server_add_port(listener->tcp_server_,
294 &resolved->addrs[i], &port_temp);
295 if (error != GRPC_ERROR_NONE) {
296 error_list.push_back(error);
298 if (*port_num == -1) {
299 *port_num = port_temp;
301 GPR_ASSERT(*port_num == port_temp);
// Every resolved address failed: report a single error referencing all of
// the collected ones.
305 if (error_list.size() == resolved->naddrs) {
307 absl::StrFormat("No address added out of total %" PRIuPTR " resolved",
309 return GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING(
310 msg.c_str(), error_list.data(), error_list.size());
311 } else if (!error_list.empty()) {
312 std::string msg = absl::StrFormat(
313 "Only %" PRIuPTR " addresses added out of total %" PRIuPTR
315 resolved->naddrs - error_list.size(), resolved->naddrs);
// The combined error is created only to be logged, then released.
316 error = GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING(
317 msg.c_str(), error_list.data(), error_list.size());
318 gpr_log(GPR_INFO, "WARNING: %s", grpc_error_string(error));
319 GRPC_ERROR_UNREF(error);
320 /* we managed to bind some addresses: continue */
322 // Create channelz node.
323 if (grpc_channel_args_find_bool(args, GRPC_ARG_ENABLE_CHANNELZ,
324 GRPC_ENABLE_CHANNELZ_DEFAULT)) {
325 listener->channelz_listen_socket_ =
326 MakeRefCounted<channelz::ListenSocketNode>(
327 addr, absl::StrFormat("chttp2 listener %s", addr));
329 /* Register with the server only upon success */
330 grpc_server_add_listener(server,
331 OrphanablePtr<ServerListenerInterface>(listener));
332 return GRPC_ERROR_NONE;
// Cleanup that runs after the lambda, whatever its result.
334 if (resolved != nullptr) {
335 grpc_resolved_addresses_destroy(resolved);
// Error-path cleanup.
// NOTE(review): the branch structure here is only partially visible.
337 if (error != GRPC_ERROR_NONE) {
338 if (listener != nullptr) {
339 if (listener->tcp_server_ != nullptr) {
340 grpc_tcp_server_unref(listener->tcp_server_);
345 grpc_channel_args_destroy(args);
349 for (grpc_error* error : error_list) {
350 GRPC_ERROR_UNREF(error);
// Factory for a listener that has no bound port. It creates the listener and
// its TCP server, stores a TcpServerFdHandler for that TCP server through the
// pointer channel arg found under `name`, and registers the listener with
// `server`.
// NOTE(review): the `name` parameter line and the body of the error branch
// are not visible in this view.
355 grpc_error* Chttp2ServerListener::CreateWithAcceptor(grpc_server* server,
357 grpc_channel_args* args) {
358 Chttp2ServerListener* listener = new Chttp2ServerListener(server, args);
359 grpc_error* error = grpc_tcp_server_create(
360 &listener->tcp_server_shutdown_complete_, args, &listener->tcp_server_);
361 if (error != GRPC_ERROR_NONE) {
365 // TODO(yangg) channelz
// NOTE(review): `arg_val` is dereferenced below with no visible null check;
// confirm that the channel arg named `name` is always present.
366 TcpServerFdHandler** arg_val =
367 grpc_channel_args_find_pointer<TcpServerFdHandler*>(args, name);
368 *arg_val = grpc_tcp_server_create_fd_handler(listener->tcp_server_);
369 grpc_server_add_listener(server,
370 OrphanablePtr<ServerListenerInterface>(listener));
371 return GRPC_ERROR_NONE;
// Stores `server` and `args` and prepares the closure that invokes
// TcpServerShutdownComplete() with this listener as its argument. `args` is
// destroyed by the destructor.
374 Chttp2ServerListener::Chttp2ServerListener(grpc_server* server,
375 grpc_channel_args* args)
376 : server_(server), args_(args) {
377 GRPC_CLOSURE_INIT(&tcp_server_shutdown_complete_, TcpServerShutdownComplete,
378 this, grpc_schedule_on_exec_ctx);
// Destroys the channel args that were passed to the constructor.
381 Chttp2ServerListener::~Chttp2ServerListener() {
382 grpc_channel_args_destroy(args_);
385 /* Server callback: start listening on our ports */
// Takes mu_ and then starts the TCP server on `pollsets`, with OnAccept as
// the accept callback and this listener as its argument.
// NOTE(review): the statements between the lock and the start call are not
// visible in this view.
386 void Chttp2ServerListener::Start(grpc_server* /*server*/,
387 const std::vector<grpc_pollset*>* pollsets) {
389 MutexLock lock(&mu_);
392 grpc_tcp_server_start(tcp_server_, pollsets, OnAccept, this);
// Records, under mu_, the closure that TcpServerShutdownComplete() schedules
// once the TCP server has shut down.
395 void Chttp2ServerListener::SetOnDestroyDone(grpc_closure* on_destroy_done) {
396 MutexLock lock(&mu_);
397 on_destroy_done_ = on_destroy_done;
// Creates the HandshakeManager for a newly accepted connection.
// Returns nullptr when the listener is shut down. Otherwise it tries to
// reserve GRPC_RESOURCE_QUOTA_CHANNEL_SIZE from the server's default resource
// user (if there is one), adds the new manager to the pending list and takes
// a ref on the TCP server on behalf of the ConnectionState.
// NOTE(review): the logging call and the return taken when the quota is
// exhausted are not fully visible in this view.
400 RefCountedPtr<HandshakeManager> Chttp2ServerListener::CreateHandshakeManager() {
401 MutexLock lock(&mu_);
402 if (shutdown_) return nullptr;
403 grpc_resource_user* resource_user =
404 grpc_server_get_default_resource_user(server_);
405 if (resource_user != nullptr &&
406 !grpc_resource_user_safe_alloc(resource_user,
407 GRPC_RESOURCE_QUOTA_CHANNEL_SIZE)) {
409 "Memory quota exhausted, rejecting connection, no handshaking.");
412 auto handshake_mgr = MakeRefCounted<HandshakeManager>();
// Registered so that TcpServerShutdownComplete() can shut down handshakes
// that are still pending.
413 handshake_mgr->AddToPendingMgrList(&pending_handshake_mgrs_);
414 grpc_tcp_server_ref(tcp_server_); // Ref held by ConnectionState.
415 return handshake_mgr;
// Accept callback registered through grpc_tcp_server_start(). `arg` is the
// listener. When no handshake manager can be created, the accepted endpoint
// is shut down and destroyed; otherwise a ConnectionState is allocated to
// take over the connection.
// NOTE(review): the remainder of the rejection branch (after the endpoint is
// destroyed) is not visible in this view.
418 void Chttp2ServerListener::OnAccept(void* arg, grpc_endpoint* tcp,
419 grpc_pollset* accepting_pollset,
420 grpc_tcp_server_acceptor* acceptor) {
421 Chttp2ServerListener* self = static_cast<Chttp2ServerListener*>(arg);
422 RefCountedPtr<HandshakeManager> handshake_mgr =
423 self->CreateHandshakeManager();
424 if (handshake_mgr == nullptr) {
425 grpc_endpoint_shutdown(tcp, GRPC_ERROR_NONE);
426 grpc_endpoint_destroy(tcp);
430 // Deletes itself when done.
431 new ConnectionState(self, accepting_pollset, acceptor,
432 std::move(handshake_mgr), self->args_, tcp);
// Closure target stored in tcp_server_shutdown_complete_. `arg` is the
// listener. Under mu_ it reads the destroy-done closure, asserts that the
// listener is shut down, shuts down any pending handshakes and drops the
// channelz node. After flushing the ExecCtx it schedules the destroy-done
// closure, if one was set.
// NOTE(review): the rest of the signature and the statements after the final
// flush are not visible in this view.
435 void Chttp2ServerListener::TcpServerShutdownComplete(void* arg,
437 Chttp2ServerListener* self = static_cast<Chttp2ServerListener*>(arg);
438 /* ensure all threads have unlocked */
439 grpc_closure* destroy_done = nullptr;
441 MutexLock lock(&self->mu_);
442 destroy_done = self->on_destroy_done_;
443 GPR_ASSERT(self->shutdown_);
444 if (self->pending_handshake_mgrs_ != nullptr) {
// The pending handshakes receive their own ref on `error`.
445 self->pending_handshake_mgrs_->ShutdownAllPending(GRPC_ERROR_REF(error));
447 self->channelz_listen_socket_.reset();
449 // Flush queued work before destroying handshaker factory, since that
450 // may do a synchronous unref.
451 ExecCtx::Get()->Flush();
452 if (destroy_done != nullptr) {
453 ExecCtx::Run(DEBUG_LOCATION, destroy_done, GRPC_ERROR_REF(error));
454 ExecCtx::Get()->Flush();
459 /* Server callback: destroy the tcp listener (so we don't generate further callbacks) */
// Copies tcp_server_ while holding mu_, then shuts down the TCP server's
// listeners and drops a ref on it.
// NOTE(review): other statements inside the locked section are not visible
// in this view.
461 void Chttp2ServerListener::Orphan() {
462 grpc_tcp_server* tcp_server;
464 MutexLock lock(&mu_);
466 tcp_server = tcp_server_;
468 grpc_tcp_server_shutdown_listeners(tcp_server);
469 grpc_tcp_server_unref(tcp_server);
475 // Chttp2ServerAddPort()
// Entry point for adding a chttp2 port to `server`. Addresses that begin with
// the 9-character prefix "external:" are routed to
// Chttp2ServerListener::CreateWithAcceptor(); every other address goes to
// Chttp2ServerListener::Create(), which reports the bound port through
// `port_num`.
// NOTE(review): the remaining arguments of the CreateWithAcceptor() call are
// not visible in this view.
478 grpc_error* Chttp2ServerAddPort(grpc_server* server, const char* addr,
479 grpc_channel_args* args, int* port_num) {
480 if (strncmp(addr, "external:", 9) == 0) {
481 return grpc_core::Chttp2ServerListener::CreateWithAcceptor(server, addr,
484 return grpc_core::Chttp2ServerListener::Create(server, addr, args, port_num);
487 } // namespace grpc_core