2 * nghttp2 - HTTP/2 C Library
4 * Copyright (c) 2012 Tatsuhiro Tsujikawa
6 * Permission is hereby granted, free of charge, to any person obtaining
7 * a copy of this software and associated documentation files (the
8 * "Software"), to deal in the Software without restriction, including
9 * without limitation the rights to use, copy, modify, merge, publish,
10 * distribute, sublicense, and/or sell copies of the Software, and to
11 * permit persons to whom the Software is furnished to do so, subject to
12 * the following conditions:
14 * The above copyright notice and this permission notice shall be
15 * included in all copies or substantial portions of the Software.
17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
21 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
22 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
23 * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
25 #include "shrpx_downstream.h"
29 #include "url-parser/url_parser.h"
31 #include "shrpx_upstream.h"
32 #include "shrpx_client_handler.h"
33 #include "shrpx_config.h"
34 #include "shrpx_error.h"
35 #include "shrpx_downstream_connection.h"
36 #include "shrpx_downstream_queue.h"
37 #include "shrpx_worker.h"
38 #include "shrpx_http2_session.h"
39 #include "shrpx_log.h"
41 # include "shrpx_mruby.h"
// Shared libev timer callback for the upstream (frontend) stream timers.
// |w->data| holds the Downstream that owns the timer. |revents| equal to
// EV_READ is logged as "read"; any other value is logged as "write".
49 void upstream_timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
50 auto downstream = static_cast<Downstream *>(w->data);
51 auto upstream = downstream->get_upstream();
// |which| is only used to label the log line below.
53 auto which = revents == EV_READ ? "read" : "write";
55 if (LOG_ENABLED(INFO)) {
56 DLOG(INFO, downstream) << "upstream timeout stream_id="
57 << downstream->get_stream_id() << " event=" << which;
// Stop both upstream timers, then let the upstream handle the timeout.
60 downstream->disable_upstream_rtimer();
61 downstream->disable_upstream_wtimer();
63 upstream->on_timeout(downstream);
// Upstream read-timer callback: forwards to upstream_timeoutcb() with
// EV_READ; the |revents| argument received here is not forwarded.
68 void upstream_rtimeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
69 upstream_timeoutcb(loop, w, EV_READ);
// Upstream write-timer callback: forwards to upstream_timeoutcb() with
// EV_WRITE; the |revents| argument received here is not forwarded.
74 void upstream_wtimeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
75 upstream_timeoutcb(loop, w, EV_WRITE);
// Shared libev timer callback for the downstream (backend) stream timers.
// |w->data| holds the Downstream that owns the timer. |revents| equal to
// EV_READ is logged as "read"; any other value is logged as "write".
80 void downstream_timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
81 auto downstream = static_cast<Downstream *>(w->data);
// |which| is only used to label the log line below.
83 auto which = revents == EV_READ ? "read" : "write";
85 if (LOG_ENABLED(INFO)) {
86 DLOG(INFO, downstream) << "downstream timeout stream_id="
87 << downstream->get_downstream_stream_id()
88 << " event=" << which;
// Stop both downstream timers before dealing with the connection.
91 downstream->disable_downstream_rtimer();
92 downstream->disable_downstream_wtimer();
// NOTE(review): what is done with |dconn| afterwards is not visible in
// this view -- confirm against the full source.
94 auto dconn = downstream->get_downstream_connection();
// Downstream read-timer callback: forwards to downstream_timeoutcb() with
// EV_READ; the |revents| argument received here is not forwarded.
103 void downstream_rtimeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
104 downstream_timeoutcb(loop, w, EV_READ);
// Downstream write-timer callback: forwards to downstream_timeoutcb() with
// EV_WRITE; the |revents| argument received here is not forwarded.
109 void downstream_wtimeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
110 downstream_timeoutcb(loop, w, EV_WRITE);
// Constructs a Downstream in its initial state: request, response and
// dispatch states are INITIAL/NONE, the associated and downstream stream
// IDs are -1, all boolean flags are false, and the three body buffers
// draw from |mcpool|. The request start time is taken from
// high_resolution_clock at construction.
// NOTE(review): part of the parameter list and some initializers are not
// visible in this view.
114 // upstream could be nullptr for unittests
115 Downstream::Downstream(Upstream *upstream, MemchunkPool *mcpool,
119 response_sent_body_length(0),
123 request_start_time_(std::chrono::high_resolution_clock::now()),
124 blocked_request_buf_(mcpool),
125 request_buf_(mcpool),
126 response_buf_(mcpool),
128 blocked_link_(nullptr),
131 stream_id_(stream_id),
132 assoc_stream_id_(-1),
133 downstream_stream_id_(-1),
134 response_rst_stream_error_code_(NGHTTP2_NO_ERROR),
136 request_state_(DownstreamState::INITIAL),
137 response_state_(DownstreamState::INITIAL),
138 dispatch_state_(DispatchState::NONE),
140 chunked_request_(false),
141 chunked_response_(false),
142 expect_final_response_(false),
143 request_pending_(false),
144 request_header_sent_(false),
145 accesslog_written_(false),
146 new_affinity_cookie_(false),
147 blocked_request_data_eof_(false),
148 expect_100_continue_(false) {
150 auto &timeoutconf = get_config()->http2.timeout;
// Prepare (but do not start) the four per-stream timers. The timeout is
// passed as the timer's repeat value and the initial delay is 0; elsewhere
// in this file the timers are armed with ev_timer_again(), which uses the
// repeat value.
152 ev_timer_init(&upstream_rtimer_, &upstream_rtimeoutcb, 0.,
153 timeoutconf.stream_read);
154 ev_timer_init(&upstream_wtimer_, &upstream_wtimeoutcb, 0.,
155 timeoutconf.stream_write);
156 ev_timer_init(&downstream_rtimer_, &downstream_rtimeoutcb, 0.,
157 timeoutconf.stream_read);
158 ev_timer_init(&downstream_wtimer_, &downstream_wtimeoutcb, 0.,
159 timeoutconf.stream_write);
// The timer callbacks recover this object from the timer's data pointer.
161 upstream_rtimer_.data = this;
162 upstream_wtimer_.data = this;
163 downstream_rtimer_.data = this;
164 downstream_wtimer_.data = this;
// Destructor: stops the four stream timers, removes this object from the
// mruby contexts reachable from the worker and from the attached
// connection's address group, and drops the reference held on every
// nghttp2_rcbuf stored in rcbufs_.
// NOTE(review): the guards around these steps (null checks, build-time
// conditionals) and the release of dconn_ are not visible in this view.
169 Downstream::~Downstream() {
170 if (LOG_ENABLED(INFO)) {
171 DLOG(INFO, this) << "Deleting";
174 // check nullptr for unittest
176 auto loop = upstream_->get_client_handler()->get_loop();
178 ev_timer_stop(loop, &upstream_rtimer_);
179 ev_timer_stop(loop, &upstream_wtimer_);
180 ev_timer_stop(loop, &downstream_rtimer_);
181 ev_timer_stop(loop, &downstream_wtimer_);
// mruby context obtained from the worker.
184 auto handler = upstream_->get_client_handler();
185 auto worker = handler->get_worker();
186 auto mruby_ctx = worker->get_mruby_context();
188 mruby_ctx->delete_downstream(this);
// mruby context stored on the address group of the attached connection.
194 const auto &group = dconn_->get_downstream_addr_group();
196 const auto &mruby_ctx = group->shared_addr->mruby_ctx;
197 mruby_ctx->delete_downstream(this);
202 // DownstreamConnection may refer to this object. Delete it now
// Balances the nghttp2_rcbuf_incref() performed in add_rcbuf().
206 for (auto rcbuf : rcbufs_) {
207 nghttp2_rcbuf_decref(rcbuf);
210 if (LOG_ENABLED(INFO)) {
211 DLOG(INFO, this) << "Deleted";
// Asks |dconn| to attach to this object and, on the path that reaches the
// assignment below, takes ownership of it in dconn_. A non-zero result
// from dconn->attach_downstream() takes a separate branch.
// NOTE(review): the return statements are not visible in this view.
215 int Downstream::attach_downstream_connection(
216 std::unique_ptr<DownstreamConnection> dconn) {
217 if (dconn->attach_downstream(this) != 0) {
221 dconn_ = std::move(dconn);
// Detaches the current connection from this object and transfers its
// ownership to the client handler via pool_downstream_connection().
// NOTE(review): any early-exit or guard conditions are not visible here.
226 void Downstream::detach_downstream_connection() {
// Remove this object from the mruby context of the connection's address
// group.
232 const auto &group = dconn_->get_downstream_addr_group();
234 const auto &mruby_ctx = group->shared_addr->mruby_ctx;
235 mruby_ctx->delete_downstream(this);
239 dconn_->detach_downstream(this);
241 auto handler = dconn_->get_client_handler();
// dconn_ is left empty after release(); the handler now owns the
// connection.
243 handler->pool_downstream_connection(
244 std::unique_ptr<DownstreamConnection>(dconn_.release()));
// Accessor returning a DownstreamConnection pointer.
// NOTE(review): the body is not visible in this view; presumably it
// exposes dconn_ without transferring ownership -- confirm.
247 DownstreamConnection *Downstream::get_downstream_connection() {
// Releases the attached connection from dconn_ and returns it to the
// caller as a unique_ptr, after removing this object from the mruby
// context of the connection's address group.
// NOTE(review): guard conditions around these steps are not visible here.
251 std::unique_ptr<DownstreamConnection> Downstream::pop_downstream_connection() {
257 const auto &group = dconn_->get_downstream_addr_group();
259 const auto &mruby_ctx = group->shared_addr->mruby_ctx;
260 mruby_ctx->delete_downstream(this);
264 return std::unique_ptr<DownstreamConnection>(dconn_.release());
// Forwards |reason| to dconn_->pause_read().
// NOTE(review): any guard on dconn_ is not visible in this view.
267 void Downstream::pause_read(IOCtrlReason reason) {
269 dconn_->pause_read(reason);
// Forwards |reason| and |consumed| to dconn_->resume_read() and returns
// its result.
// NOTE(review): the value returned when that call is not made is not
// visible in this view.
273 int Downstream::resume_read(IOCtrlReason reason, size_t consumed) {
275 return dconn_->resume_read(reason, consumed);
// Forwards to dconn_->force_resume_read().
// NOTE(review): any guard on dconn_ is not visible in this view.
281 void Downstream::force_resume_read() {
283 dconn_->force_resume_read();
// Walks |headers| from the last entry towards the first and compares each
// entry's name with |name|.
// NOTE(review): the return statements are not visible here; the return
// type suggests a pointer to the matching entry, with some sentinel when
// nothing matches -- confirm.
288 const HeaderRefs::value_type *
289 search_header_linear_backwards(const HeaderRefs &headers,
290 const StringRef &name) {
291 for (auto it = headers.rbegin(); it != headers.rend(); ++it) {
293 if (kv.name == name) {
// Builds a single cookie string from all non-empty request headers whose
// token is HD_COOKIE, joining the values with "; ". The result is
// allocated from balloc_ and returned as a StringRef.
// NOTE(review): several statements (declarations of |len| and |p|, loop
// bodies) are not visible in this view.
301 StringRef Downstream::assemble_request_cookie() {
// First pass: compute an upper bound on the output size (each value plus
// one "; " separator).
304 for (auto &kv : req_.fs.headers()) {
305 if (kv.token != http2::HD_COOKIE || kv.value.empty()) {
309 len += kv.value.size() + str_size("; ");
// One byte more than the computed bound is requested.
312 auto iov = make_byte_ref(balloc_, len + 1);
// Second pass: copy each value followed by "; ".
315 for (auto &kv : req_.fs.headers()) {
316 if (kv.token != http2::HD_COOKIE || kv.value.empty()) {
320 auto end = std::end(kv.value);
// Scan backwards from the end of the value looking at ' ' and ';'
// characters; presumably this trims them from the tail before copying.
321 for (auto it = std::begin(kv.value) + kv.value.size();
322 it != std::begin(kv.value); --it) {
324 if (c == ' ' || c == ';') {
331 p = std::copy(std::begin(kv.value), end, p);
332 p = util::copy_lit(p, "; ");
// Taken when at least two bytes were written; presumably removes the
// final "; " separator -- the branch body is not visible here.
336 if (p - iov.base >= 2) {
340 return StringRef{iov.base, p};
// Searches the request's cookie headers for a cookie called |name| whose
// value is exactly 8 characters, decodes those characters with
// util::hex_to_uint(), and stores the result in affinity_cookie_.
// NOTE(review): the return statements and the handling of invalid hex
// digits are not visible in this view.
343 uint32_t Downstream::find_affinity_cookie(const StringRef &name) {
344 for (auto &kv : req_.fs.headers()) {
345 if (kv.token != http2::HD_COOKIE) {
349 for (auto it = std::begin(kv.value); it != std::end(kv.value);) {
// Tabs, spaces and ';' between cookie pairs are tested here.
350 if (*it == '\t' || *it == ' ' || *it == ';') {
// [it, end) is the cookie name, terminated by '='.
355 auto end = std::find(it, std::end(kv.value), '=');
356 if (end == std::end(kv.value)) {
// Name mismatch: advance to the next ';'.
360 if (!util::streq(name, StringRef{it, end})) {
361 it = std::find(it, std::end(kv.value), ';');
// [end + 1, it) is the cookie value, up to the next ';' or end of header.
365 it = std::find(end + 1, std::end(kv.value), ';');
366 auto val = StringRef{end + 1, it};
367 if (val.size() != 8) {
372 auto n = util::hex_to_uint(c);
379 affinity_cookie_ = h;
// Walks every request header whose token is HD_COOKIE using the same
// scan as crumble_request_cookie(): separator characters ('\t', ' ', ';')
// are tested, then the iterator jumps to the next ';'.
// NOTE(review): the counter and return statement are not visible here;
// presumably the result is the number of ';'-separated pieces -- confirm.
386 size_t Downstream::count_crumble_request_cookie() {
388 for (auto &kv : req_.fs.headers()) {
389 if (kv.token != http2::HD_COOKIE) {
393 for (auto it = std::begin(kv.value); it != std::end(kv.value);) {
394 if (*it == '\t' || *it == ' ' || *it == ';') {
399 it = std::find(it, std::end(kv.value), ';');
// Appends to |nva| one nghttp2_nv named "cookie" per ';'-delimited piece
// of each request header whose token is HD_COOKIE.
// NOTE(review): the declaration of |first| and the separator-skipping
// branch body are not visible in this view.
407 void Downstream::crumble_request_cookie(std::vector<nghttp2_nv> &nva) {
408 for (auto &kv : req_.fs.headers()) {
409 if (kv.token != http2::HD_COOKIE) {
413 for (auto it = std::begin(kv.value); it != std::end(kv.value);) {
414 if (*it == '\t' || *it == ' ' || *it == ';') {
421 it = std::find(it, std::end(kv.value), ';');
// Name and value are passed with the NO_COPY flags, so the entry points
// at the string literal and at the bytes starting at |first| rather than
// at copies. NO_INDEX is added when the source header has no_index set.
423 nva.push_back({(uint8_t *)"cookie", (uint8_t *)first, str_size("cookie"),
424 (size_t)(it - first),
425 (uint8_t)(NGHTTP2_NV_FLAG_NO_COPY_NAME |
426 NGHTTP2_NV_FLAG_NO_COPY_VALUE |
427 (kv.no_index ? NGHTTP2_NV_FLAG_NO_INDEX : 0))});
// Appends a header built from (name, value, no_index, token) to |headers|
// and adds the combined length of |name| and |value| to |sum|.
433 void add_header(size_t &sum, HeaderRefs &headers, const StringRef &name,
434 const StringRef &value, bool no_index, int32_t token) {
435 sum += name.size() + value.size();
436 headers.emplace_back(name, value, no_index, token);
// Copies |name| into memory obtained from |balloc| (one byte more than the
// name's length is requested), lower-cases the copy in place, and returns
// a StringRef covering the copied bytes.
// NOTE(review): the initialisation of |p| and any terminator write are
// not visible in this view.
441 StringRef alloc_header_name(BlockAllocator &balloc, const StringRef &name) {
442 auto iov = make_byte_ref(balloc, name.size() + 1);
444 p = std::copy(std::begin(name), std::end(name), p);
445 util::inp_strlower(iov.base, p);
448 return StringRef{iov.base, p};
// Extends the name of the last entry in |headers| with |len| bytes from
// |data|, lower-cases the appended bytes, and recomputes the entry's
// token from the extended name.
// NOTE(review): how |key_prev| and |sum| are used, and the declaration of
// |name|, are not visible in this view.
453 void append_last_header_key(BlockAllocator &balloc, bool &key_prev, size_t &sum,
454 HeaderRefs &headers, const char *data, size_t len) {
457 auto &item = headers.back();
459 realloc_concat_string_ref(balloc, item.name, StringRef{data, len});
// Only the last |len| bytes (the newly appended part) are lower-cased.
461 auto p = const_cast<uint8_t *>(name.byte());
462 util::inp_strlower(p + name.size() - len, p + name.size());
465 item.token = http2::lookup_token(item.name);
// Extends the value of the last entry in |headers| with |len| bytes from
// |data| using realloc_concat_string_ref().
// NOTE(review): how |key_prev| and |sum| are used, and where the result
// of the concatenation is stored, are not visible in this view.
470 void append_last_header_value(BlockAllocator &balloc, bool &key_prev,
471 size_t &sum, HeaderRefs &headers,
472 const char *data, size_t len) {
475 auto &item = headers.back();
477 realloc_concat_string_ref(balloc, item.value, StringRef{data, len});
// Scans headers_ for entries whose token is HD_CONTENT_LENGTH, parses each
// value with util::parse_uint(), and stores the parsed value in
// content_length.
// NOTE(review): the error handling (parse failure, and the branch taken
// when content_length is already set) and the return values are not
// visible in this view.
481 int FieldStore::parse_content_length() {
484 for (auto &kv : headers_) {
485 if (kv.token != http2::HD_CONTENT_LENGTH) {
489 auto len = util::parse_uint(kv.value);
// content_length != -1 means an earlier content-length header was seen.
493 if (content_length != -1) {
496 content_length = len;
// Const lookup by token: walks headers_ from the last entry towards the
// first and compares each entry's token with |token|.
// NOTE(review): the return statements are not visible in this view.
501 const HeaderRefs::value_type *FieldStore::header(int32_t token) const {
502 for (auto it = headers_.rbegin(); it != headers_.rend(); ++it) {
504 if (kv.token == token) {
// Mutable lookup by token: walks headers_ from the last entry towards the
// first and compares each entry's token with |token|.
// NOTE(review): the return statements are not visible in this view.
511 HeaderRefs::value_type *FieldStore::header(int32_t token) {
512 for (auto it = headers_.rbegin(); it != headers_.rend(); ++it) {
514 if (kv.token == token) {
// Lookup by name: delegates to search_header_linear_backwards() over
// headers_.
521 const HeaderRefs::value_type *FieldStore::header(const StringRef &name) const {
522 return search_header_linear_backwards(headers_, name);
// Appends a header to headers_ through shrpx::add_header(), which also
// adds the name and value lengths to buffer_size_.
525 void FieldStore::add_header_token(const StringRef &name, const StringRef &value,
526 bool no_index, int32_t token) {
527 shrpx::add_header(buffer_size_, headers_, name, value, no_index, token);
// Starts a new header: stores a lower-cased copy of |name| allocated from
// balloc_, looks up its token, and adds it with an empty value and
// no_index == false. header_key_prev_ is set to true afterwards.
530 void FieldStore::alloc_add_header_name(const StringRef &name) {
531 auto name_ref = alloc_header_name(balloc_, name);
532 auto token = http2::lookup_token(name_ref);
533 add_header_token(name_ref, StringRef{}, false, token);
534 header_key_prev_ = true;
// Appends |len| bytes from |data| to the name of the last header in
// headers_ by delegating to shrpx::append_last_header_key().
537 void FieldStore::append_last_header_key(const char *data, size_t len) {
538 shrpx::append_last_header_key(balloc_, header_key_prev_, buffer_size_,
539 headers_, data, len);
// Appends |len| bytes from |data| to the value of the last header in
// headers_ by delegating to shrpx::append_last_header_value().
542 void FieldStore::append_last_header_value(const char *data, size_t len) {
543 shrpx::append_last_header_value(balloc_, header_key_prev_, buffer_size_,
544 headers_, data, len);
// Resets header parsing state; header_key_prev_ is set back to false.
// NOTE(review): the statement that empties the header list is not visible
// in this view.
547 void FieldStore::clear_headers() {
549 header_key_prev_ = false;
// Appends a trailer field to trailers_ through shrpx::add_header().
552 void FieldStore::add_trailer_token(const StringRef &name,
553 const StringRef &value, bool no_index,
555 // Header size limit should be applied to all header and trailer
// fields, which is why the same buffer_size_ counter used for headers is
// passed here.
557 shrpx::add_header(buffer_size_, trailers_, name, value, no_index, token);
// Starts a new trailer field: stores a lower-cased copy of |name|
// allocated from balloc_, looks up its token, and adds it with an empty
// value and no_index == false. trailer_key_prev_ is set to true afterwards.
560 void FieldStore::alloc_add_trailer_name(const StringRef &name) {
561 auto name_ref = alloc_header_name(balloc_, name);
562 auto token = http2::lookup_token(name_ref);
563 add_trailer_token(name_ref, StringRef{}, false, token);
564 trailer_key_prev_ = true;
// Appends |len| bytes from |data| to the name of the last trailer in
// trailers_; reuses shrpx::append_last_header_key() with the trailer
// state.
567 void FieldStore::append_last_trailer_key(const char *data, size_t len) {
568 shrpx::append_last_header_key(balloc_, trailer_key_prev_, buffer_size_,
569 trailers_, data, len);
// Appends |len| bytes from |data| to the value of the last trailer in
// trailers_; reuses shrpx::append_last_header_value() with the trailer
// state.
572 void FieldStore::append_last_trailer_value(const char *data, size_t len) {
573 shrpx::append_last_header_value(balloc_, trailer_key_prev_, buffer_size_,
574 trailers_, data, len);
// Blanks out content-length and transfer-encoding entries in headers_
// in place; the visible part resets the entry's name to an empty
// StringRef.
// NOTE(review): the switch expression and any further fields reset for
// these entries are not visible in this view.
577 void FieldStore::erase_content_length_and_transfer_encoding() {
578 for (auto &kv : headers_) {
580 case http2::HD_CONTENT_LENGTH:
581 case http2::HD_TRANSFER_ENCODING:
582 kv.name = StringRef{};
// Overrides the request start time that the constructor initialised to
// the construction time.
589 void Downstream::set_request_start_time(
590 std::chrono::high_resolution_clock::time_point time) {
591 request_start_time_ = std::move(time);
// Returns a const reference to the stored request start time.
594 const std::chrono::high_resolution_clock::time_point &
595 Downstream::get_request_start_time() const {
596 return request_start_time_;
// Re-points this object at |upstream| and notifies the attached
// connection through on_upstream_change().
// NOTE(review): any guard on dconn_ is not visible in this view.
599 void Downstream::reset_upstream(Upstream *upstream) {
600 upstream_ = upstream;
602 dconn_->on_upstream_change(upstream);
606 Upstream *Downstream::get_upstream() const { return upstream_; }
608 void Downstream::set_stream_id(int32_t stream_id) { stream_id_ = stream_id; }
610 int32_t Downstream::get_stream_id() const { return stream_id_; }
// Stores |state| as the current request state.
612 void Downstream::set_request_state(DownstreamState state) {
613 request_state_ = state;
616 DownstreamState Downstream::get_request_state() const { return request_state_; }
618 bool Downstream::get_chunked_request() const { return chunked_request_; }
620 void Downstream::set_chunked_request(bool f) { chunked_request_ = f; }
// Reports whether buffered request data has reached the configured limit:
// the readable bytes in blocked_request_buf_ plus request_buf_ are
// compared against downstreamconf.request_buffer_size.
// NOTE(review): the value returned for API endpoints and any other
// branches are not visible in this view.
622 bool Downstream::request_buf_full() {
623 auto handler = upstream_->get_client_handler();
624 auto faddr = handler->get_upstream_addr();
625 auto worker = handler->get_worker();
627 // We don't check buffer size here for API endpoint.
628 if (faddr->alt_mode == UpstreamAltMode::API) {
633 auto &downstreamconf = *worker->get_downstream_config();
634 return blocked_request_buf_.rleft() + request_buf_.rleft() >=
635 downstreamconf.request_buffer_size;
641 DefaultMemchunks *Downstream::get_request_buf() { return &request_buf_; }
// Forwards to dconn_->push_request_headers() and returns its result.
643 // Call this function only after a DownstreamConnection has been attached
644 // to this object (dconn_ is non-null).
// NOTE(review): the visible log message suggests a null dconn_ is
// detected and reported rather than dereferenced; the guard and its
// return value are not visible in this view.
645 int Downstream::push_request_headers() {
647 DLOG(INFO, this) << "dconn_ is NULL";
650 return dconn_->push_request_headers();
// Accepts |datalen| bytes of request body. The bytes are always added to
// req_.recv_body_length. While no connection is attached and the request
// header has not been sent, the data is parked in blocked_request_buf_;
// otherwise it is handed to dconn_->push_upload_data_chunk(). On both
// visible paths the bytes are also added to req_.unconsumed_body_length.
// NOTE(review): the return statements and the guard around the
// "dconn_ is NULL" log are not visible in this view.
653 int Downstream::push_upload_data_chunk(const uint8_t *data, size_t datalen) {
654 req_.recv_body_length += datalen;
656 if (!dconn_ && !request_header_sent_) {
657 blocked_request_buf_.append(data, datalen);
658 req_.unconsumed_body_length += datalen;
662 // Assumes that request headers have already been pushed to output
663 // buffer using push_request_headers().
665 DLOG(INFO, this) << "dconn_ is NULL";
668 if (dconn_->push_upload_data_chunk(data, datalen) != 0) {
672 req_.unconsumed_body_length += datalen;
// Signals the end of the request body. While no connection is attached
// and the request header has not been sent, the end-of-data condition is
// recorded in blocked_request_data_eof_; otherwise the call is forwarded
// to dconn_->end_upload_data().
// NOTE(review): the return statements and the guard around the
// "dconn_ is NULL" log are not visible in this view.
677 int Downstream::end_upload_data() {
678 if (!dconn_ && !request_header_sent_) {
679 blocked_request_data_eof_ = true;
683 DLOG(INFO, this) << "dconn_ is NULL";
686 return dconn_->end_upload_data();
// Rewrites the response's Location header for the given upstream scheme.
// The header value is parsed with http_parser_parse_url() and passed,
// together with request_downstream_host_, req_.authority and
// |upstream_scheme|, to http2::rewrite_location_uri().
// NOTE(review): the early exits and the statement that stores |new_uri|
// back into the header are not visible in this view.
689 void Downstream::rewrite_location_response_header(
690 const StringRef &upstream_scheme) {
691 auto hd = resp_.fs.header(http2::HD_LOCATION);
// Both the backend host and the request authority are needed for the
// rewrite.
696 if (request_downstream_host_.empty() || req_.authority.empty()) {
701 auto rv = http_parser_parse_url(hd->value.c_str(), hd->value.size(), 0, &u);
706 auto new_uri = http2::rewrite_location_uri(balloc_, hd->value, u,
707 request_downstream_host_,
708 req_.authority, upstream_scheme);
710 if (new_uri.empty()) {
717 bool Downstream::get_chunked_response() const { return chunked_response_; }
719 void Downstream::set_chunked_response(bool f) { chunked_response_ = f; }
// Forwards to dconn_->on_read() and returns its result.
// NOTE(review): the guard around the "dconn_ is NULL" log and its return
// value are not visible in this view.
721 int Downstream::on_read() {
723 DLOG(INFO, this) << "dconn_ is NULL";
726 return dconn_->on_read();
// Stores |state| as the current response state.
729 void Downstream::set_response_state(DownstreamState state) {
730 response_state_ = state;
// Returns the current response state.
733 DownstreamState Downstream::get_response_state() const {
734 return response_state_;
737 DefaultMemchunks *Downstream::get_response_buf() { return &response_buf_; }
// Reports whether the readable bytes in response_buf_ have reached
// downstreamconf.response_buffer_size.
// NOTE(review): any guard preceding this comparison and the value
// returned on the other path are not visible in this view.
739 bool Downstream::response_buf_full() {
741 auto handler = upstream_->get_client_handler();
742 auto worker = handler->get_worker();
743 auto &downstreamconf = *worker->get_downstream_config();
745 return response_buf_.rleft() >= downstreamconf.response_buffer_size;
// Compares the request's declared content-length with the number of body
// bytes received and logs a mismatch. A content_length of -1 (no usable
// content-length) takes a separate early branch.
// NOTE(review): the return statements are not visible in this view.
751 bool Downstream::validate_request_recv_body_length() const {
752 if (req_.fs.content_length == -1) {
756 if (req_.fs.content_length != req_.recv_body_length) {
757 if (LOG_ENABLED(INFO)) {
758 DLOG(INFO, this) << "request invalid bodylen: content-length="
759 << req_.fs.content_length
760 << ", received=" << req_.recv_body_length;
// Compares the response's declared content-length with the number of body
// bytes received and logs a mismatch. Responses that carry no body, or
// whose content_length is -1, take a separate early branch.
// NOTE(review): the return statements are not visible in this view.
768 bool Downstream::validate_response_recv_body_length() const {
769 if (!expect_response_body() || resp_.fs.content_length == -1) {
773 if (resp_.fs.content_length != resp_.recv_body_length) {
774 if (LOG_ENABLED(INFO)) {
775 DLOG(INFO, this) << "response invalid bodylen: content-length="
776 << resp_.fs.content_length
777 << ", received=" << resp_.recv_body_length;
// Sets upgraded_ for an HTTP/2 backend response: true when the request
// was CONNECT or used the WebSocket connect protocol, and the response
// status is in the 2xx range.
785 void Downstream::check_upgrade_fulfilled_http2() {
786 // This handles nonzero req_.connect_proto and h1 frontend requests
787 // WebSocket upgrade.
788 upgraded_ = (req_.method == HTTP_CONNECT ||
789 req_.connect_proto == ConnectProto::WEBSOCKET) &&
790 resp_.http_status / 100 == 2;
// Sets upgraded_ for an HTTP/1 backend response. Visible cases:
//  - CONNECT with the WebSocket connect protocol: the status is compared
//    with 101 and the Sec-WebSocket-Accept response header must equal the
//    token computed from ws_key_;
//  - other CONNECT requests: a 2xx status counts as upgraded;
//  - otherwise: on status 101, upgraded_ mirrors req_.upgrade_request.
// NOTE(review): early returns and the declaration of |expected| are not
// visible in this view.
793 void Downstream::check_upgrade_fulfilled_http1() {
794 if (req_.method == HTTP_CONNECT) {
795 if (req_.connect_proto == ConnectProto::WEBSOCKET) {
796 if (resp_.http_status != 101) {
800 // This is done for HTTP/2 frontend only.
801 auto accept = resp_.fs.header(http2::HD_SEC_WEBSOCKET_ACCEPT);
// Buffer sized for the base64 encoding of a 20-byte value.
806 std::array<uint8_t, base64::encode_length(20)> accept_buf;
808 http2::make_websocket_accept_token(accept_buf.data(), ws_key_);
// An empty expected token never counts as a match.
810 upgraded_ = expected != "" && expected == accept->value;
812 upgraded_ = resp_.http_status / 100 == 2;
818 if (resp_.http_status == 101) {
819 // TODO Do more strict checking for upgrade headers
820 upgraded_ = req_.upgrade_request;
// Marks the request as an upgrade request when its method is CONNECT.
826 void Downstream::inspect_http2_request() {
827 if (req_.method == HTTP_CONNECT) {
828 req_.upgrade_request = true;
// Derives request flags from an HTTP/1 request's method and headers:
//  - CONNECT marks the request as an upgrade request;
//  - for HTTP minor version > 0, the Upgrade header is examined for the
//    HTTP/2 cleartext protocol ID and for "websocket";
//  - a Transfer-Encoding header resets content_length to -1 and, when it
//    ends with "chunked" (case-insensitive), sets chunked_request_;
//  - expect_100_continue_ is derived from the Expect header.
// NOTE(review): several conditions and branch boundaries are not visible
// in this view.
832 void Downstream::inspect_http1_request() {
833 if (req_.method == HTTP_CONNECT) {
834 req_.upgrade_request = true;
835 } else if (req_.http_minor > 0) {
836 auto upgrade = req_.fs.header(http2::HD_UPGRADE);
838 const auto &val = upgrade->value;
839 // TODO Perform more strict checking for upgrade headers
840 if (util::streq_l(NGHTTP2_CLEARTEXT_PROTO_VERSION_ID, val.c_str(),
842 req_.http2_upgrade_seen = true;
844 req_.upgrade_request = true;
846 // TODO Should we check Sec-WebSocket-Key, and
847 // Sec-WebSocket-Version as well?
848 if (util::strieq_l("websocket", val)) {
849 req_.connect_proto = ConnectProto::WEBSOCKET;
// When Transfer-Encoding is present, any content-length value is
// discarded.
854 auto transfer_encoding = req_.fs.header(http2::HD_TRANSFER_ENCODING);
855 if (transfer_encoding) {
856 req_.fs.content_length = -1;
857 if (util::iends_with_l(transfer_encoding->value, "chunked")) {
858 chunked_request_ = true;
862 auto expect = req_.fs.header(http2::HD_EXPECT);
863 expect_100_continue_ =
865 util::strieq(expect->value, StringRef::from_lit("100-continue"));
// Derives response flags from an HTTP/1 response's headers: when a
// Transfer-Encoding header is present, content_length is reset to -1 and,
// if the value ends with "chunked" (case-insensitive), chunked_response_
// is set.
868 void Downstream::inspect_http1_response() {
869 auto transfer_encoding = resp_.fs.header(http2::HD_TRANSFER_ENCODING);
870 if (transfer_encoding) {
871 resp_.fs.content_length = -1;
872 if (util::iends_with_l(transfer_encoding->value, "chunked")) {
873 chunked_response_ = true;
// Resets the response status to 0 and the response HTTP version to 1.1.
// NOTE(review): further statements may follow outside the visible lines.
878 void Downstream::reset_response() {
879 resp_.http_status = 0;
880 resp_.http_major = 1;
881 resp_.http_minor = 1;
// True when the response status is 1xx and the connection has not been
// upgraded.
884 bool Downstream::get_non_final_response() const {
885 return !upgraded_ && resp_.http_status / 100 == 1;
// True when the request's HTTP version is 2.x or exactly 1.1.
888 bool Downstream::supports_non_final_response() const {
889 return req_.http_major == 2 || (req_.http_major == 1 && req_.http_minor == 1);
892 bool Downstream::get_upgraded() const { return upgraded_; }
// True when an HTTP/2 upgrade was seen on the request, the request has an
// HTTP2-Settings header, and the response is still in the INITIAL state.
894 bool Downstream::get_http2_upgrade_request() const {
895 return req_.http2_upgrade_seen && req_.fs.header(http2::HD_HTTP2_SETTINGS) &&
896 response_state_ == DownstreamState::INITIAL;
// Returns the value of the request's HTTP2-Settings header.
// NOTE(review): the value returned when the header is absent is not
// visible in this view.
899 StringRef Downstream::get_http2_settings() const {
900 auto http2_settings = req_.fs.header(http2::HD_HTTP2_SETTINGS);
901 if (!http2_settings) {
904 return http2_settings->value;
// Stores |stream_id| as the downstream stream ID (initialised to -1 by
// the constructor).
907 void Downstream::set_downstream_stream_id(int32_t stream_id) {
908 downstream_stream_id_ = stream_id;
// Returns the downstream stream ID; -1 until one has been set.
911 int32_t Downstream::get_downstream_stream_id() const {
912 return downstream_stream_id_;
// Returns the stored response RST_STREAM error code; NGHTTP2_NO_ERROR
// until one has been set.
915 uint32_t Downstream::get_response_rst_stream_error_code() const {
916 return response_rst_stream_error_code_;
// Stores |error_code| as the response RST_STREAM error code.
919 void Downstream::set_response_rst_stream_error_code(uint32_t error_code) {
920 response_rst_stream_error_code_ = error_code;
// Sets the expect_final_response_ flag to |f|.
923 void Downstream::set_expect_final_response(bool f) {
924 expect_final_response_ = f;
// Returns the expect_final_response_ flag.
927 bool Downstream::get_expect_final_response() const {
928 return expect_final_response_;
// True when the response is not marked headers-only and
// http2::expect_response_body() says the request method / response status
// combination carries a body.
931 bool Downstream::expect_response_body() const {
932 return !resp_.headers_only &&
933 http2::expect_response_body(req_.method, resp_.http_status);
// True when the response is HTTP/2 and is not marked headers-only.
936 bool Downstream::expect_response_trailer() const {
937 // In HTTP/2, if final response HEADERS does not bear END_STREAM it
938 // is possible trailer fields might come, regardless of request
939 // method or status code.
940 return !resp_.headers_only && resp_.http_major == 2;
944 void reset_timer(struct ev_loop *loop, ev_timer *w) { ev_timer_again(loop, w); }
// Restarts |w| with ev_timer_again(); the inactive case is tested first.
// NOTE(review): presumably an inactive timer is left untouched (early
// return) -- that branch body is not visible in this view.
948 void try_reset_timer(struct ev_loop *loop, ev_timer *w) {
949 if (!ev_is_active(w)) {
952 ev_timer_again(loop, w);
// Starts |w| with ev_timer_again(); the already-active case is tested
// first.
// NOTE(review): presumably an active timer is left running as-is (early
// return) -- that branch body is not visible in this view.
957 void ensure_timer(struct ev_loop *loop, ev_timer *w) {
958 if (ev_is_active(w)) {
961 ev_timer_again(loop, w);
// Stops |w| on |loop| through ev_timer_stop().
966 void disable_timer(struct ev_loop *loop, ev_timer *w) {
967 ev_timer_stop(loop, w);
// Restarts the upstream read timer on the client handler's event loop.
// A configured stream_read timeout of 0 is tested first.
// NOTE(review): presumably 0 means "disabled" and returns early -- the
// branch body is not visible in this view.
971 void Downstream::reset_upstream_rtimer() {
972 if (get_config()->http2.timeout.stream_read == 0.) {
975 auto loop = upstream_->get_client_handler()->get_loop();
976 reset_timer(loop, &upstream_rtimer_);
// Restarts the upstream write timer when a non-zero stream_write timeout
// is configured, and additionally refreshes the upstream read timer via
// try_reset_timer() when a non-zero stream_read timeout is configured.
979 void Downstream::reset_upstream_wtimer() {
980 auto loop = upstream_->get_client_handler()->get_loop();
981 auto &timeoutconf = get_config()->http2.timeout;
983 if (timeoutconf.stream_write != 0.) {
984 reset_timer(loop, &upstream_wtimer_);
986 if (timeoutconf.stream_read != 0.) {
987 try_reset_timer(loop, &upstream_rtimer_);
// Passes the upstream write timer to ensure_timer(). A configured
// stream_write timeout of 0 is tested first.
// NOTE(review): presumably 0 means "disabled" and returns early -- the
// branch body is not visible in this view.
991 void Downstream::ensure_upstream_wtimer() {
992 if (get_config()->http2.timeout.stream_write == 0.) {
995 auto loop = upstream_->get_client_handler()->get_loop();
996 ensure_timer(loop, &upstream_wtimer_);
// Stops the upstream read timer. A configured stream_read timeout of 0 is
// tested first.
// NOTE(review): presumably 0 means "disabled" and returns early -- the
// branch body is not visible in this view.
999 void Downstream::disable_upstream_rtimer() {
1000 if (get_config()->http2.timeout.stream_read == 0.) {
1003 auto loop = upstream_->get_client_handler()->get_loop();
1004 disable_timer(loop, &upstream_rtimer_);
// Stops the upstream write timer. A configured stream_write timeout of 0
// is tested first.
// NOTE(review): presumably 0 means "disabled" and returns early -- the
// branch body is not visible in this view.
1007 void Downstream::disable_upstream_wtimer() {
1008 if (get_config()->http2.timeout.stream_write == 0.) {
1011 auto loop = upstream_->get_client_handler()->get_loop();
1012 disable_timer(loop, &upstream_wtimer_);
// Restarts the downstream read timer on the client handler's event loop.
// A configured stream_read timeout of 0 is tested first.
// NOTE(review): presumably 0 means "disabled" and returns early -- the
// branch body is not visible in this view.
1015 void Downstream::reset_downstream_rtimer() {
1016 if (get_config()->http2.timeout.stream_read == 0.) {
1019 auto loop = upstream_->get_client_handler()->get_loop();
1020 reset_timer(loop, &downstream_rtimer_);
// Restarts the downstream write timer when a non-zero stream_write
// timeout is configured, and additionally refreshes the downstream read
// timer via try_reset_timer() when a non-zero stream_read timeout is
// configured.
1023 void Downstream::reset_downstream_wtimer() {
1024 auto loop = upstream_->get_client_handler()->get_loop();
1025 auto &timeoutconf = get_config()->http2.timeout;
1027 if (timeoutconf.stream_write != 0.) {
1028 reset_timer(loop, &downstream_wtimer_);
1030 if (timeoutconf.stream_read != 0.) {
1031 try_reset_timer(loop, &downstream_rtimer_);
// Passes the downstream write timer to ensure_timer(). A configured
// stream_write timeout of 0 is tested first.
// NOTE(review): presumably 0 means "disabled" and returns early -- the
// branch body is not visible in this view.
1035 void Downstream::ensure_downstream_wtimer() {
1036 if (get_config()->http2.timeout.stream_write == 0.) {
1039 auto loop = upstream_->get_client_handler()->get_loop();
1040 ensure_timer(loop, &downstream_wtimer_);
// Stops the downstream read timer. A configured stream_read timeout of 0
// is tested first.
// NOTE(review): presumably 0 means "disabled" and returns early -- the
// branch body is not visible in this view.
1043 void Downstream::disable_downstream_rtimer() {
1044 if (get_config()->http2.timeout.stream_read == 0.) {
1047 auto loop = upstream_->get_client_handler()->get_loop();
1048 disable_timer(loop, &downstream_rtimer_);
// Stops the downstream write timer. A configured stream_write timeout of
// 0 is tested first.
// NOTE(review): presumably 0 means "disabled" and returns early -- the
// branch body is not visible in this view.
1051 void Downstream::disable_downstream_wtimer() {
1052 if (get_config()->http2.timeout.stream_write == 0.) {
1055 auto loop = upstream_->get_client_handler()->get_loop();
1056 disable_timer(loop, &downstream_wtimer_);
// True when no access log entry has been written yet and a response
// status (> 0) is available.
1059 bool Downstream::accesslog_ready() const {
1060 return !accesslog_written_ && resp_.http_status > 0;
1063 void Downstream::add_retry() { ++num_retry_; }
1065 bool Downstream::no_more_retry() const { return num_retry_ > 50; }
// Stores |host| in request_downstream_host_, which
// rewrite_location_response_header() later reads.
1067 void Downstream::set_request_downstream_host(const StringRef &host) {
1068 request_downstream_host_ = host;
1071 void Downstream::set_request_pending(bool f) { request_pending_ = f; }
1073 bool Downstream::get_request_pending() const { return request_pending_; }
1075 void Downstream::set_request_header_sent(bool f) { request_header_sent_ = f; }
// Returns the request_header_sent_ flag.
1077 bool Downstream::get_request_header_sent() const {
1078 return request_header_sent_;
// True when the request has at least its headers complete
// (HEADER_COMPLETE or MSG_COMPLETE), it is either pending or its header
// has not been sent yet, and the response is still in the INITIAL state.
1081 bool Downstream::request_submission_ready() const {
1082 return (request_state_ == DownstreamState::HEADER_COMPLETE ||
1083 request_state_ == DownstreamState::MSG_COMPLETE) &&
1084 (request_pending_ || !request_header_sent_) &&
1085 response_state_ == DownstreamState::INITIAL;
1088 DispatchState Downstream::get_dispatch_state() const { return dispatch_state_; }
1090 void Downstream::set_dispatch_state(DispatchState s) { dispatch_state_ = s; }
// Associates |l| with this object; |l->downstream| is pointed back at
// this object. Asserts that no link is attached yet.
// NOTE(review): the statement storing |l| in blocked_link_ is not visible
// in this view.
1092 void Downstream::attach_blocked_link(BlockedLink *l) {
1093 assert(!blocked_link_);
1095 l->downstream = this;
// Clears blocked_link_ after saving its previous value in |link|.
// NOTE(review): the return statement is not visible here; presumably the
// saved |link| is returned -- confirm.
1099 BlockedLink *Downstream::detach_blocked_link() {
1100 auto link = blocked_link_;
1101 blocked_link_ = nullptr;
// True when a connection is attached, both request and response are in
// MSG_COMPLETE, the connection was not upgraded, the response did not
// ask for connection close, and request_buf_ has no unread data left.
1105 bool Downstream::can_detach_downstream_connection() const {
1106 // We should check request and response buffer. If request buffer
1107 // is not empty, then we might leave downstream connection in weird
1108 // state, especially for HTTP/1.1
1109 return dconn_ && response_state_ == DownstreamState::MSG_COMPLETE &&
1110 request_state_ == DownstreamState::MSG_COMPLETE && !upgraded_ &&
1111 !resp_.connection_close && request_buf_.rleft() == 0;
// Moves the response buffer out of this object and returns it by value;
// response_buf_ is left in a moved-from state.
1114 DefaultMemchunks Downstream::pop_response_buf() {
1115 return std::move(response_buf_);
// Stores |stream_id| as the associated stream ID (initialised to -1 by
// the constructor).
1118 void Downstream::set_assoc_stream_id(int32_t stream_id) {
1119 assoc_stream_id_ = stream_id;
1122 int32_t Downstream::get_assoc_stream_id() const { return assoc_stream_id_; }
1124 BlockAllocator &Downstream::get_block_allocator() { return balloc_; }
// Takes a reference on |rcbuf| and remembers it in rcbufs_. The
// destructor drops the reference again with nghttp2_rcbuf_decref().
1126 void Downstream::add_rcbuf(nghttp2_rcbuf *rcbuf) {
1127 nghttp2_rcbuf_incref(rcbuf);
1128 rcbufs_.push_back(rcbuf);
// Setter taking a shared DownstreamAddrGroup.
// NOTE(review): the body is not visible in this view; presumably |group|
// is stored in a member -- confirm.
1131 void Downstream::set_downstream_addr_group(
1132 const std::shared_ptr<DownstreamAddrGroup> &group) {
1136 void Downstream::set_addr(const DownstreamAddr *addr) { addr_ = addr; }
1138 const DownstreamAddr *Downstream::get_addr() const { return addr_; }
1140 void Downstream::set_accesslog_written(bool f) { accesslog_written_ = f; }
// Stores |h| as the affinity cookie value and flags it as new, which
// get_affinity_cookie_to_send() checks.
1142 void Downstream::renew_affinity_cookie(uint32_t h) {
1143 affinity_cookie_ = h;
1144 new_affinity_cookie_ = true;
// Returns affinity_cookie_ when it has been flagged as new by
// renew_affinity_cookie().
// NOTE(review): the value returned otherwise is not visible in this view.
1147 uint32_t Downstream::get_affinity_cookie_to_send() const {
1148 if (new_affinity_cookie_) {
1149 return affinity_cookie_;
// Returns a pointer to blocked_request_buf_, the buffer that
// push_upload_data_chunk() fills while no connection is attached and the
// request header has not been sent.
1154 DefaultMemchunks *Downstream::get_blocked_request_buf() {
1155 return &blocked_request_buf_;
// Returns the blocked_request_data_eof_ flag, which end_upload_data()
// sets while no connection is attached and the request header has not
// been sent.
1158 bool Downstream::get_blocked_request_data_eof() const {
1159 return blocked_request_data_eof_;
// Sets the blocked_request_data_eof_ flag to |f|.
1162 void Downstream::set_blocked_request_data_eof(bool f) {
1163 blocked_request_data_eof_ = f;
1166 void Downstream::set_ws_key(const StringRef &key) { ws_key_ = key; }
// Returns the expect_100_continue_ flag computed by
// inspect_http1_request().
1168 bool Downstream::get_expect_100_continue() const {
1169 return expect_100_continue_;
1172 } // namespace shrpx