Imported Upstream version 1.31.0
[platform/upstream/grpc.git] / src / core / ext / transport / chttp2 / server / chttp2_server.cc
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/ext/transport/chttp2/server/chttp2_server.h"
22
23 #include <inttypes.h>
24 #include <limits.h>
25 #include <string.h>
26 #include <vector>
27
28 #include "absl/strings/str_cat.h"
29 #include "absl/strings/str_format.h"
30
31 #include <grpc/grpc.h>
32 #include <grpc/impl/codegen/grpc_types.h>
33 #include <grpc/support/alloc.h>
34 #include <grpc/support/log.h>
35 #include <grpc/support/sync.h>
36
37 #include "src/core/ext/filters/http/server/http_server_filter.h"
38 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
39 #include "src/core/ext/transport/chttp2/transport/internal.h"
40 #include "src/core/lib/channel/channel_args.h"
41 #include "src/core/lib/channel/handshaker.h"
42 #include "src/core/lib/channel/handshaker_registry.h"
43 #include "src/core/lib/gprpp/ref_counted.h"
44 #include "src/core/lib/gprpp/ref_counted_ptr.h"
45 #include "src/core/lib/iomgr/endpoint.h"
46 #include "src/core/lib/iomgr/resolve_address.h"
47 #include "src/core/lib/iomgr/resource_quota.h"
48 #include "src/core/lib/iomgr/tcp_server.h"
49 #include "src/core/lib/slice/slice_internal.h"
50 #include "src/core/lib/surface/api_trace.h"
51 #include "src/core/lib/surface/server.h"
52
53 namespace grpc_core {
54 namespace {
55
56 class Chttp2ServerListener : public ServerListenerInterface {
57  public:
58   static grpc_error* Create(grpc_server* server, const char* addr,
59                             grpc_channel_args* args, int* port_num);
60
61   static grpc_error* CreateWithAcceptor(grpc_server* server, const char* name,
62                                         grpc_channel_args* args);
63
64   // Do not instantiate directly.  Use one of the factory methods above.
65   Chttp2ServerListener(grpc_server* server, grpc_channel_args* args);
66   ~Chttp2ServerListener();
67
68   void Start(grpc_server* server,
69              const std::vector<grpc_pollset*>* pollsets) override;
70
71   channelz::ListenSocketNode* channelz_listen_socket_node() const override {
72     return channelz_listen_socket_.get();
73   }
74
75   void SetOnDestroyDone(grpc_closure* on_destroy_done) override;
76
77   void Orphan() override;
78
79  private:
80   class ConnectionState : public RefCounted<ConnectionState> {
81    public:
82     ConnectionState(Chttp2ServerListener* listener,
83                     grpc_pollset* accepting_pollset,
84                     grpc_tcp_server_acceptor* acceptor,
85                     RefCountedPtr<HandshakeManager> handshake_mgr,
86                     grpc_channel_args* args, grpc_endpoint* endpoint);
87
88     ~ConnectionState();
89
90    private:
91     static void OnTimeout(void* arg, grpc_error* error);
92     static void OnReceiveSettings(void* arg, grpc_error* error);
93     static void OnHandshakeDone(void* arg, grpc_error* error);
94
95     Chttp2ServerListener* const listener_;
96     grpc_pollset* const accepting_pollset_;
97     grpc_tcp_server_acceptor* const acceptor_;
98     RefCountedPtr<HandshakeManager> handshake_mgr_;
99     // State for enforcing handshake timeout on receiving HTTP/2 settings.
100     grpc_chttp2_transport* transport_ = nullptr;
101     grpc_millis deadline_;
102     grpc_timer timer_;
103     grpc_closure on_timeout_;
104     grpc_closure on_receive_settings_;
105     grpc_pollset_set* const interested_parties_;
106   };
107
108   static void OnAccept(void* arg, grpc_endpoint* tcp,
109                        grpc_pollset* accepting_pollset,
110                        grpc_tcp_server_acceptor* acceptor);
111
112   RefCountedPtr<HandshakeManager> CreateHandshakeManager();
113
114   static void TcpServerShutdownComplete(void* arg, grpc_error* error);
115
116   static void DestroyListener(grpc_server* /*server*/, void* arg,
117                               grpc_closure* destroy_done);
118
119   grpc_server* const server_;
120   grpc_channel_args* const args_;
121   grpc_tcp_server* tcp_server_;
122   Mutex mu_;
123   bool shutdown_ = true;
124   grpc_closure tcp_server_shutdown_complete_;
125   grpc_closure* on_destroy_done_ = nullptr;
126   HandshakeManager* pending_handshake_mgrs_ = nullptr;
127   RefCountedPtr<channelz::ListenSocketNode> channelz_listen_socket_;
128 };
129
130 //
131 // Chttp2ServerListener::ConnectionState
132 //
133
134 grpc_millis GetConnectionDeadline(const grpc_channel_args* args) {
135   int timeout_ms =
136       grpc_channel_args_find_integer(args, GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS,
137                                      {120 * GPR_MS_PER_SEC, 1, INT_MAX});
138   return ExecCtx::Get()->Now() + timeout_ms;
139 }
140
141 Chttp2ServerListener::ConnectionState::ConnectionState(
142     Chttp2ServerListener* listener, grpc_pollset* accepting_pollset,
143     grpc_tcp_server_acceptor* acceptor,
144     RefCountedPtr<HandshakeManager> handshake_mgr, grpc_channel_args* args,
145     grpc_endpoint* endpoint)
146     : listener_(listener),
147       accepting_pollset_(accepting_pollset),
148       acceptor_(acceptor),
149       handshake_mgr_(std::move(handshake_mgr)),
150       deadline_(GetConnectionDeadline(args)),
151       interested_parties_(grpc_pollset_set_create()) {
152   grpc_pollset_set_add_pollset(interested_parties_, accepting_pollset_);
153   HandshakerRegistry::AddHandshakers(HANDSHAKER_SERVER, args,
154                                      interested_parties_, handshake_mgr_.get());
155   handshake_mgr_->DoHandshake(endpoint, args, deadline_, acceptor_,
156                               OnHandshakeDone, this);
157 }
158
159 Chttp2ServerListener::ConnectionState::~ConnectionState() {
160   if (transport_ != nullptr) {
161     GRPC_CHTTP2_UNREF_TRANSPORT(transport_, "receive settings timeout");
162   }
163   grpc_pollset_set_del_pollset(interested_parties_, accepting_pollset_);
164   grpc_pollset_set_destroy(interested_parties_);
165 }
166
167 void Chttp2ServerListener::ConnectionState::OnTimeout(void* arg,
168                                                       grpc_error* error) {
169   ConnectionState* self = static_cast<ConnectionState*>(arg);
170   // Note that we may be called with GRPC_ERROR_NONE when the timer fires
171   // or with an error indicating that the timer system is being shut down.
172   if (error != GRPC_ERROR_CANCELLED) {
173     grpc_transport_op* op = grpc_make_transport_op(nullptr);
174     op->disconnect_with_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
175         "Did not receive HTTP/2 settings before handshake timeout");
176     grpc_transport_perform_op(&self->transport_->base, op);
177   }
178   self->Unref();
179 }
180
181 void Chttp2ServerListener::ConnectionState::OnReceiveSettings(
182     void* arg, grpc_error* error) {
183   ConnectionState* self = static_cast<ConnectionState*>(arg);
184   if (error == GRPC_ERROR_NONE) {
185     grpc_timer_cancel(&self->timer_);
186   }
187   self->Unref();
188 }
189
190 void Chttp2ServerListener::ConnectionState::OnHandshakeDone(void* arg,
191                                                             grpc_error* error) {
192   auto* args = static_cast<HandshakerArgs*>(arg);
193   ConnectionState* self = static_cast<ConnectionState*>(args->user_data);
194   {
195     MutexLock lock(&self->listener_->mu_);
196     grpc_resource_user* resource_user =
197         grpc_server_get_default_resource_user(self->listener_->server_);
198     if (error != GRPC_ERROR_NONE || self->listener_->shutdown_) {
199       const char* error_str = grpc_error_string(error);
200       gpr_log(GPR_DEBUG, "Handshaking failed: %s", error_str);
201       grpc_resource_user* resource_user =
202           grpc_server_get_default_resource_user(self->listener_->server_);
203       if (resource_user != nullptr) {
204         grpc_resource_user_free(resource_user,
205                                 GRPC_RESOURCE_QUOTA_CHANNEL_SIZE);
206       }
207       if (error == GRPC_ERROR_NONE && args->endpoint != nullptr) {
208         // We were shut down after handshaking completed successfully, so
209         // destroy the endpoint here.
210         // TODO(ctiller): It is currently necessary to shutdown endpoints
211         // before destroying them, even if we know that there are no
212         // pending read/write callbacks.  This should be fixed, at which
213         // point this can be removed.
214         grpc_endpoint_shutdown(args->endpoint, GRPC_ERROR_NONE);
215         grpc_endpoint_destroy(args->endpoint);
216         grpc_channel_args_destroy(args->args);
217         grpc_slice_buffer_destroy_internal(args->read_buffer);
218         gpr_free(args->read_buffer);
219       }
220     } else {
221       // If the handshaking succeeded but there is no endpoint, then the
222       // handshaker may have handed off the connection to some external
223       // code, so we can just clean up here without creating a transport.
224       if (args->endpoint != nullptr) {
225         grpc_transport* transport = grpc_create_chttp2_transport(
226             args->args, args->endpoint, false, resource_user);
227         grpc_server_setup_transport(
228             self->listener_->server_, transport, self->accepting_pollset_,
229             args->args, grpc_chttp2_transport_get_socket_node(transport),
230             resource_user);
231         // Use notify_on_receive_settings callback to enforce the
232         // handshake deadline.
233         // Note: The reinterpret_cast<>s here are safe, because
234         // grpc_chttp2_transport is a C-style extension of
235         // grpc_transport, so this is morally equivalent of a
236         // static_cast<> to a derived class.
237         // TODO(roth): Change to static_cast<> when we C++-ify the
238         // transport API.
239         self->transport_ = reinterpret_cast<grpc_chttp2_transport*>(transport);
240         self->Ref().release();  // Held by OnReceiveSettings().
241         GRPC_CLOSURE_INIT(&self->on_receive_settings_, OnReceiveSettings, self,
242                           grpc_schedule_on_exec_ctx);
243         grpc_chttp2_transport_start_reading(transport, args->read_buffer,
244                                             &self->on_receive_settings_);
245         grpc_channel_args_destroy(args->args);
246         self->Ref().release();  // Held by OnTimeout().
247         GRPC_CHTTP2_REF_TRANSPORT(
248             reinterpret_cast<grpc_chttp2_transport*>(transport),
249             "receive settings timeout");
250         GRPC_CLOSURE_INIT(&self->on_timeout_, OnTimeout, self,
251                           grpc_schedule_on_exec_ctx);
252         grpc_timer_init(&self->timer_, self->deadline_, &self->on_timeout_);
253       } else {
254         if (resource_user != nullptr) {
255           grpc_resource_user_free(resource_user,
256                                   GRPC_RESOURCE_QUOTA_CHANNEL_SIZE);
257         }
258       }
259     }
260     self->handshake_mgr_->RemoveFromPendingMgrList(
261         &self->listener_->pending_handshake_mgrs_);
262   }
263   self->handshake_mgr_.reset();
264   gpr_free(self->acceptor_);
265   grpc_tcp_server_unref(self->listener_->tcp_server_);
266   self->Unref();
267 }
268
269 //
270 // Chttp2ServerListener
271 //
272
273 grpc_error* Chttp2ServerListener::Create(grpc_server* server, const char* addr,
274                                          grpc_channel_args* args,
275                                          int* port_num) {
276   std::vector<grpc_error*> error_list;
277   grpc_resolved_addresses* resolved = nullptr;
278   Chttp2ServerListener* listener = nullptr;
279   // The bulk of this method is inside of a lambda to make cleanup
280   // easier without using goto.
281   grpc_error* error = [&]() {
282     *port_num = -1;
283     /* resolve address */
284     grpc_error* error = grpc_blocking_resolve_address(addr, "https", &resolved);
285     if (error != GRPC_ERROR_NONE) return error;
286     // Create Chttp2ServerListener.
287     listener = new Chttp2ServerListener(server, args);
288     error = grpc_tcp_server_create(&listener->tcp_server_shutdown_complete_,
289                                    args, &listener->tcp_server_);
290     if (error != GRPC_ERROR_NONE) return error;
291     for (size_t i = 0; i < resolved->naddrs; i++) {
292       int port_temp;
293       error = grpc_tcp_server_add_port(listener->tcp_server_,
294                                        &resolved->addrs[i], &port_temp);
295       if (error != GRPC_ERROR_NONE) {
296         error_list.push_back(error);
297       } else {
298         if (*port_num == -1) {
299           *port_num = port_temp;
300         } else {
301           GPR_ASSERT(*port_num == port_temp);
302         }
303       }
304     }
305     if (error_list.size() == resolved->naddrs) {
306       std::string msg =
307           absl::StrFormat("No address added out of total %" PRIuPTR " resolved",
308                           resolved->naddrs);
309       return GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING(
310           msg.c_str(), error_list.data(), error_list.size());
311     } else if (!error_list.empty()) {
312       std::string msg = absl::StrFormat(
313           "Only %" PRIuPTR " addresses added out of total %" PRIuPTR
314           " resolved",
315           resolved->naddrs - error_list.size(), resolved->naddrs);
316       error = GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING(
317           msg.c_str(), error_list.data(), error_list.size());
318       gpr_log(GPR_INFO, "WARNING: %s", grpc_error_string(error));
319       GRPC_ERROR_UNREF(error);
320       /* we managed to bind some addresses: continue */
321     }
322     // Create channelz node.
323     if (grpc_channel_args_find_bool(args, GRPC_ARG_ENABLE_CHANNELZ,
324                                     GRPC_ENABLE_CHANNELZ_DEFAULT)) {
325       listener->channelz_listen_socket_ =
326           MakeRefCounted<channelz::ListenSocketNode>(
327               addr, absl::StrFormat("chttp2 listener %s", addr));
328     }
329     /* Register with the server only upon success */
330     grpc_server_add_listener(server,
331                              OrphanablePtr<ServerListenerInterface>(listener));
332     return GRPC_ERROR_NONE;
333   }();
334   if (resolved != nullptr) {
335     grpc_resolved_addresses_destroy(resolved);
336   }
337   if (error != GRPC_ERROR_NONE) {
338     if (listener != nullptr) {
339       if (listener->tcp_server_ != nullptr) {
340         grpc_tcp_server_unref(listener->tcp_server_);
341       } else {
342         delete listener;
343       }
344     } else {
345       grpc_channel_args_destroy(args);
346     }
347     *port_num = 0;
348   }
349   for (grpc_error* error : error_list) {
350     GRPC_ERROR_UNREF(error);
351   }
352   return error;
353 }
354
355 grpc_error* Chttp2ServerListener::CreateWithAcceptor(grpc_server* server,
356                                                      const char* name,
357                                                      grpc_channel_args* args) {
358   Chttp2ServerListener* listener = new Chttp2ServerListener(server, args);
359   grpc_error* error = grpc_tcp_server_create(
360       &listener->tcp_server_shutdown_complete_, args, &listener->tcp_server_);
361   if (error != GRPC_ERROR_NONE) {
362     delete listener;
363     return error;
364   }
365   // TODO(yangg) channelz
366   TcpServerFdHandler** arg_val =
367       grpc_channel_args_find_pointer<TcpServerFdHandler*>(args, name);
368   *arg_val = grpc_tcp_server_create_fd_handler(listener->tcp_server_);
369   grpc_server_add_listener(server,
370                            OrphanablePtr<ServerListenerInterface>(listener));
371   return GRPC_ERROR_NONE;
372 }
373
374 Chttp2ServerListener::Chttp2ServerListener(grpc_server* server,
375                                            grpc_channel_args* args)
376     : server_(server), args_(args) {
377   GRPC_CLOSURE_INIT(&tcp_server_shutdown_complete_, TcpServerShutdownComplete,
378                     this, grpc_schedule_on_exec_ctx);
379 }
380
381 Chttp2ServerListener::~Chttp2ServerListener() {
382   grpc_channel_args_destroy(args_);
383 }
384
385 /* Server callback: start listening on our ports */
386 void Chttp2ServerListener::Start(grpc_server* /*server*/,
387                                  const std::vector<grpc_pollset*>* pollsets) {
388   {
389     MutexLock lock(&mu_);
390     shutdown_ = false;
391   }
392   grpc_tcp_server_start(tcp_server_, pollsets, OnAccept, this);
393 }
394
395 void Chttp2ServerListener::SetOnDestroyDone(grpc_closure* on_destroy_done) {
396   MutexLock lock(&mu_);
397   on_destroy_done_ = on_destroy_done;
398 }
399
400 RefCountedPtr<HandshakeManager> Chttp2ServerListener::CreateHandshakeManager() {
401   MutexLock lock(&mu_);
402   if (shutdown_) return nullptr;
403   grpc_resource_user* resource_user =
404       grpc_server_get_default_resource_user(server_);
405   if (resource_user != nullptr &&
406       !grpc_resource_user_safe_alloc(resource_user,
407                                      GRPC_RESOURCE_QUOTA_CHANNEL_SIZE)) {
408     gpr_log(GPR_ERROR,
409             "Memory quota exhausted, rejecting connection, no handshaking.");
410     return nullptr;
411   }
412   auto handshake_mgr = MakeRefCounted<HandshakeManager>();
413   handshake_mgr->AddToPendingMgrList(&pending_handshake_mgrs_);
414   grpc_tcp_server_ref(tcp_server_);  // Ref held by ConnectionState.
415   return handshake_mgr;
416 }
417
418 void Chttp2ServerListener::OnAccept(void* arg, grpc_endpoint* tcp,
419                                     grpc_pollset* accepting_pollset,
420                                     grpc_tcp_server_acceptor* acceptor) {
421   Chttp2ServerListener* self = static_cast<Chttp2ServerListener*>(arg);
422   RefCountedPtr<HandshakeManager> handshake_mgr =
423       self->CreateHandshakeManager();
424   if (handshake_mgr == nullptr) {
425     grpc_endpoint_shutdown(tcp, GRPC_ERROR_NONE);
426     grpc_endpoint_destroy(tcp);
427     gpr_free(acceptor);
428     return;
429   }
430   // Deletes itself when done.
431   new ConnectionState(self, accepting_pollset, acceptor,
432                       std::move(handshake_mgr), self->args_, tcp);
433 }
434
435 void Chttp2ServerListener::TcpServerShutdownComplete(void* arg,
436                                                      grpc_error* error) {
437   Chttp2ServerListener* self = static_cast<Chttp2ServerListener*>(arg);
438   /* ensure all threads have unlocked */
439   grpc_closure* destroy_done = nullptr;
440   {
441     MutexLock lock(&self->mu_);
442     destroy_done = self->on_destroy_done_;
443     GPR_ASSERT(self->shutdown_);
444     if (self->pending_handshake_mgrs_ != nullptr) {
445       self->pending_handshake_mgrs_->ShutdownAllPending(GRPC_ERROR_REF(error));
446     }
447     self->channelz_listen_socket_.reset();
448   }
449   // Flush queued work before destroying handshaker factory, since that
450   // may do a synchronous unref.
451   ExecCtx::Get()->Flush();
452   if (destroy_done != nullptr) {
453     ExecCtx::Run(DEBUG_LOCATION, destroy_done, GRPC_ERROR_REF(error));
454     ExecCtx::Get()->Flush();
455   }
456   delete self;
457 }
458
459 /* Server callback: destroy the tcp listener (so we don't generate further
460    callbacks) */
461 void Chttp2ServerListener::Orphan() {
462   grpc_tcp_server* tcp_server;
463   {
464     MutexLock lock(&mu_);
465     shutdown_ = true;
466     tcp_server = tcp_server_;
467   }
468   grpc_tcp_server_shutdown_listeners(tcp_server);
469   grpc_tcp_server_unref(tcp_server);
470 }
471
472 }  // namespace
473
474 //
475 // Chttp2ServerAddPort()
476 //
477
478 grpc_error* Chttp2ServerAddPort(grpc_server* server, const char* addr,
479                                 grpc_channel_args* args, int* port_num) {
480   if (strncmp(addr, "external:", 9) == 0) {
481     return grpc_core::Chttp2ServerListener::CreateWithAcceptor(server, addr,
482                                                                args);
483   }
484   return grpc_core::Chttp2ServerListener::Create(server, addr, args, port_num);
485 }
486
487 }  // namespace grpc_core