2 * nghttp2 - HTTP/2 C Library
4 * Copyright (c) 2012 Tatsuhiro Tsujikawa
6 * Permission is hereby granted, free of charge, to any person obtaining
7 * a copy of this software and associated documentation files (the
8 * "Software"), to deal in the Software without restriction, including
9 * without limitation the rights to use, copy, modify, merge, publish,
10 * distribute, sublicense, and/or sell copies of the Software, and to
11 * permit persons to whom the Software is furnished to do so, subject to
12 * the following conditions:
14 * The above copyright notice and this permission notice shall be
15 * included in all copies or substantial portions of the Software.
17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
21 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
22 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
23 * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
25 #include "shrpx_client_handler.h"
29 #endif // HAVE_UNISTD_H
30 #ifdef HAVE_SYS_SOCKET_H
31 # include <sys/socket.h>
32 #endif // HAVE_SYS_SOCKET_H
35 #endif // HAVE_NETDB_H
39 #include "shrpx_upstream.h"
40 #include "shrpx_http2_upstream.h"
41 #include "shrpx_https_upstream.h"
42 #include "shrpx_config.h"
43 #include "shrpx_http_downstream_connection.h"
44 #include "shrpx_http2_downstream_connection.h"
45 #include "shrpx_tls.h"
46 #include "shrpx_worker.h"
47 #include "shrpx_downstream_connection_pool.h"
48 #include "shrpx_downstream.h"
49 #include "shrpx_http2_session.h"
50 #include "shrpx_connect_blocker.h"
51 #include "shrpx_api_downstream_connection.h"
52 #include "shrpx_health_monitor_downstream_connection.h"
53 #include "shrpx_null_downstream_connection.h"
55 # include "shrpx_http3_upstream.h"
56 #endif // ENABLE_HTTP3
57 #include "shrpx_log.h"
62 using namespace nghttp2;
67 void timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
68 auto conn = static_cast<Connection *>(w->data);
69 auto handler = static_cast<ClientHandler *>(conn->data);
71 if (LOG_ENABLED(INFO)) {
72 CLOG(INFO, handler) << "Time out";
80 void shutdowncb(struct ev_loop *loop, ev_timer *w, int revents) {
81 auto handler = static_cast<ClientHandler *>(w->data);
83 if (LOG_ENABLED(INFO)) {
84 CLOG(INFO, handler) << "Close connection due to TLS renegotiation";
92 void readcb(struct ev_loop *loop, ev_io *w, int revents) {
93 auto conn = static_cast<Connection *>(w->data);
94 auto handler = static_cast<ClientHandler *>(conn->data);
96 if (handler->do_read() != 0) {
104 void writecb(struct ev_loop *loop, ev_io *w, int revents) {
105 auto conn = static_cast<Connection *>(w->data);
106 auto handler = static_cast<ClientHandler *>(conn->data);
108 if (handler->do_write() != 0) {
115 int ClientHandler::noop() { return 0; }
117 int ClientHandler::read_clear() {
118 auto should_break = false;
121 if (rb_.rleft() && on_read() != 0) {
124 if (rb_.rleft() == 0) {
126 } else if (rb_.wleft() == 0) {
127 conn_.rlimit.stopw();
131 if (!ev_is_active(&conn_.rev) || should_break) {
135 auto nread = conn_.read_clear(rb_.last(), rb_.wleft());
138 if (rb_.rleft() == 0) {
153 int ClientHandler::write_clear() {
154 std::array<iovec, 2> iov;
157 if (on_write() != 0) {
161 auto iovcnt = upstream_->response_riovec(iov.data(), iov.size());
166 auto nwrite = conn_.writev_clear(iov.data(), iovcnt);
175 upstream_->response_drain(nwrite);
178 conn_.wlimit.stopw();
179 ev_timer_stop(conn_.loop, &conn_.wt);
184 int ClientHandler::tls_handshake() {
185 ev_timer_again(conn_.loop, &conn_.rt);
189 auto rv = conn_.tls_handshake();
191 if (rv == SHRPX_ERR_INPROGRESS) {
199 if (LOG_ENABLED(INFO)) {
200 CLOG(INFO, this) << "SSL/TLS handshake completed";
203 if (validate_next_proto() != 0) {
207 read_ = &ClientHandler::read_tls;
208 write_ = &ClientHandler::write_tls;
213 int ClientHandler::read_tls() {
214 auto should_break = false;
221 // we should process buffered data first before we read EOF.
222 if (rb_.rleft() && on_read() != 0) {
225 if (rb_.rleft() == 0) {
227 } else if (rb_.wleft() == 0) {
228 conn_.rlimit.stopw();
232 if (!ev_is_active(&conn_.rev) || should_break) {
236 auto nread = conn_.read_tls(rb_.last(), rb_.wleft());
239 if (rb_.rleft() == 0) {
254 int ClientHandler::write_tls() {
259 if (on_write() != 0) {
263 auto iovcnt = upstream_->response_riovec(&iov, 1);
265 conn_.start_tls_write_idle();
267 conn_.wlimit.stopw();
268 ev_timer_stop(conn_.loop, &conn_.wt);
274 auto nwrite = conn_.write_tls(iov.iov_base, iov.iov_len);
283 upstream_->response_drain(nwrite);
285 iovcnt = upstream_->response_riovec(&iov, 1);
293 int ClientHandler::read_quic(const UpstreamAddr *faddr,
294 const Address &remote_addr,
295 const Address &local_addr, const uint8_t *data,
297 auto upstream = static_cast<Http3Upstream *>(upstream_.get());
299 return upstream->on_read(faddr, remote_addr, local_addr, data, datalen);
302 int ClientHandler::write_quic() { return upstream_->on_write(); }
303 #endif // ENABLE_HTTP3
305 int ClientHandler::upstream_noop() { return 0; }
307 int ClientHandler::upstream_read() {
309 if (upstream_->on_read() != 0) {
315 int ClientHandler::upstream_write() {
317 if (upstream_->on_write() != 0) {
321 if (get_should_close_after_write() && upstream_->response_empty()) {
328 int ClientHandler::upstream_http2_connhd_read() {
329 auto nread = std::min(left_connhd_len_, rb_.rleft());
330 if (memcmp(&NGHTTP2_CLIENT_MAGIC[NGHTTP2_CLIENT_MAGIC_LEN - left_connhd_len_],
331 rb_.pos(), nread) != 0) {
332 // There is no downgrade path here. Just drop the connection.
333 if (LOG_ENABLED(INFO)) {
334 CLOG(INFO, this) << "invalid client connection header";
340 left_connhd_len_ -= nread;
342 conn_.rlimit.startw();
344 if (left_connhd_len_ == 0) {
345 on_read_ = &ClientHandler::upstream_read;
346 // Run on_read to process data left in buffer since they are not
348 if (on_read() != 0) {
357 int ClientHandler::upstream_http1_connhd_read() {
358 auto nread = std::min(left_connhd_len_, rb_.rleft());
359 if (memcmp(&NGHTTP2_CLIENT_MAGIC[NGHTTP2_CLIENT_MAGIC_LEN - left_connhd_len_],
360 rb_.pos(), nread) != 0) {
361 if (LOG_ENABLED(INFO)) {
362 CLOG(INFO, this) << "This is HTTP/1.1 connection, "
363 << "but may be upgraded to HTTP/2 later.";
366 // Reset header length for later HTTP/2 upgrade
367 left_connhd_len_ = NGHTTP2_CLIENT_MAGIC_LEN;
368 on_read_ = &ClientHandler::upstream_read;
369 on_write_ = &ClientHandler::upstream_write;
371 if (on_read() != 0) {
378 left_connhd_len_ -= nread;
380 conn_.rlimit.startw();
382 if (left_connhd_len_ == 0) {
383 if (LOG_ENABLED(INFO)) {
384 CLOG(INFO, this) << "direct HTTP/2 connection";
387 direct_http2_upgrade();
388 on_read_ = &ClientHandler::upstream_read;
389 on_write_ = &ClientHandler::upstream_write;
391 // Run on_read to process data left in buffer since they are not
393 if (on_read() != 0) {
403 ClientHandler::ClientHandler(Worker *worker, int fd, SSL *ssl,
404 const StringRef &ipaddr, const StringRef &port,
405 int family, const UpstreamAddr *faddr)
406 : // We use balloc_ for TLS session ID (64), ipaddr (IPv6) (39),
407 // port (5), forwarded-for (IPv6) (41), alpn (5), proxyproto
408 // ipaddr (15), proxyproto port (5), sni (32, estimated). we
409 // need terminal NULL byte for each. We also require 8 bytes
410 // header for each allocation. We align at 16 bytes boundary,
411 // so the required space is 64 + 48 + 16 + 48 + 16 + 16 + 16 +
412 // 32 + 8 + 8 * 8 = 328.
414 rb_(worker->get_mcpool()),
415 conn_(worker->get_loop(), fd, ssl, worker->get_mcpool(),
416 get_config()->conn.upstream.timeout.write,
417 get_config()->conn.upstream.timeout.read,
418 get_config()->conn.upstream.ratelimit.write,
419 get_config()->conn.upstream.ratelimit.read, writecb, readcb,
420 timeoutcb, this, get_config()->tls.dyn_rec.warmup_threshold,
421 get_config()->tls.dyn_rec.idle_timeout,
422 faddr->quic ? Proto::HTTP3 : Proto::NONE),
423 ipaddr_(make_string_ref(balloc_, ipaddr)),
424 port_(make_string_ref(balloc_, port)),
427 left_connhd_len_(NGHTTP2_CLIENT_MAGIC_LEN),
429 should_close_after_write_(false),
430 affinity_hash_computed_(false) {
432 ++worker_->get_worker_stat()->num_connections;
434 ev_timer_init(&reneg_shutdown_timer_, shutdowncb, 0., 0.);
436 reneg_shutdown_timer_.data = this;
439 conn_.rlimit.startw();
441 ev_timer_again(conn_.loop, &conn_.rt);
443 auto config = get_config();
446 if (faddr_->accept_proxy_protocol ||
447 config->conn.upstream.accept_proxy_protocol) {
448 read_ = &ClientHandler::read_clear;
449 write_ = &ClientHandler::noop;
450 on_read_ = &ClientHandler::proxy_protocol_read;
451 on_write_ = &ClientHandler::upstream_noop;
453 setup_upstream_io_callback();
457 auto &fwdconf = config->http.forwarded;
459 if (fwdconf.params & FORWARDED_FOR) {
460 if (fwdconf.for_node_type == ForwardedNode::OBFUSCATED) {
462 auto len = SHRPX_OBFUSCATED_NODE_LENGTH + 1;
463 // 1 for terminating NUL.
464 auto buf = make_byte_ref(balloc_, len + 1);
467 p = util::random_alpha_digit(p, p + SHRPX_OBFUSCATED_NODE_LENGTH,
468 worker_->get_randgen());
471 forwarded_for_ = StringRef{buf.base, p};
473 init_forwarded_for(family, ipaddr_);
478 void ClientHandler::init_forwarded_for(int family, const StringRef &ipaddr) {
479 if (family == AF_INET6) {
481 auto len = 2 + ipaddr.size();
482 // 1 for terminating NUL.
483 auto buf = make_byte_ref(balloc_, len + 1);
486 p = std::copy(std::begin(ipaddr), std::end(ipaddr), p);
490 forwarded_for_ = StringRef{buf.base, p};
492 // family == AF_INET or family == AF_UNIX
493 forwarded_for_ = ipaddr;
497 void ClientHandler::setup_upstream_io_callback() {
499 conn_.prepare_server_handshake();
500 read_ = write_ = &ClientHandler::tls_handshake;
501 on_read_ = &ClientHandler::upstream_noop;
502 on_write_ = &ClientHandler::upstream_write;
504 // For non-TLS version, first create HttpsUpstream. It may be
505 // upgraded to HTTP/2 through HTTP Upgrade or direct HTTP/2
507 upstream_ = std::make_unique<HttpsUpstream>(this);
508 alpn_ = StringRef::from_lit("http/1.1");
509 read_ = &ClientHandler::read_clear;
510 write_ = &ClientHandler::write_clear;
511 on_read_ = &ClientHandler::upstream_http1_connhd_read;
512 on_write_ = &ClientHandler::upstream_noop;
517 void ClientHandler::setup_http3_upstream(
518 std::unique_ptr<Http3Upstream> &&upstream) {
519 upstream_ = std::move(upstream);
520 write_ = &ClientHandler::write_quic;
522 auto config = get_config();
524 reset_upstream_read_timeout(config->conn.upstream.timeout.http3_read);
526 #endif // ENABLE_HTTP3
528 ClientHandler::~ClientHandler() {
529 if (LOG_ENABLED(INFO)) {
530 CLOG(INFO, this) << "Deleting";
534 upstream_->on_handler_delete();
537 auto worker_stat = worker_->get_worker_stat();
538 --worker_stat->num_connections;
540 if (worker_stat->num_connections == 0) {
541 worker_->schedule_clear_mcpool();
544 ev_timer_stop(conn_.loop, &reneg_shutdown_timer_);
546 // TODO If backend is http/2, and it is in CONNECTED state, signal
547 // it and make it loopbreak when output is zero.
548 if (worker_->get_graceful_shutdown() && worker_stat->num_connections == 0 &&
549 worker_stat->num_close_waits == 0) {
550 ev_break(conn_.loop);
553 if (LOG_ENABLED(INFO)) {
554 CLOG(INFO, this) << "Deleted";
558 Upstream *ClientHandler::get_upstream() { return upstream_.get(); }
560 struct ev_loop *ClientHandler::get_loop() const {
564 void ClientHandler::reset_upstream_read_timeout(ev_tstamp t) {
566 if (ev_is_active(&conn_.rt)) {
567 ev_timer_again(conn_.loop, &conn_.rt);
571 void ClientHandler::reset_upstream_write_timeout(ev_tstamp t) {
573 if (ev_is_active(&conn_.wt)) {
574 ev_timer_again(conn_.loop, &conn_.wt);
578 void ClientHandler::repeat_read_timer() {
579 ev_timer_again(conn_.loop, &conn_.rt);
582 void ClientHandler::stop_read_timer() { ev_timer_stop(conn_.loop, &conn_.rt); }
584 int ClientHandler::validate_next_proto() {
585 const unsigned char *next_proto = nullptr;
586 unsigned int next_proto_len = 0;
588 // First set callback for catch all cases
589 on_read_ = &ClientHandler::upstream_read;
591 #ifndef OPENSSL_NO_NEXTPROTONEG
592 SSL_get0_next_proto_negotiated(conn_.tls.ssl, &next_proto, &next_proto_len);
593 #endif // !OPENSSL_NO_NEXTPROTONEG
594 #if OPENSSL_VERSION_NUMBER >= 0x10002000L
595 if (next_proto == nullptr) {
596 SSL_get0_alpn_selected(conn_.tls.ssl, &next_proto, &next_proto_len);
598 #endif // OPENSSL_VERSION_NUMBER >= 0x10002000L
603 proto = StringRef{next_proto, next_proto_len};
605 if (LOG_ENABLED(INFO)) {
606 CLOG(INFO, this) << "The negotiated next protocol: " << proto;
609 if (LOG_ENABLED(INFO)) {
610 CLOG(INFO, this) << "No protocol negotiated. Fallback to HTTP/1.1";
613 proto = StringRef::from_lit("http/1.1");
616 if (!tls::in_proto_list(get_config()->tls.npn_list, proto)) {
617 if (LOG_ENABLED(INFO)) {
618 CLOG(INFO, this) << "The negotiated protocol is not supported: " << proto;
623 if (util::check_h2_is_selected(proto)) {
624 on_read_ = &ClientHandler::upstream_http2_connhd_read;
626 auto http2_upstream = std::make_unique<Http2Upstream>(this);
628 upstream_ = std::move(http2_upstream);
629 alpn_ = make_string_ref(balloc_, proto);
631 // At this point, input buffer is already filled with some bytes.
632 // The read callback is not called until new data come. So consume
633 // input buffer here.
634 if (on_read() != 0) {
641 if (proto == StringRef::from_lit("http/1.1")) {
642 upstream_ = std::make_unique<HttpsUpstream>(this);
643 alpn_ = StringRef::from_lit("http/1.1");
645 // At this point, input buffer is already filled with some bytes.
646 // The read callback is not called until new data come. So consume
647 // input buffer here.
648 if (on_read() != 0) {
654 if (LOG_ENABLED(INFO)) {
655 CLOG(INFO, this) << "The negotiated protocol is not supported";
660 int ClientHandler::do_read() { return read_(*this); }
661 int ClientHandler::do_write() { return write_(*this); }
663 int ClientHandler::on_read() {
664 if (rb_.chunk_avail()) {
665 auto rv = on_read_(*this);
670 conn_.handle_tls_pending_read();
673 int ClientHandler::on_write() { return on_write_(*this); }
675 const StringRef &ClientHandler::get_ipaddr() const { return ipaddr_; }
677 bool ClientHandler::get_should_close_after_write() const {
678 return should_close_after_write_;
681 void ClientHandler::set_should_close_after_write(bool f) {
682 should_close_after_write_ = f;
685 void ClientHandler::pool_downstream_connection(
686 std::unique_ptr<DownstreamConnection> dconn) {
687 if (!dconn->poolable()) {
691 dconn->set_client_handler(nullptr);
693 auto &group = dconn->get_downstream_addr_group();
695 if (LOG_ENABLED(INFO)) {
696 CLOG(INFO, this) << "Pooling downstream connection DCONN:" << dconn.get()
697 << " in group " << group;
700 auto addr = dconn->get_addr();
701 auto &dconn_pool = addr->dconn_pool;
702 dconn_pool->add_downstream_connection(std::move(dconn));
706 // Computes 32bits hash for session affinity for IP address |ip|.
707 uint32_t compute_affinity_from_ip(const StringRef &ip) {
709 std::array<uint8_t, 32> buf;
711 rv = util::sha256(buf.data(), ip);
713 // Not sure when sha256 failed. Just fall back to another
715 return util::hash32(ip);
718 return (static_cast<uint32_t>(buf[0]) << 24) |
719 (static_cast<uint32_t>(buf[1]) << 16) |
720 (static_cast<uint32_t>(buf[2]) << 8) | static_cast<uint32_t>(buf[3]);
724 Http2Session *ClientHandler::get_http2_session(
725 const std::shared_ptr<DownstreamAddrGroup> &group, DownstreamAddr *addr) {
726 auto &shared_addr = group->shared_addr;
728 if (LOG_ENABLED(INFO)) {
729 CLOG(INFO, this) << "Selected DownstreamAddr=" << addr
730 << ", index=" << (addr - shared_addr->addrs.data());
733 for (auto session = addr->http2_extra_freelist.head; session;) {
734 auto next = session->dlnext;
736 if (session->max_concurrency_reached(0)) {
737 if (LOG_ENABLED(INFO)) {
739 << "Maximum streams have been reached for Http2Session(" << session
743 session->remove_from_freelist();
749 if (LOG_ENABLED(INFO)) {
750 CLOG(INFO, this) << "Use Http2Session " << session
751 << " from http2_extra_freelist";
754 if (session->max_concurrency_reached(1)) {
755 if (LOG_ENABLED(INFO)) {
756 CLOG(INFO, this) << "Maximum streams are reached for Http2Session("
760 session->remove_from_freelist();
765 auto session = new Http2Session(conn_.loop, worker_->get_cl_ssl_ctx(),
766 worker_, group, addr);
768 if (LOG_ENABLED(INFO)) {
769 CLOG(INFO, this) << "Create new Http2Session " << session;
772 session->add_to_extra_freelist();
777 uint32_t ClientHandler::get_affinity_cookie(Downstream *downstream,
778 const StringRef &cookie_name) {
779 auto h = downstream->find_affinity_cookie(cookie_name);
784 auto d = std::uniform_int_distribution<uint32_t>(
785 1, std::numeric_limits<uint32_t>::max());
786 auto rh = d(worker_->get_randgen());
787 h = util::hash32(StringRef{reinterpret_cast<uint8_t *>(&rh),
788 reinterpret_cast<uint8_t *>(&rh) + sizeof(rh)});
790 downstream->renew_affinity_cookie(h);
796 void reschedule_addr(
797 std::priority_queue<DownstreamAddrEntry, std::vector<DownstreamAddrEntry>,
798 DownstreamAddrEntryGreater> &pq,
799 DownstreamAddr *addr) {
800 auto penalty = MAX_DOWNSTREAM_ADDR_WEIGHT + addr->pending_penalty;
801 addr->cycle += penalty / addr->weight;
802 addr->pending_penalty = penalty % addr->weight;
804 pq.push(DownstreamAddrEntry{addr, addr->seq, addr->cycle});
811 std::priority_queue<WeightGroupEntry, std::vector<WeightGroupEntry>,
812 WeightGroupEntryGreater> &pq,
814 auto penalty = MAX_DOWNSTREAM_ADDR_WEIGHT + wg->pending_penalty;
815 wg->cycle += penalty / wg->weight;
816 wg->pending_penalty = penalty % wg->weight;
818 pq.push(WeightGroupEntry{wg, wg->seq, wg->cycle});
823 DownstreamAddr *ClientHandler::get_downstream_addr(int &err,
824 DownstreamAddrGroup *group,
825 Downstream *downstream) {
828 switch (faddr_->alt_mode) {
829 case UpstreamAltMode::API:
830 case UpstreamAltMode::HEALTHMON:
836 auto &shared_addr = group->shared_addr;
838 if (shared_addr->affinity.type != SessionAffinity::NONE) {
840 switch (shared_addr->affinity.type) {
841 case SessionAffinity::IP:
842 if (!affinity_hash_computed_) {
843 affinity_hash_ = compute_affinity_from_ip(ipaddr_);
844 affinity_hash_computed_ = true;
846 hash = affinity_hash_;
848 case SessionAffinity::COOKIE:
849 hash = get_affinity_cookie(downstream, shared_addr->affinity.cookie.name);
855 const auto &affinity_hash = shared_addr->affinity_hash;
857 auto it = std::lower_bound(
858 std::begin(affinity_hash), std::end(affinity_hash), hash,
859 [](const AffinityHash &lhs, uint32_t rhs) { return lhs.hash < rhs; });
861 if (it == std::end(affinity_hash)) {
862 it = std::begin(affinity_hash);
866 static_cast<size_t>(std::distance(std::begin(affinity_hash), it));
867 auto idx = (*it).idx;
868 auto addr = &shared_addr->addrs[idx];
870 if (addr->connect_blocker->blocked()) {
872 for (i = aff_idx + 1; i != aff_idx; ++i) {
873 if (i == shared_addr->affinity_hash.size()) {
876 addr = &shared_addr->addrs[shared_addr->affinity_hash[i].idx];
877 if (addr->connect_blocker->blocked()) {
892 auto &wgpq = shared_addr->pq;
896 CLOG(INFO, this) << "No working downstream address found";
901 auto wg = wgpq.top().wg;
906 if (wg->pq.empty()) {
910 auto addr = wg->pq.top().addr;
912 addr->queued = false;
914 if (addr->connect_blocker->blocked()) {
918 reschedule_addr(wg->pq, addr);
919 reschedule_wg(wgpq, wg);
926 std::unique_ptr<DownstreamConnection>
927 ClientHandler::get_downstream_connection(int &err, Downstream *downstream) {
929 auto &downstreamconf = *worker_->get_downstream_config();
930 auto &routerconf = downstreamconf.router;
932 auto catch_all = downstreamconf.addr_group_catch_all;
933 auto &groups = worker_->get_downstream_addr_groups();
935 auto &req = downstream->request();
939 switch (faddr_->alt_mode) {
940 case UpstreamAltMode::API: {
941 auto dconn = std::make_unique<APIDownstreamConnection>(worker_);
942 dconn->set_client_handler(this);
945 case UpstreamAltMode::HEALTHMON: {
946 auto dconn = std::make_unique<HealthMonitorDownstreamConnection>();
947 dconn->set_client_handler(this);
954 auto &balloc = downstream->get_block_allocator();
956 StringRef authority, path;
958 if (req.forwarded_once) {
959 if (groups.size() != 1) {
960 authority = req.orig_authority;
961 path = req.orig_path;
964 if (faddr_->sni_fwd) {
966 } else if (!req.authority.empty()) {
967 authority = req.authority;
969 auto h = req.fs.header(http2::HD_HOST);
971 authority = h->value;
975 // CONNECT method does not have path. But we requires path in
976 // host-path mapping. As workaround, we assume that path is
978 if (!req.regular_connect_method()) {
982 // Cache the authority and path used for the first-time backend
983 // selection because per-pattern mruby script can change them.
984 req.orig_authority = authority;
985 req.orig_path = path;
986 req.forwarded_once = true;
989 // Fast path. If we have one group, it must be catch-all group.
990 if (groups.size() == 1) {
993 group_idx = match_downstream_addr_group(routerconf, authority, path, groups,
997 if (LOG_ENABLED(INFO)) {
998 CLOG(INFO, this) << "Downstream address group_idx: " << group_idx;
1001 if (groups[group_idx]->shared_addr->redirect_if_not_tls && !conn_.tls.ssl) {
1002 if (LOG_ENABLED(INFO)) {
1003 CLOG(INFO, this) << "Downstream address group " << group_idx
1004 << " requires frontend TLS connection.";
1006 err = SHRPX_ERR_TLS_REQUIRED;
1010 auto &group = groups[group_idx];
1012 if (group->shared_addr->dnf) {
1013 auto dconn = std::make_unique<NullDownstreamConnection>(group);
1014 dconn->set_client_handler(this);
1018 auto addr = get_downstream_addr(err, group.get(), downstream);
1019 if (addr == nullptr) {
1023 if (addr->proto == Proto::HTTP1) {
1024 auto dconn = addr->dconn_pool->pop_downstream_connection();
1026 dconn->set_client_handler(this);
1030 if (worker_->get_connect_blocker()->blocked()) {
1031 if (LOG_ENABLED(INFO)) {
1033 << "Worker wide backend connection was blocked temporarily";
1038 if (LOG_ENABLED(INFO)) {
1039 CLOG(INFO, this) << "Downstream connection pool is empty."
1040 << " Create new one";
1043 dconn = std::make_unique<HttpDownstreamConnection>(group, addr, conn_.loop,
1045 dconn->set_client_handler(this);
1049 if (LOG_ENABLED(INFO)) {
1050 CLOG(INFO, this) << "Downstream connection pool is empty."
1051 << " Create new one";
1054 auto http2session = get_http2_session(group, addr);
1055 auto dconn = std::make_unique<Http2DownstreamConnection>(http2session);
1056 dconn->set_client_handler(this);
1060 MemchunkPool *ClientHandler::get_mcpool() { return worker_->get_mcpool(); }
1062 SSL *ClientHandler::get_ssl() const { return conn_.tls.ssl; }
1064 void ClientHandler::direct_http2_upgrade() {
1065 upstream_ = std::make_unique<Http2Upstream>(this);
1066 alpn_ = StringRef::from_lit(NGHTTP2_CLEARTEXT_PROTO_VERSION_ID);
1067 on_read_ = &ClientHandler::upstream_read;
1068 write_ = &ClientHandler::write_clear;
1071 int ClientHandler::perform_http2_upgrade(HttpsUpstream *http) {
1072 auto upstream = std::make_unique<Http2Upstream>(this);
1074 auto output = upstream->get_response_buf();
1076 // We might have written non-final header in response_buf, in this
1077 // case, response_state is still INITIAL. If this non-final header
1078 // and upgrade header fit in output buffer, do upgrade. Otherwise,
1079 // to avoid to send this non-final header as response body in HTTP/2
1080 // upstream, fail upgrade.
1081 auto downstream = http->get_downstream();
1082 auto input = downstream->get_response_buf();
1084 if (upstream->upgrade_upstream(http) != 0) {
1087 // http pointer is now owned by upstream.
1088 upstream_.release();
1089 // TODO We might get other version id in HTTP2-settings, if we
1090 // support aliasing for h2, but we just use library default for now.
1091 alpn_ = StringRef::from_lit(NGHTTP2_CLEARTEXT_PROTO_VERSION_ID);
1092 on_read_ = &ClientHandler::upstream_http2_connhd_read;
1093 write_ = &ClientHandler::write_clear;
1095 input->remove(*output, input->rleft());
1097 constexpr auto res =
1098 StringRef::from_lit("HTTP/1.1 101 Switching Protocols\r\n"
1099 "Connection: Upgrade\r\n"
1100 "Upgrade: " NGHTTP2_CLEARTEXT_PROTO_VERSION_ID "\r\n"
1103 output->append(res);
1104 upstream_ = std::move(upstream);
1110 bool ClientHandler::get_http2_upgrade_allowed() const { return !conn_.tls.ssl; }
1112 StringRef ClientHandler::get_upstream_scheme() const {
1113 if (conn_.tls.ssl) {
1114 return StringRef::from_lit("https");
1116 return StringRef::from_lit("http");
1120 void ClientHandler::start_immediate_shutdown() {
1121 ev_timer_start(conn_.loop, &reneg_shutdown_timer_);
1124 void ClientHandler::write_accesslog(Downstream *downstream) {
1125 auto &req = downstream->request();
1127 auto config = get_config();
1130 auto lgconf = log_config();
1131 lgconf->update_tstamp(std::chrono::system_clock::now());
1132 req.tstamp = lgconf->tstamp;
1136 config->logging.access.format,
1143 std::chrono::high_resolution_clock::now(), // request_end_time
1150 ClientHandler::ReadBuf *ClientHandler::get_rb() { return &rb_; }
1152 void ClientHandler::signal_write() { conn_.wlimit.startw(); }
1154 RateLimit *ClientHandler::get_rlimit() { return &conn_.rlimit; }
1155 RateLimit *ClientHandler::get_wlimit() { return &conn_.wlimit; }
1157 ev_io *ClientHandler::get_wev() { return &conn_.wev; }
1159 Worker *ClientHandler::get_worker() const { return worker_; }
1162 ssize_t parse_proxy_line_port(const uint8_t *first, const uint8_t *last) {
1171 if (p + 1 != last && util::is_digit(*(p + 1))) {
1177 for (; p != last && util::is_digit(*p); ++p) {
1190 int ClientHandler::on_proxy_protocol_finish() {
1191 if (conn_.tls.ssl) {
1192 conn_.tls.rbuf.append(rb_.pos(), rb_.rleft());
1196 setup_upstream_io_callback();
1198 // Run on_read to process data left in buffer since they are not
1200 if (on_read() != 0) {
1208 // PROXY-protocol v2 header signature
1209 constexpr uint8_t PROXY_PROTO_V2_SIG[] =
1210 "\x0D\x0A\x0D\x0A\x00\x0D\x0A\x51\x55\x49\x54\x0A";
1212 // PROXY-protocol v2 header length
1213 constexpr size_t PROXY_PROTO_V2_HDLEN =
1214 str_size(PROXY_PROTO_V2_SIG) + /* ver_cmd(1) + fam(1) + len(2) = */ 4;
1217 // http://www.haproxy.org/download/1.5/doc/proxy-protocol.txt
1218 int ClientHandler::proxy_protocol_read() {
1219 if (LOG_ENABLED(INFO)) {
1220 CLOG(INFO, this) << "PROXY-protocol: Started";
1223 auto first = rb_.pos();
1225 if (rb_.rleft() >= PROXY_PROTO_V2_HDLEN &&
1226 (*(first + str_size(PROXY_PROTO_V2_SIG)) & 0xf0) == 0x20) {
1227 if (LOG_ENABLED(INFO)) {
1228 CLOG(INFO, this) << "PROXY-protocol: Detected v2 header signature";
1230 return proxy_protocol_v2_read();
1233 // NULL character really destroys functions which expects NULL
1234 // terminated string. We won't expect it in PROXY protocol line, so
1236 auto chrs = std::array<char, 2>{'\n', '\0'};
1238 constexpr size_t MAX_PROXY_LINELEN = 107;
1240 auto bufend = rb_.pos() + std::min(MAX_PROXY_LINELEN, rb_.rleft());
1243 std::find_first_of(rb_.pos(), bufend, std::begin(chrs), std::end(chrs));
1245 if (end == bufend || *end == '\0' || end == rb_.pos() || *(end - 1) != '\r') {
1246 if (LOG_ENABLED(INFO)) {
1247 CLOG(INFO, this) << "PROXY-protocol-v1: No ending CR LF sequence found";
1254 constexpr auto HEADER = StringRef::from_lit("PROXY ");
1256 if (static_cast<size_t>(end - rb_.pos()) < HEADER.size()) {
1257 if (LOG_ENABLED(INFO)) {
1258 CLOG(INFO, this) << "PROXY-protocol-v1: PROXY version 1 ID not found";
1263 if (!util::streq(HEADER, StringRef{rb_.pos(), HEADER.size()})) {
1264 if (LOG_ENABLED(INFO)) {
1265 CLOG(INFO, this) << "PROXY-protocol-v1: Bad PROXY protocol version 1 ID";
1270 rb_.drain(HEADER.size());
1274 if (rb_.pos()[0] == 'T') {
1275 if (end - rb_.pos() < 5) {
1276 if (LOG_ENABLED(INFO)) {
1277 CLOG(INFO, this) << "PROXY-protocol-v1: INET protocol family not found";
1282 if (rb_.pos()[1] != 'C' || rb_.pos()[2] != 'P') {
1283 if (LOG_ENABLED(INFO)) {
1284 CLOG(INFO, this) << "PROXY-protocol-v1: Unknown INET protocol family";
1289 switch (rb_.pos()[3]) {
1297 if (LOG_ENABLED(INFO)) {
1298 CLOG(INFO, this) << "PROXY-protocol-v1: Unknown INET protocol family";
1305 if (end - rb_.pos() < 7) {
1306 if (LOG_ENABLED(INFO)) {
1307 CLOG(INFO, this) << "PROXY-protocol-v1: INET protocol family not found";
1311 if (!util::streq_l("UNKNOWN", rb_.pos(), 7)) {
1312 if (LOG_ENABLED(INFO)) {
1313 CLOG(INFO, this) << "PROXY-protocol-v1: Unknown INET protocol family";
1318 rb_.drain(end + 2 - rb_.pos());
1320 return on_proxy_protocol_finish();
1324 auto token_end = std::find(rb_.pos(), end, ' ');
1325 if (token_end == end) {
1326 if (LOG_ENABLED(INFO)) {
1327 CLOG(INFO, this) << "PROXY-protocol-v1: Source address not found";
1333 if (!util::numeric_host(reinterpret_cast<const char *>(rb_.pos()), family)) {
1334 if (LOG_ENABLED(INFO)) {
1335 CLOG(INFO, this) << "PROXY-protocol-v1: Invalid source address";
1340 auto src_addr = rb_.pos();
1341 auto src_addrlen = token_end - rb_.pos();
1343 rb_.drain(token_end - rb_.pos() + 1);
1345 // destination address
1346 token_end = std::find(rb_.pos(), end, ' ');
1347 if (token_end == end) {
1348 if (LOG_ENABLED(INFO)) {
1349 CLOG(INFO, this) << "PROXY-protocol-v1: Destination address not found";
1355 if (!util::numeric_host(reinterpret_cast<const char *>(rb_.pos()), family)) {
1356 if (LOG_ENABLED(INFO)) {
1357 CLOG(INFO, this) << "PROXY-protocol-v1: Invalid destination address";
1362 // Currently we don't use destination address
1364 rb_.drain(token_end - rb_.pos() + 1);
1367 auto n = parse_proxy_line_port(rb_.pos(), end);
1368 if (n <= 0 || *(rb_.pos() + n) != ' ') {
1369 if (LOG_ENABLED(INFO)) {
1370 CLOG(INFO, this) << "PROXY-protocol-v1: Invalid source port";
1375 rb_.pos()[n] = '\0';
1376 auto src_port = rb_.pos();
1377 auto src_portlen = n;
1382 n = parse_proxy_line_port(rb_.pos(), end);
1383 if (n <= 0 || rb_.pos() + n != end) {
1384 if (LOG_ENABLED(INFO)) {
1385 CLOG(INFO, this) << "PROXY-protocol-v1: Invalid destination port";
1390 // Currently we don't use destination port
1392 rb_.drain(end + 2 - rb_.pos());
1395 make_string_ref(balloc_, StringRef{src_addr, src_addr + src_addrlen});
1396 port_ = make_string_ref(balloc_, StringRef{src_port, src_port + src_portlen});
1398 if (LOG_ENABLED(INFO)) {
1399 CLOG(INFO, this) << "PROXY-protocol-v1: Finished, " << (rb_.pos() - first)
1403 auto config = get_config();
1404 auto &fwdconf = config->http.forwarded;
1406 if ((fwdconf.params & FORWARDED_FOR) &&
1407 fwdconf.for_node_type == ForwardedNode::IP) {
1408 init_forwarded_for(family, ipaddr_);
1411 return on_proxy_protocol_finish();
1414 int ClientHandler::proxy_protocol_v2_read() {
1415 // Assume that first str_size(PROXY_PROTO_V2_SIG) octets match v2
1416 // protocol signature and followed by the bytes which indicates v2.
1417 assert(rb_.rleft() >= PROXY_PROTO_V2_HDLEN);
1419 auto p = rb_.pos() + str_size(PROXY_PROTO_V2_SIG);
1421 assert(((*p) & 0xf0) == 0x20);
1423 enum { LOCAL, PROXY } cmd;
1425 auto cmd_bits = (*p++) & 0xf;
1434 if (LOG_ENABLED(INFO)) {
1435 CLOG(INFO, this) << "PROXY-protocol-v2: Unknown command " << log::hex
1443 memcpy(&len, p, sizeof(len));
1448 if (LOG_ENABLED(INFO)) {
1449 CLOG(INFO, this) << "PROXY-protocol-v2: Detected family=" << log::hex << fam
1450 << ", len=" << log::dec << len;
1453 if (rb_.last() - p < len) {
1454 if (LOG_ENABLED(INFO)) {
1456 << "PROXY-protocol-v2: Prematurely truncated header block; require "
1457 << len << " bytes, " << rb_.last() - p << " bytes left";
1463 std::array<char, std::max(INET_ADDRSTRLEN, INET6_ADDRSTRLEN)> src_addr,
1471 if (LOG_ENABLED(INFO)) {
1472 CLOG(INFO, this) << "PROXY-protocol-v2: Too short AF_INET addresses";
1482 if (LOG_ENABLED(INFO)) {
1483 CLOG(INFO, this) << "PROXY-protocol-v2: Too short AF_INET6 addresses";
1493 if (LOG_ENABLED(INFO)) {
1494 CLOG(INFO, this) << "PROXY-protocol-v2: Too short AF_UNIX addresses";
1500 // UNSPEC and UNIX are just ignored.
1501 if (LOG_ENABLED(INFO)) {
1502 CLOG(INFO, this) << "PROXY-protocol-v2: Ignore combination of address "
1503 "family and protocol "
1506 rb_.drain(PROXY_PROTO_V2_HDLEN + len);
1507 return on_proxy_protocol_finish();
1510 if (LOG_ENABLED(INFO)) {
1511 CLOG(INFO, this) << "PROXY-protocol-v2: Unknown combination of address "
1512 "family and protocol "
1519 if (LOG_ENABLED(INFO)) {
1520 CLOG(INFO, this) << "PROXY-protocol-v2: Ignore non-PROXY command";
1522 rb_.drain(PROXY_PROTO_V2_HDLEN + len);
1523 return on_proxy_protocol_finish();
1526 if (inet_ntop(family, p, src_addr.data(), src_addr.size()) == nullptr) {
1527 if (LOG_ENABLED(INFO)) {
1528 CLOG(INFO, this) << "PROXY-protocol-v2: Unable to parse source address";
1535 if (inet_ntop(family, p, dst_addr.data(), dst_addr.size()) == nullptr) {
1536 if (LOG_ENABLED(INFO)) {
1538 << "PROXY-protocol-v2: Unable to parse destination address";
1547 memcpy(&src_port, p, sizeof(src_port));
1548 src_port = ntohs(src_port);
1550 // We don't use destination port.
1553 ipaddr_ = make_string_ref(balloc_, StringRef{src_addr.data()});
1554 port_ = util::make_string_ref_uint(balloc_, src_port);
1556 if (LOG_ENABLED(INFO)) {
1557 CLOG(INFO, this) << "PROXY-protocol-v2: Finished reading proxy addresses, "
1558 << p - rb_.pos() << " bytes read, "
1559 << PROXY_PROTO_V2_HDLEN + len - (p - rb_.pos())
1563 auto config = get_config();
1564 auto &fwdconf = config->http.forwarded;
1566 if ((fwdconf.params & FORWARDED_FOR) &&
1567 fwdconf.for_node_type == ForwardedNode::IP) {
1568 init_forwarded_for(family, ipaddr_);
1571 rb_.drain(PROXY_PROTO_V2_HDLEN + len);
1572 return on_proxy_protocol_finish();
1575 StringRef ClientHandler::get_forwarded_by() const {
1576 auto &fwdconf = get_config()->http.forwarded;
1578 if (fwdconf.by_node_type == ForwardedNode::OBFUSCATED) {
1579 return fwdconf.by_obfuscated;
1582 return faddr_->hostport;
1585 StringRef ClientHandler::get_forwarded_for() const { return forwarded_for_; }
1587 const UpstreamAddr *ClientHandler::get_upstream_addr() const { return faddr_; }
1589 Connection *ClientHandler::get_connection() { return &conn_; };
1591 void ClientHandler::set_tls_sni(const StringRef &sni) {
1592 sni_ = make_string_ref(balloc_, sni);
1595 StringRef ClientHandler::get_tls_sni() const { return sni_; }
1597 StringRef ClientHandler::get_alpn() const { return alpn_; }
1599 BlockAllocator &ClientHandler::get_block_allocator() { return balloc_; }
1601 void ClientHandler::set_alpn_from_conn() {
1602 const unsigned char *alpn;
1603 unsigned int alpnlen;
1605 SSL_get0_alpn_selected(conn_.tls.ssl, &alpn, &alpnlen);
1607 alpn_ = make_string_ref(balloc_, StringRef{alpn, alpnlen});
1610 } // namespace shrpx