2 * nghttp2 - HTTP/2 C Library
4 * Copyright (c) 2012 Tatsuhiro Tsujikawa
6 * Permission is hereby granted, free of charge, to any person obtaining
7 * a copy of this software and associated documentation files (the
8 * "Software"), to deal in the Software without restriction, including
9 * without limitation the rights to use, copy, modify, merge, publish,
10 * distribute, sublicense, and/or sell copies of the Software, and to
11 * permit persons to whom the Software is furnished to do so, subject to
12 * the following conditions:
14 * The above copyright notice and this permission notice shall be
15 * included in all copies or substantial portions of the Software.
17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
21 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
22 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
23 * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
25 #include "shrpx_client_handler.h"
29 #endif // HAVE_UNISTD_H
30 #ifdef HAVE_SYS_SOCKET_H
31 # include <sys/socket.h>
32 #endif // HAVE_SYS_SOCKET_H
35 #endif // HAVE_NETDB_H
39 #include "shrpx_upstream.h"
40 #include "shrpx_http2_upstream.h"
41 #include "shrpx_https_upstream.h"
42 #include "shrpx_config.h"
43 #include "shrpx_http_downstream_connection.h"
44 #include "shrpx_http2_downstream_connection.h"
45 #include "shrpx_tls.h"
46 #include "shrpx_worker.h"
47 #include "shrpx_downstream_connection_pool.h"
48 #include "shrpx_downstream.h"
49 #include "shrpx_http2_session.h"
50 #include "shrpx_connect_blocker.h"
51 #include "shrpx_api_downstream_connection.h"
52 #include "shrpx_health_monitor_downstream_connection.h"
53 #include "shrpx_log.h"
58 using namespace nghttp2;
// ev_timer callback for connection timeouts.  The watcher's data pointer
// holds the Connection, whose own data pointer holds the owning
// ClientHandler.  Logs "Time out" at INFO level when enabled.
63 void timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
64 auto conn = static_cast<Connection *>(w->data);
65 auto handler = static_cast<ClientHandler *>(conn->data);
67 if (LOG_ENABLED(INFO)) {
68 CLOG(INFO, handler) << "Time out";
// ev_timer callback for reneg_shutdown_timer_.  Unlike the other callbacks,
// the watcher's data pointer is the ClientHandler itself (the constructor
// stores `this` there).  Logs the TLS-renegotiation close reason.
76 void shutdowncb(struct ev_loop *loop, ev_timer *w, int revents) {
77 auto handler = static_cast<ClientHandler *>(w->data);
79 if (LOG_ENABLED(INFO)) {
80 CLOG(INFO, handler) << "Close connection due to TLS renegotiation";
// ev_io read-readiness callback: resolves the ClientHandler through the
// Connection stored in the watcher and runs its do_read().  A non-zero
// result enters the error branch.
88 void readcb(struct ev_loop *loop, ev_io *w, int revents) {
89 auto conn = static_cast<Connection *>(w->data);
90 auto handler = static_cast<ClientHandler *>(conn->data);
92 if (handler->do_read() != 0) {
// ev_io write-readiness callback: resolves the ClientHandler through the
// Connection stored in the watcher and runs its do_write().  A non-zero
// result enters the error branch.
100 void writecb(struct ev_loop *loop, ev_io *w, int revents) {
101 auto conn = static_cast<Connection *>(w->data);
102 auto handler = static_cast<ClientHandler *>(conn->data);
104 if (handler->do_write() != 0) {
// I/O handler that does nothing and reports success (0).  Installed as
// write_ while the PROXY protocol header is being read.
111 int ClientHandler::noop() { return 0; }
// Cleartext read handler: feeds data buffered in rb_ to on_read() and
// refills rb_ from the socket via conn_.read_clear().
113 int ClientHandler::read_clear() {
114 auto should_break = false;
// Hand already-buffered bytes to the upper layer before reading more.
117 if (rb_.rleft() && on_read() != 0) {
120 if (rb_.rleft() == 0) {
// Unread data remains but there is no room to write: stop watching for
// read events.
122 } else if (rb_.wleft() == 0) {
123 conn_.rlimit.stopw();
// Leave when the read watcher is no longer active or a break was requested.
127 if (!ev_is_active(&conn_.rev) || should_break) {
// Read into the free tail of the buffer.
131 auto nread = conn_.read_clear(rb_.last(), rb_.wleft());
134 if (rb_.rleft() == 0) {
// Cleartext write handler: lets on_write() produce response data, then
// sends it with a vectored write and drains what was written.
149 int ClientHandler::write_clear() {
// At most two buffer segments are gathered per writev call.
150 std::array<iovec, 2> iov;
153 if (on_write() != 0) {
// Collect readable regions of the upstream response buffer.
157 auto iovcnt = upstream_->response_riovec(iov.data(), iov.size());
162 auto nwrite = conn_.writev_clear(iov.data(), iovcnt);
// Discard the bytes that have been handed to the socket.
171 upstream_->response_drain(nwrite);
// Stop the write watcher and the write timeout timer.
174 conn_.wlimit.stopw();
175 ev_timer_stop(conn_.loop, &conn_.wt);
// Handler used for both read_ and write_ while the TLS handshake is in
// progress.  Once conn_.tls_handshake() finishes, the negotiated protocol
// is validated and the TLS read/write handlers are installed.
180 int ClientHandler::tls_handshake() {
// Restart the read timeout on every handshake step.
181 ev_timer_again(conn_.loop, &conn_.rt);
185 auto rv = conn_.tls_handshake();
// Handshake needs more I/O.
187 if (rv == SHRPX_ERR_INPROGRESS) {
195 if (LOG_ENABLED(INFO)) {
196 CLOG(INFO, this) << "SSL/TLS handshake completed";
// Select the upstream implementation from the negotiated protocol.
199 if (validate_next_proto() != 0) {
203 read_ = &ClientHandler::read_tls;
204 write_ = &ClientHandler::write_tls;
// TLS read handler: same shape as read_clear(), but refills rb_ through
// conn_.read_tls().
209 int ClientHandler::read_tls() {
210 auto should_break = false;
217 // we should process buffered data first before we read EOF.
218 if (rb_.rleft() && on_read() != 0) {
221 if (rb_.rleft() == 0) {
// Unread data remains but there is no room to write: stop watching for
// read events.
223 } else if (rb_.wleft() == 0) {
224 conn_.rlimit.stopw();
// Leave when the read watcher is no longer active or a break was requested.
228 if (!ev_is_active(&conn_.rev) || should_break) {
// Read decrypted data into the free tail of the buffer.
232 auto nread = conn_.read_tls(rb_.last(), rb_.wleft());
235 if (rb_.rleft() == 0) {
// TLS write handler: lets on_write() produce response data and sends it
// one buffer segment at a time through conn_.write_tls().
250 int ClientHandler::write_tls() {
255 if (on_write() != 0) {
// Only a single iovec is requested per iteration.
259 auto iovcnt = upstream_->response_riovec(&iov, 1);
261 conn_.start_tls_write_idle();
// Stop the write watcher and the write timeout timer.
263 conn_.wlimit.stopw();
264 ev_timer_stop(conn_.loop, &conn_.wt);
270 auto nwrite = conn_.write_tls(iov.iov_base, iov.iov_len);
// Discard the bytes that were written, then fetch the next segment.
279 upstream_->response_drain(nwrite);
281 iovcnt = upstream_->response_riovec(&iov, 1);
// Upper-layer handler that does nothing and reports success (0).  Used as
// a placeholder for on_read_ / on_write_ until a real handler is installed.
288 int ClientHandler::upstream_noop() { return 0; }
// on_read_ handler that forwards to the current upstream's on_read(); a
// non-zero result enters the error branch.
290 int ClientHandler::upstream_read() {
292 if (upstream_->on_read() != 0) {
// on_write_ handler that forwards to the current upstream's on_write().
298 int ClientHandler::upstream_write() {
300 if (upstream_->on_write() != 0) {
// Close was requested and the response buffer has been fully flushed.
304 if (get_should_close_after_write() && upstream_->response_empty()) {
// on_read_ handler that verifies the HTTP/2 client connection preface
// (NGHTTP2_CLIENT_MAGIC).  The preface may arrive in pieces;
// left_connhd_len_ tracks how many bytes are still expected.
311 int ClientHandler::upstream_http2_connhd_read() {
// Compare as many bytes as are available against the not-yet-matched tail
// of the magic string.
312 auto nread = std::min(left_connhd_len_, rb_.rleft());
313 if (memcmp(&NGHTTP2_CLIENT_MAGIC[NGHTTP2_CLIENT_MAGIC_LEN - left_connhd_len_],
314 rb_.pos(), nread) != 0) {
315 // There is no downgrade path here. Just drop the connection.
316 if (LOG_ENABLED(INFO)) {
317 CLOG(INFO, this) << "invalid client connection header";
323 left_connhd_len_ -= nread;
325 conn_.rlimit.startw();
// Whole preface matched: switch to the regular upstream read handler.
327 if (left_connhd_len_ == 0) {
328 on_read_ = &ClientHandler::upstream_read;
329 // Run on_read to process data left in buffer since they are not
331 if (on_read() != 0) {
// on_read_ handler for cleartext connections: checks whether the client
// starts with the HTTP/2 connection preface.  A mismatch means the
// connection is treated as HTTP/1.1; a full match triggers a direct HTTP/2
// upgrade.
340 int ClientHandler::upstream_http1_connhd_read() {
// Compare the available bytes against the not-yet-matched tail of the magic.
341 auto nread = std::min(left_connhd_len_, rb_.rleft());
342 if (memcmp(&NGHTTP2_CLIENT_MAGIC[NGHTTP2_CLIENT_MAGIC_LEN - left_connhd_len_],
343 rb_.pos(), nread) != 0) {
344 if (LOG_ENABLED(INFO)) {
345 CLOG(INFO, this) << "This is HTTP/1.1 connection, "
346 << "but may be upgraded to HTTP/2 later.";
349 // Reset header length for later HTTP/2 upgrade
350 left_connhd_len_ = NGHTTP2_CLIENT_MAGIC_LEN;
351 on_read_ = &ClientHandler::upstream_read;
352 on_write_ = &ClientHandler::upstream_write;
// Let the HTTP/1.1 upstream consume what is already buffered.
354 if (on_read() != 0) {
361 left_connhd_len_ -= nread;
363 conn_.rlimit.startw();
// Whole preface matched: this is a direct (prior-knowledge) HTTP/2 client.
365 if (left_connhd_len_ == 0) {
366 if (LOG_ENABLED(INFO)) {
367 CLOG(INFO, this) << "direct HTTP/2 connection";
370 direct_http2_upgrade();
371 on_read_ = &ClientHandler::upstream_read;
372 on_write_ = &ClientHandler::upstream_write;
374 // Run on_read to process data left in buffer since they are not
376 if (on_read() != 0) {
// Constructs a handler for one accepted frontend connection.
//   worker - worker that owns this connection (supplies loop, mcpool, RNG)
//   fd     - connected socket
//   ssl    - TLS object passed through to conn_
//   ipaddr - client address string, copied into balloc_
//   port   - client port string, copied into balloc_
//   family - address family of ipaddr, used to format forwarded_for_
//   faddr  - frontend address configuration this connection arrived on
386 ClientHandler::ClientHandler(Worker *worker, int fd, SSL *ssl,
387 const StringRef &ipaddr, const StringRef &port,
388 int family, const UpstreamAddr *faddr)
389 : // We use balloc_ for TLS session ID (64), ipaddr (IPv6) (39),
390 // port (5), forwarded-for (IPv6) (41), alpn (5), proxyproto
391 // ipaddr (15), proxyproto port (5), sni (32, estimated). we
392 // need terminal NULL byte for each. We also require 8 bytes
393 // header for each allocation. We align at 16 bytes boundary,
394 // so the required space is 64 + 48 + 16 + 48 + 16 + 16 + 16 +
395 // 32 + 8 + 8 * 8 = 328.
397 rb_(worker->get_mcpool()),
// conn_ is configured from the upstream timeout / rate-limit settings and
// wired to the file-level writecb / readcb / timeoutcb callbacks.
398 conn_(worker->get_loop(), fd, ssl, worker->get_mcpool(),
399 get_config()->conn.upstream.timeout.write,
400 get_config()->conn.upstream.timeout.read,
401 get_config()->conn.upstream.ratelimit.write,
402 get_config()->conn.upstream.ratelimit.read, writecb, readcb,
403 timeoutcb, this, get_config()->tls.dyn_rec.warmup_threshold,
404 get_config()->tls.dyn_rec.idle_timeout, Proto::NONE),
405 ipaddr_(make_string_ref(balloc_, ipaddr)),
406 port_(make_string_ref(balloc_, port)),
409 left_connhd_len_(NGHTTP2_CLIENT_MAGIC_LEN),
411 should_close_after_write_(false),
412 affinity_hash_computed_(false) {
// Balanced by the decrement in the destructor.
414 ++worker_->get_worker_stat()->num_connections;
416 ev_timer_init(&reneg_shutdown_timer_, shutdowncb, 0., 0.);
// shutdowncb expects the handler itself in the watcher's data pointer.
418 reneg_shutdown_timer_.data = this;
// Start watching for read events and arm the read timeout.
420 conn_.rlimit.startw();
421 ev_timer_again(conn_.loop, &conn_.rt);
423 auto config = get_config();
// When PROXY protocol is accepted, read its header in cleartext first;
// nothing is written and no upstream is active until it has been parsed.
425 if (faddr_->accept_proxy_protocol ||
426 config->conn.upstream.accept_proxy_protocol) {
427 read_ = &ClientHandler::read_clear;
428 write_ = &ClientHandler::noop;
429 on_read_ = &ClientHandler::proxy_protocol_read;
430 on_write_ = &ClientHandler::upstream_noop;
432 setup_upstream_io_callback();
435 auto &fwdconf = config->http.forwarded;
// Precompute the "for" node of the Forwarded header if it is enabled.
437 if (fwdconf.params & FORWARDED_FOR) {
438 if (fwdconf.for_node_type == ForwardedNode::OBFUSCATED) {
// NOTE(review): the extra byte in len is presumably for a leading '_'
// marker written before the random characters - confirm.
440 auto len = SHRPX_OBFUSCATED_NODE_LENGTH + 1;
441 // 1 for terminating NUL.
442 auto buf = make_byte_ref(balloc_, len + 1);
// Fill the node with random alphanumeric characters.
445 p = util::random_alpha_digit(p, p + SHRPX_OBFUSCATED_NODE_LENGTH,
446 worker_->get_randgen());
449 forwarded_for_ = StringRef{buf.base, p};
451 init_forwarded_for(family, ipaddr_);
// Sets forwarded_for_ from the client address.  For AF_INET6 a new string
// is built in balloc_ around the address; for other families the address
// is used as is.
456 void ClientHandler::init_forwarded_for(int family, const StringRef &ipaddr) {
457 if (family == AF_INET6) {
// NOTE(review): the 2 extra bytes are presumably for enclosing '[' and ']'
// - confirm.
459 auto len = 2 + ipaddr.size();
460 // 1 for terminating NUL.
461 auto buf = make_byte_ref(balloc_, len + 1);
464 p = std::copy(std::begin(ipaddr), std::end(ipaddr), p);
468 forwarded_for_ = StringRef{buf.base, p};
470 // family == AF_INET or family == AF_UNIX
471 forwarded_for_ = ipaddr;
// Installs the initial read_/write_/on_read_/on_write_ handlers.  One
// branch prepares the server-side TLS handshake; the other sets up a
// cleartext HTTP/1.1 upstream that watches for the HTTP/2 preface.
475 void ClientHandler::setup_upstream_io_callback() {
// TLS: both I/O directions drive the handshake until it completes.
477 conn_.prepare_server_handshake();
478 read_ = write_ = &ClientHandler::tls_handshake;
479 on_read_ = &ClientHandler::upstream_noop;
480 on_write_ = &ClientHandler::upstream_write;
482 // For non-TLS version, first create HttpsUpstream. It may be
483 // upgraded to HTTP/2 through HTTP Upgrade or direct HTTP/2
485 upstream_ = std::make_unique<HttpsUpstream>(this);
486 alpn_ = StringRef::from_lit("http/1.1");
487 read_ = &ClientHandler::read_clear;
488 write_ = &ClientHandler::write_clear;
489 on_read_ = &ClientHandler::upstream_http1_connhd_read;
490 on_write_ = &ClientHandler::upstream_noop;
// Tears down the handler: notifies the upstream, updates the worker's
// connection count, stops the renegotiation shutdown timer and, during a
// graceful shutdown, breaks the event loop once no connections remain.
494 ClientHandler::~ClientHandler() {
495 if (LOG_ENABLED(INFO)) {
496 CLOG(INFO, this) << "Deleting";
500 upstream_->on_handler_delete();
// Balances the increment done in the constructor.
503 auto worker_stat = worker_->get_worker_stat();
504 --worker_stat->num_connections;
// Last connection gone: ask the worker to schedule clearing its mcpool.
506 if (worker_stat->num_connections == 0) {
507 worker_->schedule_clear_mcpool();
510 ev_timer_stop(conn_.loop, &reneg_shutdown_timer_);
512 // TODO If backend is http/2, and it is in CONNECTED state, signal
513 // it and make it loopbreak when output is zero.
514 if (worker_->get_graceful_shutdown() && worker_stat->num_connections == 0) {
515 ev_break(conn_.loop);
518 if (LOG_ENABLED(INFO)) {
519 CLOG(INFO, this) << "Deleted";
// Returns a non-owning pointer to the current upstream (ownership stays
// with upstream_).
523 Upstream *ClientHandler::get_upstream() { return upstream_.get(); }
// Accessor for the event loop associated with this handler.
525 struct ev_loop *ClientHandler::get_loop() const {
// Changes the upstream read timeout to t (presumably by updating the read
// timer's repeat value - confirm) and, if the timer is currently active,
// restarts it.
529 void ClientHandler::reset_upstream_read_timeout(ev_tstamp t) {
531 if (ev_is_active(&conn_.rt)) {
532 ev_timer_again(conn_.loop, &conn_.rt);
// Changes the upstream write timeout to t (presumably by updating the
// write timer's repeat value - confirm) and, if the timer is currently
// active, restarts it.
536 void ClientHandler::reset_upstream_write_timeout(ev_tstamp t) {
538 if (ev_is_active(&conn_.wt)) {
539 ev_timer_again(conn_.loop, &conn_.wt);
// Restarts the read timeout timer.
543 void ClientHandler::repeat_read_timer() {
544 ev_timer_again(conn_.loop, &conn_.rt);
// Stops the read timeout timer.
547 void ClientHandler::stop_read_timer() { ev_timer_stop(conn_.loop, &conn_.rt); }
// Called after the TLS handshake.  Determines the application protocol
// negotiated via NPN or ALPN (defaulting to "http/1.1"), checks it against
// the configured protocol list and creates the matching upstream
// (Http2Upstream or HttpsUpstream).
549 int ClientHandler::validate_next_proto() {
550 const unsigned char *next_proto = nullptr;
551 unsigned int next_proto_len = 0;
553 // First set callback for catch all cases
554 on_read_ = &ClientHandler::upstream_read;
// Try NPN first when OpenSSL supports it.
556 #ifndef OPENSSL_NO_NEXTPROTONEG
557 SSL_get0_next_proto_negotiated(conn_.tls.ssl, &next_proto, &next_proto_len);
558 #endif // !OPENSSL_NO_NEXTPROTONEG
// Fall back to ALPN when NPN yielded nothing.
559 #if OPENSSL_VERSION_NUMBER >= 0x10002000L
560 if (next_proto == nullptr) {
561 SSL_get0_alpn_selected(conn_.tls.ssl, &next_proto, &next_proto_len);
563 #endif // OPENSSL_VERSION_NUMBER >= 0x10002000L
568 proto = StringRef{next_proto, next_proto_len};
570 if (LOG_ENABLED(INFO)) {
571 CLOG(INFO, this) << "The negotiated next protocol: " << proto;
574 if (LOG_ENABLED(INFO)) {
575 CLOG(INFO, this) << "No protocol negotiated. Fallback to HTTP/1.1";
578 proto = StringRef::from_lit("http/1.1");
// Reject protocols that are not in the configured list.
581 if (!tls::in_proto_list(get_config()->tls.npn_list, proto)) {
582 if (LOG_ENABLED(INFO)) {
583 CLOG(INFO, this) << "The negotiated protocol is not supported: " << proto;
// HTTP/2: the client connection preface must be verified first.
588 if (util::check_h2_is_selected(proto)) {
589 on_read_ = &ClientHandler::upstream_http2_connhd_read;
591 auto http2_upstream = std::make_unique<Http2Upstream>(this);
593 upstream_ = std::move(http2_upstream);
// Copy the protocol name into balloc_ and keep it as alpn_.
594 alpn_ = make_string_ref(balloc_, proto);
596 // At this point, input buffer is already filled with some bytes.
597 // The read callback is not called until new data come. So consume
598 // input buffer here.
599 if (on_read() != 0) {
// HTTP/1.1 over TLS.
606 if (proto == StringRef::from_lit("http/1.1")) {
607 upstream_ = std::make_unique<HttpsUpstream>(this);
608 alpn_ = StringRef::from_lit("http/1.1");
610 // At this point, input buffer is already filled with some bytes.
611 // The read callback is not called until new data come. So consume
612 // input buffer here.
613 if (on_read() != 0) {
619 if (LOG_ENABLED(INFO)) {
620 CLOG(INFO, this) << "The negotiated protocol is not supported";
// Invokes the currently installed read_ handler on this object and returns
// its result.
625 int ClientHandler::do_read() { return read_(*this); }
// Invokes the currently installed write_ handler on this object and returns
// its result.
626 int ClientHandler::do_write() { return write_(*this); }
// Dispatches buffered input to the installed on_read_ handler when rb_ has
// a chunk available, and lets the connection handle pending TLS reads.
628 int ClientHandler::on_read() {
629 if (rb_.chunk_avail()) {
630 auto rv = on_read_(*this);
635 conn_.handle_tls_pending_read();
// Invokes the currently installed on_write_ handler and returns its result.
638 int ClientHandler::on_write() { return on_write_(*this); }
// Returns the client IP address string (replaced by the PROXY protocol
// source address when one has been parsed).
640 const StringRef &ClientHandler::get_ipaddr() const { return ipaddr_; }
// Returns whether the connection should be closed once pending output has
// been written (consulted by upstream_write()).
642 bool ClientHandler::get_should_close_after_write() const {
643 return should_close_after_write_;
// Sets the close-after-write flag to f.
646 void ClientHandler::set_should_close_after_write(bool f) {
647 should_close_after_write_ = f;
// Takes ownership of dconn and, if it is poolable, detaches it from this
// handler and moves it into the connection pool of its backend address.
650 void ClientHandler::pool_downstream_connection(
651 std::unique_ptr<DownstreamConnection> dconn) {
652 if (!dconn->poolable()) {
// A pooled connection must not keep pointing at this handler.
656 dconn->set_client_handler(nullptr);
658 auto &group = dconn->get_downstream_addr_group();
660 if (LOG_ENABLED(INFO)) {
661 CLOG(INFO, this) << "Pooling downstream connection DCONN:" << dconn.get()
662 << " in group " << group;
// The pool is stored per backend address.
665 auto addr = dconn->get_addr();
666 auto &dconn_pool = addr->dconn_pool;
667 dconn_pool->add_downstream_connection(std::move(dconn));
671 // Computes 32bits hash for session affinity for IP address |ip|.
// The hash is the first 4 bytes of the SHA-256 digest of |ip| read as a
// big-endian integer; util::hash32(ip) is used as the fallback.
672 uint32_t compute_affinity_from_ip(const StringRef &ip) {
// 32 bytes: size of a SHA-256 digest.
674 std::array<uint8_t, 32> buf;
676 rv = util::sha256(buf.data(), ip);
678 // Not sure when sha256 failed. Just fall back to another
680 return util::hash32(ip);
683 return (static_cast<uint32_t>(buf[0]) << 24) |
684 (static_cast<uint32_t>(buf[1]) << 16) |
685 (static_cast<uint32_t>(buf[2]) << 8) | static_cast<uint32_t>(buf[3]);
// Returns an Http2Session for backend |addr| in |group|.  Sessions on
// addr->http2_extra_freelist are tried first; sessions that have reached
// their stream limit are removed from the freelist.  If none is usable a
// new session is created and added to the freelist.
689 Http2Session *ClientHandler::get_http2_session(
690 const std::shared_ptr<DownstreamAddrGroup> &group, DownstreamAddr *addr) {
691 auto &shared_addr = group->shared_addr;
693 if (LOG_ENABLED(INFO)) {
694 CLOG(INFO, this) << "Selected DownstreamAddr=" << addr
695 << ", index=" << (addr - shared_addr->addrs.data());
698 for (auto session = addr->http2_extra_freelist.head; session;) {
// Remember the successor first: the session may be unlinked below.
699 auto next = session->dlnext;
// Already full: drop it from the freelist.
701 if (session->max_concurrency_reached(0)) {
702 if (LOG_ENABLED(INFO)) {
704 << "Maximum streams have been reached for Http2Session(" << session
708 session->remove_from_freelist();
714 if (LOG_ENABLED(INFO)) {
715 CLOG(INFO, this) << "Use Http2Session " << session
716 << " from http2_extra_freelist";
// One more stream would fill the session, so take it off the freelist.
719 if (session->max_concurrency_reached(1)) {
720 if (LOG_ENABLED(INFO)) {
721 CLOG(INFO, this) << "Maximum streams are reached for Http2Session("
725 session->remove_from_freelist();
// NOTE(review): raw `new`; who deletes the session is not visible here.
730 auto session = new Http2Session(conn_.loop, worker_->get_cl_ssl_ctx(),
731 worker_, group, addr);
733 if (LOG_ENABLED(INFO)) {
734 CLOG(INFO, this) << "Create new Http2Session " << session;
737 session->add_to_extra_freelist();
// Returns the session-affinity hash carried in the request's
// |cookie_name| cookie.  A fresh hash is generated from a random value and
// handed to downstream->renew_affinity_cookie(); presumably this happens
// only when the request carries no usable cookie - confirm.
742 uint32_t ClientHandler::get_affinity_cookie(Downstream *downstream,
743 const StringRef &cookie_name) {
744 auto h = downstream->find_affinity_cookie(cookie_name);
// Draw a random value in [1, UINT32_MAX] (0 is excluded).
749 auto d = std::uniform_int_distribution<uint32_t>(
750 1, std::numeric_limits<uint32_t>::max());
751 auto rh = d(worker_->get_randgen());
// Hash the raw bytes of the random value.
752 h = util::hash32(StringRef{reinterpret_cast<uint8_t *>(&rh),
753 reinterpret_cast<uint8_t *>(&rh) + sizeof(rh)});
755 downstream->renew_affinity_cookie(h);
// Advances |addr|'s cycle by a weight-scaled penalty and pushes it back
// into the priority queue |pq|.  The remainder of the integer division is
// carried over in pending_penalty.
// NOTE(review): assumes addr->weight is non-zero - confirm at config time.
761 void reschedule_addr(
762 std::priority_queue<DownstreamAddrEntry, std::vector<DownstreamAddrEntry>,
763 DownstreamAddrEntryGreater> &pq,
764 DownstreamAddr *addr) {
765 auto penalty = MAX_DOWNSTREAM_ADDR_WEIGHT + addr->pending_penalty;
766 addr->cycle += penalty / addr->weight;
767 addr->pending_penalty = penalty % addr->weight;
769 pq.push(DownstreamAddrEntry{addr, addr->seq, addr->cycle});
// Weight-group counterpart of reschedule_addr(): advances |wg|'s cycle by
// a weight-scaled penalty and pushes it back into |pq|.  (The header line
// is absent from this listing; presumably this is reschedule_wg, which
// get_downstream_addr() calls - confirm.)
// NOTE(review): assumes wg->weight is non-zero - confirm.
776 std::priority_queue<WeightGroupEntry, std::vector<WeightGroupEntry>,
777 WeightGroupEntryGreater> &pq,
779 auto penalty = MAX_DOWNSTREAM_ADDR_WEIGHT + wg->pending_penalty;
780 wg->cycle += penalty / wg->weight;
781 wg->pending_penalty = penalty % wg->weight;
783 pq.push(WeightGroupEntry{wg, wg->seq, wg->cycle});
// Chooses a backend address from |group| for |downstream|.  With session
// affinity enabled, the address is looked up in the sorted affinity_hash
// table; otherwise it comes from the weight-group / address priority
// queues.  |err| is an out-parameter for the failure reason.
788 DownstreamAddr *ClientHandler::get_downstream_addr(int &err,
789 DownstreamAddrGroup *group,
790 Downstream *downstream) {
// API and health-monitor frontends are handled separately.
793 switch (faddr_->alt_mode) {
794 case UpstreamAltMode::API:
795 case UpstreamAltMode::HEALTHMON:
801 auto &shared_addr = group->shared_addr;
803 if (shared_addr->affinity.type != SessionAffinity::NONE) {
805 switch (shared_addr->affinity.type) {
806 case SessionAffinity::IP:
// The IP-based hash is computed once per connection and cached.
807 if (!affinity_hash_computed_) {
808 affinity_hash_ = compute_affinity_from_ip(ipaddr_);
809 affinity_hash_computed_ = true;
811 hash = affinity_hash_;
813 case SessionAffinity::COOKIE:
814 hash = get_affinity_cookie(downstream, shared_addr->affinity.cookie.name);
820 const auto &affinity_hash = shared_addr->affinity_hash;
// First entry whose hash is >= the computed hash; wraps to the first
// entry when no such entry exists.
822 auto it = std::lower_bound(
823 std::begin(affinity_hash), std::end(affinity_hash), hash,
824 [](const AffinityHash &lhs, uint32_t rhs) { return lhs.hash < rhs; });
826 if (it == std::end(affinity_hash)) {
827 it = std::begin(affinity_hash);
831 static_cast<size_t>(std::distance(std::begin(affinity_hash), it));
832 auto idx = (*it).idx;
833 auto addr = &shared_addr->addrs[idx];
// The preferred address is blocked: scan the following entries, wrapping
// around, for one that is not.
// NOTE(review): the wrap-around statement inside this loop is not visible
// here.  If it resets i to 0 in the body (before the loop's ++i), then
// with aff_idx == 0 the test `i != aff_idx` is never evaluated at i == 0
// and the scan may not terminate when every address is blocked - confirm.
835 if (addr->connect_blocker->blocked()) {
837 for (i = aff_idx + 1; i != aff_idx; ++i) {
838 if (i == shared_addr->affinity_hash.size()) {
841 addr = &shared_addr->addrs[shared_addr->affinity_hash[i].idx];
842 if (addr->connect_blocker->blocked()) {
// No affinity: pick from the weight-group priority queue.
857 auto &wgpq = shared_addr->pq;
861 CLOG(INFO, this) << "No working downstream address found";
866 auto wg = wgpq.top().wg;
871 if (wg->pq.empty()) {
875 auto addr = wg->pq.top().addr;
877 addr->queued = false;
879 if (addr->connect_blocker->blocked()) {
// Put the address and its weight group back with updated cycles.
883 reschedule_addr(wg->pq, addr);
884 reschedule_wg(wgpq, wg);
// Returns a backend connection for |downstream|.  Picks the address group
// from the request authority/path, picks an address in that group, then
// reuses a pooled HTTP/1 connection or creates a new HTTP/1 or HTTP/2
// connection.  |err| is an out-parameter for the failure reason.
891 std::unique_ptr<DownstreamConnection>
892 ClientHandler::get_downstream_connection(int &err, Downstream *downstream) {
894 auto &downstreamconf = *worker_->get_downstream_config();
895 auto &routerconf = downstreamconf.router;
897 auto catch_all = downstreamconf.addr_group_catch_all;
898 auto &groups = worker_->get_downstream_addr_groups();
900 auto &req = downstream->request();
// API and health-monitor frontends get dedicated connection types.
904 switch (faddr_->alt_mode) {
905 case UpstreamAltMode::API:
906 return std::make_unique<APIDownstreamConnection>(worker_);
907 case UpstreamAltMode::HEALTHMON:
908 return std::make_unique<HealthMonitorDownstreamConnection>();
913 auto &balloc = downstream->get_block_allocator();
915 StringRef authority, path;
// On a repeated selection, reuse the authority/path cached the first time.
917 if (req.forwarded_once) {
918 if (groups.size() != 1) {
919 authority = req.orig_authority;
920 path = req.orig_path;
923 if (faddr_->sni_fwd) {
925 } else if (!req.authority.empty()) {
926 authority = req.authority;
// No authority in the request: fall back to the Host header field.
928 auto h = req.fs.header(http2::HD_HOST);
930 authority = h->value;
934 // CONNECT method does not have path. But we require path in
935 // host-path mapping. As workaround, we assume that path is
937 if (!req.regular_connect_method()) {
941 // Cache the authority and path used for the first-time backend
942 // selection because per-pattern mruby script can change them.
943 req.orig_authority = authority;
944 req.orig_path = path;
945 req.forwarded_once = true;
948 // Fast path. If we have one group, it must be catch-all group.
949 if (groups.size() == 1) {
952 group_idx = match_downstream_addr_group(routerconf, authority, path, groups,
956 if (LOG_ENABLED(INFO)) {
957 CLOG(INFO, this) << "Downstream address group_idx: " << group_idx;
// The group requires frontend TLS but this connection is cleartext.
960 if (groups[group_idx]->shared_addr->redirect_if_not_tls && !conn_.tls.ssl) {
961 if (LOG_ENABLED(INFO)) {
962 CLOG(INFO, this) << "Downstream address group " << group_idx
963 << " requires frontend TLS connection.";
965 err = SHRPX_ERR_TLS_REQUIRED;
969 auto &group = groups[group_idx];
970 auto addr = get_downstream_addr(err, group.get(), downstream);
971 if (addr == nullptr) {
// HTTP/1 backend: try the per-address pool first.
975 if (addr->proto == Proto::HTTP1) {
976 auto dconn = addr->dconn_pool->pop_downstream_connection();
978 dconn->set_client_handler(this);
982 if (LOG_ENABLED(INFO)) {
983 CLOG(INFO, this) << "Downstream connection pool is empty."
984 << " Create new one";
987 dconn = std::make_unique<HttpDownstreamConnection>(group, addr, conn_.loop,
989 dconn->set_client_handler(this);
993 if (LOG_ENABLED(INFO)) {
994 CLOG(INFO, this) << "Downstream connection pool is empty."
995 << " Create new one";
// HTTP/2 backend: attach a new stream connection to a shared session.
998 auto http2session = get_http2_session(group, addr);
999 auto dconn = std::make_unique<Http2DownstreamConnection>(http2session);
1000 dconn->set_client_handler(this);
// Returns the worker's memory chunk pool.
1004 MemchunkPool *ClientHandler::get_mcpool() { return worker_->get_mcpool(); }
// Returns the connection's SSL object; other code in this file tests it
// for null to tell cleartext connections from TLS ones.
1006 SSL *ClientHandler::get_ssl() const { return conn_.tls.ssl; }
// Switches a cleartext connection to HTTP/2 after the client sent the
// connection preface directly: replaces the upstream with an
// Http2Upstream and records the cleartext HTTP/2 protocol ID as alpn_.
1008 void ClientHandler::direct_http2_upgrade() {
1009 upstream_ = std::make_unique<Http2Upstream>(this);
1010 alpn_ = StringRef::from_lit(NGHTTP2_CLEARTEXT_PROTO_VERSION_ID);
1011 on_read_ = &ClientHandler::upstream_read;
1012 write_ = &ClientHandler::write_clear;
// Performs an HTTP Upgrade from the HTTP/1.1 upstream |http| to HTTP/2:
// creates an Http2Upstream, hands |http| over to it, moves any response
// bytes already produced into the new output buffer, appends the
// "101 Switching Protocols" response and installs the new upstream.
1015 int ClientHandler::perform_http2_upgrade(HttpsUpstream *http) {
1016 auto upstream = std::make_unique<Http2Upstream>(this);
1018 auto output = upstream->get_response_buf();
1020 // We might have written non-final header in response_buf, in this
1021 // case, response_state is still INITIAL. If this non-final header
1022 // and upgrade header fit in output buffer, do upgrade. Otherwise,
1023 // to avoid sending this non-final header as response body in HTTP/2
1024 // upstream, fail upgrade.
1025 auto downstream = http->get_downstream();
1026 auto input = downstream->get_response_buf();
1028 if (upstream->upgrade_upstream(http) != 0) {
1031 // http pointer is now owned by upstream.
// release() drops ownership without deleting the object.
1032 upstream_.release();
1033 // TODO We might get other version id in HTTP2-settings, if we
1034 // support aliasing for h2, but we just use library default for now.
1035 alpn_ = StringRef::from_lit(NGHTTP2_CLEARTEXT_PROTO_VERSION_ID);
// After the upgrade the client still has to send the connection preface.
1036 on_read_ = &ClientHandler::upstream_http2_connhd_read;
1037 write_ = &ClientHandler::write_clear;
// Move everything readable from the old response buffer into the new one.
1039 input->remove(*output, input->rleft());
1041 constexpr auto res =
1042 StringRef::from_lit("HTTP/1.1 101 Switching Protocols\r\n"
1043 "Connection: Upgrade\r\n"
1044 "Upgrade: " NGHTTP2_CLEARTEXT_PROTO_VERSION_ID "\r\n"
1047 output->append(res);
1048 upstream_ = std::move(upstream);
// HTTP Upgrade to HTTP/2 is allowed only on connections without an SSL
// object, i.e. cleartext ones.
1054 bool ClientHandler::get_http2_upgrade_allowed() const { return !conn_.tls.ssl; }
// Returns "https" when the frontend connection has an SSL object,
// otherwise "http".
1056 StringRef ClientHandler::get_upstream_scheme() const {
1057 if (conn_.tls.ssl) {
1058 return StringRef::from_lit("https");
1060 return StringRef::from_lit("http");
// Starts reneg_shutdown_timer_, which the constructor initialised with
// zero after/repeat values and bound to shutdowncb.
1064 void ClientHandler::start_immediate_shutdown() {
1065 ev_timer_start(conn_.loop, &reneg_shutdown_timer_);
// Emits an access log entry for |downstream| using the configured access
// log format.
1068 void ClientHandler::write_accesslog(Downstream *downstream) {
1069 auto &req = downstream->request();
1071 auto config = get_config();
// Refresh the log timestamp and store it on the request.
1074 auto lgconf = log_config();
1075 lgconf->update_tstamp(std::chrono::system_clock::now());
1076 req.tstamp = lgconf->tstamp;
1080 config->logging.access.format,
1087 std::chrono::high_resolution_clock::now(), // request_end_time
// Returns a pointer to the handler's read buffer.
1094 ClientHandler::ReadBuf *ClientHandler::get_rb() { return &rb_; }
// Requests a write by starting the write rate-limit watcher.
1096 void ClientHandler::signal_write() { conn_.wlimit.startw(); }
// Returns the connection's read rate limiter.
1098 RateLimit *ClientHandler::get_rlimit() { return &conn_.rlimit; }
// Returns the connection's write rate limiter.
1099 RateLimit *ClientHandler::get_wlimit() { return &conn_.wlimit; }
// Returns the connection's write ev_io watcher.
1101 ev_io *ClientHandler::get_wev() { return &conn_.wev; }
// Returns the worker that owns this handler.
1103 Worker *ClientHandler::get_worker() const { return worker_; }
// Parses a decimal port number from the range [first, last) of a PROXY
// protocol v1 line.  Callers treat a result <= 0 as invalid and otherwise
// use it as the number of bytes the port occupies.
1106 ssize_t parse_proxy_line_port(const uint8_t *first, const uint8_t *last) {
// NOTE(review): looks like a check for a digit following the first
// character (e.g. to reject a leading zero) - confirm.
1115 if (p + 1 != last && util::is_digit(*(p + 1))) {
// Consume consecutive digits.
1121 for (; p != last && util::is_digit(*p); ++p) {
// Called once the PROXY protocol header has been consumed: hands leftover
// bytes to the TLS read buffer when TLS is used, installs the regular
// upstream I/O callbacks and processes whatever is still buffered.
1134 int ClientHandler::on_proxy_protocol_finish() {
1135 if (conn_.tls.ssl) {
// Bytes after the header belong to the TLS stream.
1136 conn_.tls.rbuf.append(rb_.pos(), rb_.rleft());
1140 setup_upstream_io_callback();
1142 // Run on_read to process data left in buffer since they are not
1144 if (on_read() != 0) {
1152 // PROXY-protocol v2 header signature
// 12 bytes: "\r\n\r\n\0\r\nQUIT\n".  The array also holds the string
// literal's implicit terminating NUL.
1153 constexpr uint8_t PROXY_PROTO_V2_SIG[] =
1154 "\x0D\x0A\x0D\x0A\x00\x0D\x0A\x51\x55\x49\x54\x0A";
1156 // PROXY-protocol v2 header length
// Signature plus the 4 fixed bytes that follow it (version/command,
// family/protocol and a 2-byte length of the remainder).
1157 constexpr size_t PROXY_PROTO_V2_HDLEN =
1158 str_size(PROXY_PROTO_V2_SIG) + /* ver_cmd(1) + fam(1) + len(2) = */ 4;
1161 // http://www.haproxy.org/download/1.5/doc/proxy-protocol.txt
// on_read_ handler that parses a PROXY protocol header at the start of
// the connection.  Input that looks like a v2 header goes to
// proxy_protocol_v2_read(); otherwise a v1 text line
//   "PROXY <TCP4|TCP6|UNKNOWN> <src> <dst> <sport> <dport>\r\n"
// is parsed.  On success ipaddr_/port_ are replaced by the advertised
// source address and port.
1162 int ClientHandler::proxy_protocol_read() {
1163 if (LOG_ENABLED(INFO)) {
1164 CLOG(INFO, this) << "PROXY-protocol: Started";
1167 auto first = rb_.pos();
// NOTE(review): as shown, v2 detection checks only the available length
// and that the high nibble of the byte after the signature is 0x2; the 12
// signature bytes themselves are not compared on these lines.  Confirm
// that a v1 line such as "PROXY TCP4 1.2.3.4 ..." (byte 12 is '.', 0x2e)
// cannot be misrouted to the v2 parser.
1169 if (rb_.rleft() >= PROXY_PROTO_V2_HDLEN &&
1170 (*(first + str_size(PROXY_PROTO_V2_SIG)) & 0xf0) == 0x20) {
1171 if (LOG_ENABLED(INFO)) {
1172 CLOG(INFO, this) << "PROXY-protocol: Detected v2 header signature";
1174 return proxy_protocol_v2_read();
1177 // NULL character really destroys functions which expects NULL
1178 // terminated string. We won't expect it in PROXY protocol line, so
1180 auto chrs = std::array<char, 2>{'\n', '\0'};
// Upper bound on how many bytes are scanned for the line terminator (the
// longest v1 line in the PROXY protocol spec is 107 bytes incl. CR LF).
1182 constexpr size_t MAX_PROXY_LINELEN = 107;
1184 auto bufend = rb_.pos() + std::min(MAX_PROXY_LINELEN, rb_.rleft());
// Find the first LF or NUL within the scanned prefix.
1187 std::find_first_of(rb_.pos(), bufend, std::begin(chrs), std::end(chrs));
// Reject: no terminator found, a NUL was found, or LF not preceded by CR.
1189 if (end == bufend || *end == '\0' || end == rb_.pos() || *(end - 1) != '\r') {
1190 if (LOG_ENABLED(INFO)) {
1191 CLOG(INFO, this) << "PROXY-protocol-v1: No ending CR LF sequence found";
1198 constexpr auto HEADER = StringRef::from_lit("PROXY ");
1200 if (static_cast<size_t>(end - rb_.pos()) < HEADER.size()) {
1201 if (LOG_ENABLED(INFO)) {
1202 CLOG(INFO, this) << "PROXY-protocol-v1: PROXY version 1 ID not found";
1207 if (!util::streq(HEADER, StringRef{rb_.pos(), HEADER.size()})) {
1208 if (LOG_ENABLED(INFO)) {
1209 CLOG(INFO, this) << "PROXY-protocol-v1: Bad PROXY protocol version 1 ID";
1214 rb_.drain(HEADER.size());
// Protocol family token: "TCP4"/"TCP6" start with 'T', else "UNKNOWN".
1218 if (rb_.pos()[0] == 'T') {
1219 if (end - rb_.pos() < 5) {
1220 if (LOG_ENABLED(INFO)) {
1221 CLOG(INFO, this) << "PROXY-protocol-v1: INET protocol family not found";
1226 if (rb_.pos()[1] != 'C' || rb_.pos()[2] != 'P') {
1227 if (LOG_ENABLED(INFO)) {
1228 CLOG(INFO, this) << "PROXY-protocol-v1: Unknown INET protocol family";
// The fourth character selects the address family.
1233 switch (rb_.pos()[3]) {
1241 if (LOG_ENABLED(INFO)) {
1242 CLOG(INFO, this) << "PROXY-protocol-v1: Unknown INET protocol family";
1249 if (end - rb_.pos() < 7) {
1250 if (LOG_ENABLED(INFO)) {
1251 CLOG(INFO, this) << "PROXY-protocol-v1: INET protocol family not found";
1255 if (!util::streq_l("UNKNOWN", rb_.pos(), 7)) {
1256 if (LOG_ENABLED(INFO)) {
1257 CLOG(INFO, this) << "PROXY-protocol-v1: Unknown INET protocol family";
// UNKNOWN: skip the rest of the line and keep the socket's own address.
// NOTE(review): the "+ 2" presumably skips CR LF, which requires `end` to
// have been moved back onto the '\r' by a statement not visible here.
1262 rb_.drain(end + 2 - rb_.pos());
1264 return on_proxy_protocol_finish();
// Source address token.
1268 auto token_end = std::find(rb_.pos(), end, ' ');
1269 if (token_end == end) {
1270 if (LOG_ENABLED(INFO)) {
1271 CLOG(INFO, this) << "PROXY-protocol-v1: Source address not found";
// NOTE(review): numeric_host() receives a C string, so the token must be
// NUL-terminated in place before this call - confirm.
1277 if (!util::numeric_host(reinterpret_cast<const char *>(rb_.pos()), family)) {
1278 if (LOG_ENABLED(INFO)) {
1279 CLOG(INFO, this) << "PROXY-protocol-v1: Invalid source address";
// Remember where the source address lives inside the read buffer.
1284 auto src_addr = rb_.pos();
1285 auto src_addrlen = token_end - rb_.pos();
1287 rb_.drain(token_end - rb_.pos() + 1);
1289 // destination address
1290 token_end = std::find(rb_.pos(), end, ' ');
1291 if (token_end == end) {
1292 if (LOG_ENABLED(INFO)) {
1293 CLOG(INFO, this) << "PROXY-protocol-v1: Destination address not found";
1299 if (!util::numeric_host(reinterpret_cast<const char *>(rb_.pos()), family)) {
1300 if (LOG_ENABLED(INFO)) {
1301 CLOG(INFO, this) << "PROXY-protocol-v1: Invalid destination address";
1306 // Currently we don't use destination address
1308 rb_.drain(token_end - rb_.pos() + 1);
// Source port: must be followed by a space.
1311 auto n = parse_proxy_line_port(rb_.pos(), end);
1312 if (n <= 0 || *(rb_.pos() + n) != ' ') {
1313 if (LOG_ENABLED(INFO)) {
1314 CLOG(INFO, this) << "PROXY-protocol-v1: Invalid source port";
// Terminate the port token in place (overwrites the separating space).
1319 rb_.pos()[n] = '\0';
1320 auto src_port = rb_.pos();
1321 auto src_portlen = n;
// Destination port: must extend exactly to the end of the line.
1326 n = parse_proxy_line_port(rb_.pos(), end);
1327 if (n <= 0 || rb_.pos() + n != end) {
1328 if (LOG_ENABLED(INFO)) {
1329 CLOG(INFO, this) << "PROXY-protocol-v1: Invalid destination port";
1334 // Currently we don't use destination port
1336 rb_.drain(end + 2 - rb_.pos());
// Copy the source address and port into balloc_ before rb_ is reused.
1339 make_string_ref(balloc_, StringRef{src_addr, src_addr + src_addrlen});
1340 port_ = make_string_ref(balloc_, StringRef{src_port, src_port + src_portlen});
1342 if (LOG_ENABLED(INFO)) {
1343 CLOG(INFO, this) << "PROXY-protocol-v1: Finished, " << (rb_.pos() - first)
1347 auto config = get_config();
1348 auto &fwdconf = config->http.forwarded;
// The Forwarded "for" node was derived from the socket address; rebuild it
// from the PROXY-supplied address when it is IP-based.
1350 if ((fwdconf.params & FORWARDED_FOR) &&
1351 fwdconf.for_node_type == ForwardedNode::IP) {
1352 init_forwarded_for(family, ipaddr_);
1355 return on_proxy_protocol_finish();
// Parses a binary PROXY protocol v2 header.  The caller must already have
// established that at least PROXY_PROTO_V2_HDLEN bytes are buffered and
// that the version nibble is 2 (both are asserted here).  On success
// ipaddr_/port_ are replaced by the advertised source address and port and
// the whole header (fixed part + len) is drained from rb_.
1358 int ClientHandler::proxy_protocol_v2_read() {
1359 // Assume that first str_size(PROXY_PROTO_V2_SIG) octets match v2
1360 // protocol signature and followed by the bytes which indicates v2.
1361 assert(rb_.rleft() >= PROXY_PROTO_V2_HDLEN);
// p starts at the version/command byte that follows the signature.
1363 auto p = rb_.pos() + str_size(PROXY_PROTO_V2_SIG);
1365 assert(((*p) & 0xf0) == 0x20);
1367 enum { LOCAL, PROXY } cmd;
// The low nibble is the command.
1369 auto cmd_bits = (*p++) & 0xf;
1378 if (LOG_ENABLED(INFO)) {
1379 CLOG(INFO, this) << "PROXY-protocol-v2: Unknown command " << log::hex
// Copy the 2-byte length field out of the buffer with memcpy.
1387 memcpy(&len, p, sizeof(len));
1392 if (LOG_ENABLED(INFO)) {
1393 CLOG(INFO, this) << "PROXY-protocol-v2: Detected family=" << log::hex << fam
1394 << ", len=" << log::dec << len;
// The variable-length part must be fully buffered.
1397 if (rb_.last() - p < len) {
1398 if (LOG_ENABLED(INFO)) {
1400 << "PROXY-protocol-v2: Prematurely truncated header block; require "
1401 << len << " bytes, " << rb_.last() - p << " bytes left";
// Text buffers sized to hold either an IPv4 or an IPv6 address string.
1407 std::array<char, std::max(INET_ADDRSTRLEN, INET6_ADDRSTRLEN)> src_addr,
1415 if (LOG_ENABLED(INFO)) {
1416 CLOG(INFO, this) << "PROXY-protocol-v2: Too short AF_INET addresses";
1426 if (LOG_ENABLED(INFO)) {
1427 CLOG(INFO, this) << "PROXY-protocol-v2: Too short AF_INET6 addresses";
1437 if (LOG_ENABLED(INFO)) {
1438 CLOG(INFO, this) << "PROXY-protocol-v2: Too short AF_UNIX addresses";
1444 // UNSPEC and UNIX are just ignored.
1445 if (LOG_ENABLED(INFO)) {
1446 CLOG(INFO, this) << "PROXY-protocol-v2: Ignore combination of address "
1447 "family and protocol "
1450 rb_.drain(PROXY_PROTO_V2_HDLEN + len);
1451 return on_proxy_protocol_finish();
1454 if (LOG_ENABLED(INFO)) {
1455 CLOG(INFO, this) << "PROXY-protocol-v2: Unknown combination of address "
1456 "family and protocol "
// Non-PROXY command: skip the header, keep the socket's own address.
1463 if (LOG_ENABLED(INFO)) {
1464 CLOG(INFO, this) << "PROXY-protocol-v2: Ignore non-PROXY command";
1466 rb_.drain(PROXY_PROTO_V2_HDLEN + len);
1467 return on_proxy_protocol_finish();
// Convert the binary source address to text.
1470 if (inet_ntop(family, p, src_addr.data(), src_addr.size()) == nullptr) {
1471 if (LOG_ENABLED(INFO)) {
1472 CLOG(INFO, this) << "PROXY-protocol-v2: Unable to parse source address";
// Convert the binary destination address to text.
1479 if (inet_ntop(family, p, dst_addr.data(), dst_addr.size()) == nullptr) {
1480 if (LOG_ENABLED(INFO)) {
1482 << "PROXY-protocol-v2: Unable to parse destination address";
// The source port is stored in network byte order.
1491 memcpy(&src_port, p, sizeof(src_port));
1492 src_port = ntohs(src_port);
1494 // We don't use destination port.
1497 ipaddr_ = make_string_ref(balloc_, StringRef{src_addr.data()});
1498 port_ = util::make_string_ref_uint(balloc_, src_port);
1500 if (LOG_ENABLED(INFO)) {
1501 CLOG(INFO, this) << "PROXY-protocol-v2: Finished reading proxy addresses, "
1502 << p - rb_.pos() << " bytes read, "
1503 << PROXY_PROTO_V2_HDLEN + len - (p - rb_.pos())
1507 auto config = get_config();
1508 auto &fwdconf = config->http.forwarded;
// Rebuild the Forwarded "for" node from the PROXY-supplied address.
1510 if ((fwdconf.params & FORWARDED_FOR) &&
1511 fwdconf.for_node_type == ForwardedNode::IP) {
1512 init_forwarded_for(family, ipaddr_);
// Drain the whole header, including any bytes that were not interpreted.
1515 rb_.drain(PROXY_PROTO_V2_HDLEN + len);
1516 return on_proxy_protocol_finish();
// Returns the value for the Forwarded header's "by" parameter: the
// configured obfuscated string when by_node_type is OBFUSCATED, otherwise
// the frontend address's host:port.
1519 StringRef ClientHandler::get_forwarded_by() const {
1520 auto &fwdconf = get_config()->http.forwarded;
1522 if (fwdconf.by_node_type == ForwardedNode::OBFUSCATED) {
1523 return fwdconf.by_obfuscated;
1526 return faddr_->hostport;
// Returns the precomputed value for the Forwarded header's "for" parameter.
1529 StringRef ClientHandler::get_forwarded_for() const { return forwarded_for_; }
// Returns the frontend address configuration this connection belongs to.
1531 const UpstreamAddr *ClientHandler::get_upstream_addr() const { return faddr_; }
// Returns a pointer to this handler's conn_ member (the frontend
// Connection).
1533 Connection *ClientHandler::get_connection() { return &conn_; }
// Stores a copy of |sni| allocated from balloc_.
1535 void ClientHandler::set_tls_sni(const StringRef &sni) {
1536 sni_ = make_string_ref(balloc_, sni);
// Returns the SNI value stored by set_tls_sni().
1539 StringRef ClientHandler::get_tls_sni() const { return sni_; }
// Returns the application protocol identifier recorded for this connection.
1541 StringRef ClientHandler::get_alpn() const { return alpn_; }
// Returns the handler's block allocator (balloc_).
1543 BlockAllocator &ClientHandler::get_block_allocator() { return balloc_; }
1545 } // namespace shrpx