Imported Upstream version 1.46.0
[platform/upstream/nghttp2.git] / src / shrpx_downstream.cc
1 /*
2  * nghttp2 - HTTP/2 C Library
3  *
4  * Copyright (c) 2012 Tatsuhiro Tsujikawa
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining
7  * a copy of this software and associated documentation files (the
8  * "Software"), to deal in the Software without restriction, including
9  * without limitation the rights to use, copy, modify, merge, publish,
10  * distribute, sublicense, and/or sell copies of the Software, and to
11  * permit persons to whom the Software is furnished to do so, subject to
12  * the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be
15  * included in all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
21  * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
22  * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
23  * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24  */
25 #include "shrpx_downstream.h"
26
27 #include <cassert>
28
29 #include "url-parser/url_parser.h"
30
31 #include "shrpx_upstream.h"
32 #include "shrpx_client_handler.h"
33 #include "shrpx_config.h"
34 #include "shrpx_error.h"
35 #include "shrpx_downstream_connection.h"
36 #include "shrpx_downstream_queue.h"
37 #include "shrpx_worker.h"
38 #include "shrpx_http2_session.h"
39 #include "shrpx_log.h"
40 #ifdef HAVE_MRUBY
41 #  include "shrpx_mruby.h"
42 #endif // HAVE_MRUBY
43 #include "util.h"
44 #include "http2.h"
45
46 namespace shrpx {
47
48 namespace {
49 void upstream_timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
50   auto downstream = static_cast<Downstream *>(w->data);
51   auto upstream = downstream->get_upstream();
52
53   auto which = revents == EV_READ ? "read" : "write";
54
55   if (LOG_ENABLED(INFO)) {
56     DLOG(INFO, downstream) << "upstream timeout stream_id="
57                            << downstream->get_stream_id() << " event=" << which;
58   }
59
60   downstream->disable_upstream_rtimer();
61   downstream->disable_upstream_wtimer();
62
63   upstream->on_timeout(downstream);
64 }
65 } // namespace
66
67 namespace {
68 void upstream_rtimeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
69   upstream_timeoutcb(loop, w, EV_READ);
70 }
71 } // namespace
72
73 namespace {
74 void upstream_wtimeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
75   upstream_timeoutcb(loop, w, EV_WRITE);
76 }
77 } // namespace
78
79 namespace {
80 void downstream_timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
81   auto downstream = static_cast<Downstream *>(w->data);
82
83   auto which = revents == EV_READ ? "read" : "write";
84
85   if (LOG_ENABLED(INFO)) {
86     DLOG(INFO, downstream) << "downstream timeout stream_id="
87                            << downstream->get_downstream_stream_id()
88                            << " event=" << which;
89   }
90
91   downstream->disable_downstream_rtimer();
92   downstream->disable_downstream_wtimer();
93
94   auto dconn = downstream->get_downstream_connection();
95
96   if (dconn) {
97     dconn->on_timeout();
98   }
99 }
100 } // namespace
101
102 namespace {
103 void downstream_rtimeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
104   downstream_timeoutcb(loop, w, EV_READ);
105 }
106 } // namespace
107
108 namespace {
109 void downstream_wtimeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
110   downstream_timeoutcb(loop, w, EV_WRITE);
111 }
112 } // namespace
113
114 // upstream could be nullptr for unittests
115 Downstream::Downstream(Upstream *upstream, MemchunkPool *mcpool,
116                        int64_t stream_id)
117     : dlnext(nullptr),
118       dlprev(nullptr),
119       response_sent_body_length(0),
120       balloc_(1024, 1024),
121       req_(balloc_),
122       resp_(balloc_),
123       request_start_time_(std::chrono::high_resolution_clock::now()),
124       blocked_request_buf_(mcpool),
125       request_buf_(mcpool),
126       response_buf_(mcpool),
127       upstream_(upstream),
128       blocked_link_(nullptr),
129       addr_(nullptr),
130       num_retry_(0),
131       stream_id_(stream_id),
132       assoc_stream_id_(-1),
133       downstream_stream_id_(-1),
134       response_rst_stream_error_code_(NGHTTP2_NO_ERROR),
135       affinity_cookie_(0),
136       request_state_(DownstreamState::INITIAL),
137       response_state_(DownstreamState::INITIAL),
138       dispatch_state_(DispatchState::NONE),
139       upgraded_(false),
140       chunked_request_(false),
141       chunked_response_(false),
142       expect_final_response_(false),
143       request_pending_(false),
144       request_header_sent_(false),
145       accesslog_written_(false),
146       new_affinity_cookie_(false),
147       blocked_request_data_eof_(false),
148       expect_100_continue_(false),
149       stop_reading_(false) {
150
151   auto &timeoutconf = get_config()->http2.timeout;
152
153   ev_timer_init(&upstream_rtimer_, &upstream_rtimeoutcb, 0.,
154                 timeoutconf.stream_read);
155   ev_timer_init(&upstream_wtimer_, &upstream_wtimeoutcb, 0.,
156                 timeoutconf.stream_write);
157   ev_timer_init(&downstream_rtimer_, &downstream_rtimeoutcb, 0.,
158                 timeoutconf.stream_read);
159   ev_timer_init(&downstream_wtimer_, &downstream_wtimeoutcb, 0.,
160                 timeoutconf.stream_write);
161
162   upstream_rtimer_.data = this;
163   upstream_wtimer_.data = this;
164   downstream_rtimer_.data = this;
165   downstream_wtimer_.data = this;
166
167   rcbufs_.reserve(32);
168 #ifdef ENABLE_HTTP3
169   rcbufs3_.reserve(32);
170 #endif // ENABLE_HTTP3
171 }
172
173 Downstream::~Downstream() {
174   if (LOG_ENABLED(INFO)) {
175     DLOG(INFO, this) << "Deleting";
176   }
177
178   // check nullptr for unittest
179   if (upstream_) {
180     auto loop = upstream_->get_client_handler()->get_loop();
181
182     ev_timer_stop(loop, &upstream_rtimer_);
183     ev_timer_stop(loop, &upstream_wtimer_);
184     ev_timer_stop(loop, &downstream_rtimer_);
185     ev_timer_stop(loop, &downstream_wtimer_);
186
187 #ifdef HAVE_MRUBY
188     auto handler = upstream_->get_client_handler();
189     auto worker = handler->get_worker();
190     auto mruby_ctx = worker->get_mruby_context();
191
192     mruby_ctx->delete_downstream(this);
193 #endif // HAVE_MRUBY
194   }
195
196 #ifdef HAVE_MRUBY
197   if (dconn_) {
198     const auto &group = dconn_->get_downstream_addr_group();
199     if (group) {
200       const auto &mruby_ctx = group->shared_addr->mruby_ctx;
201       mruby_ctx->delete_downstream(this);
202     }
203   }
204 #endif // HAVE_MRUBY
205
206   // DownstreamConnection may refer to this object.  Delete it now
207   // explicitly.
208   dconn_.reset();
209
210 #ifdef ENABLE_HTTP3
211   for (auto rcbuf : rcbufs3_) {
212     nghttp3_rcbuf_decref(rcbuf);
213   }
214 #endif // ENABLE_HTTP3
215
216   for (auto rcbuf : rcbufs_) {
217     nghttp2_rcbuf_decref(rcbuf);
218   }
219
220   if (LOG_ENABLED(INFO)) {
221     DLOG(INFO, this) << "Deleted";
222   }
223 }
224
225 int Downstream::attach_downstream_connection(
226     std::unique_ptr<DownstreamConnection> dconn) {
227   if (dconn->attach_downstream(this) != 0) {
228     return -1;
229   }
230
231   dconn_ = std::move(dconn);
232
233   return 0;
234 }
235
236 void Downstream::detach_downstream_connection() {
237   if (!dconn_) {
238     return;
239   }
240
241 #ifdef HAVE_MRUBY
242   const auto &group = dconn_->get_downstream_addr_group();
243   if (group) {
244     const auto &mruby_ctx = group->shared_addr->mruby_ctx;
245     mruby_ctx->delete_downstream(this);
246   }
247 #endif // HAVE_MRUBY
248
249   dconn_->detach_downstream(this);
250
251   auto handler = dconn_->get_client_handler();
252
253   handler->pool_downstream_connection(
254       std::unique_ptr<DownstreamConnection>(dconn_.release()));
255 }
256
257 DownstreamConnection *Downstream::get_downstream_connection() {
258   return dconn_.get();
259 }
260
261 std::unique_ptr<DownstreamConnection> Downstream::pop_downstream_connection() {
262 #ifdef HAVE_MRUBY
263   if (!dconn_) {
264     return nullptr;
265   }
266
267   const auto &group = dconn_->get_downstream_addr_group();
268   if (group) {
269     const auto &mruby_ctx = group->shared_addr->mruby_ctx;
270     mruby_ctx->delete_downstream(this);
271   }
272 #endif // HAVE_MRUBY
273
274   return std::unique_ptr<DownstreamConnection>(dconn_.release());
275 }
276
277 void Downstream::pause_read(IOCtrlReason reason) {
278   if (dconn_) {
279     dconn_->pause_read(reason);
280   }
281 }
282
283 int Downstream::resume_read(IOCtrlReason reason, size_t consumed) {
284   if (dconn_) {
285     return dconn_->resume_read(reason, consumed);
286   }
287
288   return 0;
289 }
290
291 void Downstream::force_resume_read() {
292   if (dconn_) {
293     dconn_->force_resume_read();
294   }
295 }
296
297 namespace {
298 const HeaderRefs::value_type *
299 search_header_linear_backwards(const HeaderRefs &headers,
300                                const StringRef &name) {
301   for (auto it = headers.rbegin(); it != headers.rend(); ++it) {
302     auto &kv = *it;
303     if (kv.name == name) {
304       return &kv;
305     }
306   }
307   return nullptr;
308 }
309 } // namespace
310
311 StringRef Downstream::assemble_request_cookie() {
312   size_t len = 0;
313
314   for (auto &kv : req_.fs.headers()) {
315     if (kv.token != http2::HD_COOKIE || kv.value.empty()) {
316       continue;
317     }
318
319     len += kv.value.size() + str_size("; ");
320   }
321
322   auto iov = make_byte_ref(balloc_, len + 1);
323   auto p = iov.base;
324
325   for (auto &kv : req_.fs.headers()) {
326     if (kv.token != http2::HD_COOKIE || kv.value.empty()) {
327       continue;
328     }
329
330     auto end = std::end(kv.value);
331     for (auto it = std::begin(kv.value) + kv.value.size();
332          it != std::begin(kv.value); --it) {
333       auto c = *(it - 1);
334       if (c == ' ' || c == ';') {
335         continue;
336       }
337       end = it;
338       break;
339     }
340
341     p = std::copy(std::begin(kv.value), end, p);
342     p = util::copy_lit(p, "; ");
343   }
344
345   // cut trailing "; "
346   if (p - iov.base >= 2) {
347     p -= 2;
348   }
349
350   return StringRef{iov.base, p};
351 }
352
353 uint32_t Downstream::find_affinity_cookie(const StringRef &name) {
354   for (auto &kv : req_.fs.headers()) {
355     if (kv.token != http2::HD_COOKIE) {
356       continue;
357     }
358
359     for (auto it = std::begin(kv.value); it != std::end(kv.value);) {
360       if (*it == '\t' || *it == ' ' || *it == ';') {
361         ++it;
362         continue;
363       }
364
365       auto end = std::find(it, std::end(kv.value), '=');
366       if (end == std::end(kv.value)) {
367         return 0;
368       }
369
370       if (!util::streq(name, StringRef{it, end})) {
371         it = std::find(it, std::end(kv.value), ';');
372         continue;
373       }
374
375       it = std::find(end + 1, std::end(kv.value), ';');
376       auto val = StringRef{end + 1, it};
377       if (val.size() != 8) {
378         return 0;
379       }
380       uint32_t h = 0;
381       for (auto c : val) {
382         auto n = util::hex_to_uint(c);
383         if (n == 256) {
384           return 0;
385         }
386         h <<= 4;
387         h += n;
388       }
389       affinity_cookie_ = h;
390       return h;
391     }
392   }
393   return 0;
394 }
395
396 size_t Downstream::count_crumble_request_cookie() {
397   size_t n = 0;
398   for (auto &kv : req_.fs.headers()) {
399     if (kv.token != http2::HD_COOKIE) {
400       continue;
401     }
402
403     for (auto it = std::begin(kv.value); it != std::end(kv.value);) {
404       if (*it == '\t' || *it == ' ' || *it == ';') {
405         ++it;
406         continue;
407       }
408
409       it = std::find(it, std::end(kv.value), ';');
410
411       ++n;
412     }
413   }
414   return n;
415 }
416
417 void Downstream::crumble_request_cookie(std::vector<nghttp2_nv> &nva) {
418   for (auto &kv : req_.fs.headers()) {
419     if (kv.token != http2::HD_COOKIE) {
420       continue;
421     }
422
423     for (auto it = std::begin(kv.value); it != std::end(kv.value);) {
424       if (*it == '\t' || *it == ' ' || *it == ';') {
425         ++it;
426         continue;
427       }
428
429       auto first = it;
430
431       it = std::find(it, std::end(kv.value), ';');
432
433       nva.push_back({(uint8_t *)"cookie", (uint8_t *)first, str_size("cookie"),
434                      (size_t)(it - first),
435                      (uint8_t)(NGHTTP2_NV_FLAG_NO_COPY_NAME |
436                                NGHTTP2_NV_FLAG_NO_COPY_VALUE |
437                                (kv.no_index ? NGHTTP2_NV_FLAG_NO_INDEX : 0))});
438     }
439   }
440 }
441
442 namespace {
443 void add_header(size_t &sum, HeaderRefs &headers, const StringRef &name,
444                 const StringRef &value, bool no_index, int32_t token) {
445   sum += name.size() + value.size();
446   headers.emplace_back(name, value, no_index, token);
447 }
448 } // namespace
449
450 namespace {
451 StringRef alloc_header_name(BlockAllocator &balloc, const StringRef &name) {
452   auto iov = make_byte_ref(balloc, name.size() + 1);
453   auto p = iov.base;
454   p = std::copy(std::begin(name), std::end(name), p);
455   util::inp_strlower(iov.base, p);
456   *p = '\0';
457
458   return StringRef{iov.base, p};
459 }
460 } // namespace
461
462 namespace {
463 void append_last_header_key(BlockAllocator &balloc, bool &key_prev, size_t &sum,
464                             HeaderRefs &headers, const char *data, size_t len) {
465   assert(key_prev);
466   sum += len;
467   auto &item = headers.back();
468   auto name =
469       realloc_concat_string_ref(balloc, item.name, StringRef{data, len});
470
471   auto p = const_cast<uint8_t *>(name.byte());
472   util::inp_strlower(p + name.size() - len, p + name.size());
473
474   item.name = name;
475   item.token = http2::lookup_token(item.name);
476 }
477 } // namespace
478
479 namespace {
480 void append_last_header_value(BlockAllocator &balloc, bool &key_prev,
481                               size_t &sum, HeaderRefs &headers,
482                               const char *data, size_t len) {
483   key_prev = false;
484   sum += len;
485   auto &item = headers.back();
486   item.value =
487       realloc_concat_string_ref(balloc, item.value, StringRef{data, len});
488 }
489 } // namespace
490
491 int FieldStore::parse_content_length() {
492   content_length = -1;
493
494   for (auto &kv : headers_) {
495     if (kv.token != http2::HD_CONTENT_LENGTH) {
496       continue;
497     }
498
499     auto len = util::parse_uint(kv.value);
500     if (len == -1) {
501       return -1;
502     }
503     if (content_length != -1) {
504       return -1;
505     }
506     content_length = len;
507   }
508   return 0;
509 }
510
511 const HeaderRefs::value_type *FieldStore::header(int32_t token) const {
512   for (auto it = headers_.rbegin(); it != headers_.rend(); ++it) {
513     auto &kv = *it;
514     if (kv.token == token) {
515       return &kv;
516     }
517   }
518   return nullptr;
519 }
520
521 HeaderRefs::value_type *FieldStore::header(int32_t token) {
522   for (auto it = headers_.rbegin(); it != headers_.rend(); ++it) {
523     auto &kv = *it;
524     if (kv.token == token) {
525       return &kv;
526     }
527   }
528   return nullptr;
529 }
530
531 const HeaderRefs::value_type *FieldStore::header(const StringRef &name) const {
532   return search_header_linear_backwards(headers_, name);
533 }
534
535 void FieldStore::add_header_token(const StringRef &name, const StringRef &value,
536                                   bool no_index, int32_t token) {
537   shrpx::add_header(buffer_size_, headers_, name, value, no_index, token);
538 }
539
540 void FieldStore::alloc_add_header_name(const StringRef &name) {
541   auto name_ref = alloc_header_name(balloc_, name);
542   auto token = http2::lookup_token(name_ref);
543   add_header_token(name_ref, StringRef{}, false, token);
544   header_key_prev_ = true;
545 }
546
547 void FieldStore::append_last_header_key(const char *data, size_t len) {
548   shrpx::append_last_header_key(balloc_, header_key_prev_, buffer_size_,
549                                 headers_, data, len);
550 }
551
552 void FieldStore::append_last_header_value(const char *data, size_t len) {
553   shrpx::append_last_header_value(balloc_, header_key_prev_, buffer_size_,
554                                   headers_, data, len);
555 }
556
557 void FieldStore::clear_headers() {
558   headers_.clear();
559   header_key_prev_ = false;
560 }
561
562 void FieldStore::add_trailer_token(const StringRef &name,
563                                    const StringRef &value, bool no_index,
564                                    int32_t token) {
565   // Header size limit should be applied to all header and trailer
566   // fields combined.
567   shrpx::add_header(buffer_size_, trailers_, name, value, no_index, token);
568 }
569
570 void FieldStore::alloc_add_trailer_name(const StringRef &name) {
571   auto name_ref = alloc_header_name(balloc_, name);
572   auto token = http2::lookup_token(name_ref);
573   add_trailer_token(name_ref, StringRef{}, false, token);
574   trailer_key_prev_ = true;
575 }
576
577 void FieldStore::append_last_trailer_key(const char *data, size_t len) {
578   shrpx::append_last_header_key(balloc_, trailer_key_prev_, buffer_size_,
579                                 trailers_, data, len);
580 }
581
582 void FieldStore::append_last_trailer_value(const char *data, size_t len) {
583   shrpx::append_last_header_value(balloc_, trailer_key_prev_, buffer_size_,
584                                   trailers_, data, len);
585 }
586
587 void FieldStore::erase_content_length_and_transfer_encoding() {
588   for (auto &kv : headers_) {
589     switch (kv.token) {
590     case http2::HD_CONTENT_LENGTH:
591     case http2::HD_TRANSFER_ENCODING:
592       kv.name = StringRef{};
593       kv.token = -1;
594       break;
595     }
596   }
597 }
598
599 void Downstream::set_request_start_time(
600     std::chrono::high_resolution_clock::time_point time) {
601   request_start_time_ = std::move(time);
602 }
603
604 const std::chrono::high_resolution_clock::time_point &
605 Downstream::get_request_start_time() const {
606   return request_start_time_;
607 }
608
609 void Downstream::reset_upstream(Upstream *upstream) {
610   upstream_ = upstream;
611   if (dconn_) {
612     dconn_->on_upstream_change(upstream);
613   }
614 }
615
616 Upstream *Downstream::get_upstream() const { return upstream_; }
617
618 void Downstream::set_stream_id(int64_t stream_id) { stream_id_ = stream_id; }
619
620 int64_t Downstream::get_stream_id() const { return stream_id_; }
621
622 void Downstream::set_request_state(DownstreamState state) {
623   request_state_ = state;
624 }
625
626 DownstreamState Downstream::get_request_state() const { return request_state_; }
627
628 bool Downstream::get_chunked_request() const { return chunked_request_; }
629
630 void Downstream::set_chunked_request(bool f) { chunked_request_ = f; }
631
632 bool Downstream::request_buf_full() {
633   auto handler = upstream_->get_client_handler();
634   auto faddr = handler->get_upstream_addr();
635   auto worker = handler->get_worker();
636
637   // We don't check buffer size here for API endpoint.
638   if (faddr->alt_mode == UpstreamAltMode::API) {
639     return false;
640   }
641
642   if (dconn_) {
643     auto &downstreamconf = *worker->get_downstream_config();
644     return blocked_request_buf_.rleft() + request_buf_.rleft() >=
645            downstreamconf.request_buffer_size;
646   }
647
648   return false;
649 }
650
651 DefaultMemchunks *Downstream::get_request_buf() { return &request_buf_; }
652
653 // Call this function after this object is attached to
654 // Downstream. Otherwise, the program will crash.
655 int Downstream::push_request_headers() {
656   if (!dconn_) {
657     DLOG(INFO, this) << "dconn_ is NULL";
658     return -1;
659   }
660   return dconn_->push_request_headers();
661 }
662
663 int Downstream::push_upload_data_chunk(const uint8_t *data, size_t datalen) {
664   req_.recv_body_length += datalen;
665
666   if (!dconn_ && !request_header_sent_) {
667     blocked_request_buf_.append(data, datalen);
668     req_.unconsumed_body_length += datalen;
669     return 0;
670   }
671
672   // Assumes that request headers have already been pushed to output
673   // buffer using push_request_headers().
674   if (!dconn_) {
675     DLOG(INFO, this) << "dconn_ is NULL";
676     return -1;
677   }
678   if (dconn_->push_upload_data_chunk(data, datalen) != 0) {
679     return -1;
680   }
681
682   req_.unconsumed_body_length += datalen;
683
684   return 0;
685 }
686
687 int Downstream::end_upload_data() {
688   if (!dconn_ && !request_header_sent_) {
689     blocked_request_data_eof_ = true;
690     return 0;
691   }
692   if (!dconn_) {
693     DLOG(INFO, this) << "dconn_ is NULL";
694     return -1;
695   }
696   return dconn_->end_upload_data();
697 }
698
699 void Downstream::rewrite_location_response_header(
700     const StringRef &upstream_scheme) {
701   auto hd = resp_.fs.header(http2::HD_LOCATION);
702   if (!hd) {
703     return;
704   }
705
706   if (request_downstream_host_.empty() || req_.authority.empty()) {
707     return;
708   }
709
710   http_parser_url u{};
711   auto rv = http_parser_parse_url(hd->value.c_str(), hd->value.size(), 0, &u);
712   if (rv != 0) {
713     return;
714   }
715
716   auto new_uri = http2::rewrite_location_uri(balloc_, hd->value, u,
717                                              request_downstream_host_,
718                                              req_.authority, upstream_scheme);
719
720   if (new_uri.empty()) {
721     return;
722   }
723
724   hd->value = new_uri;
725 }
726
727 bool Downstream::get_chunked_response() const { return chunked_response_; }
728
729 void Downstream::set_chunked_response(bool f) { chunked_response_ = f; }
730
731 int Downstream::on_read() {
732   if (!dconn_) {
733     DLOG(INFO, this) << "dconn_ is NULL";
734     return -1;
735   }
736   return dconn_->on_read();
737 }
738
739 void Downstream::set_response_state(DownstreamState state) {
740   response_state_ = state;
741 }
742
743 DownstreamState Downstream::get_response_state() const {
744   return response_state_;
745 }
746
747 DefaultMemchunks *Downstream::get_response_buf() { return &response_buf_; }
748
749 bool Downstream::response_buf_full() {
750   if (dconn_) {
751     auto handler = upstream_->get_client_handler();
752     auto worker = handler->get_worker();
753     auto &downstreamconf = *worker->get_downstream_config();
754
755     return response_buf_.rleft() >= downstreamconf.response_buffer_size;
756   }
757
758   return false;
759 }
760
761 bool Downstream::validate_request_recv_body_length() const {
762   if (req_.fs.content_length == -1) {
763     return true;
764   }
765
766   if (req_.fs.content_length != req_.recv_body_length) {
767     if (LOG_ENABLED(INFO)) {
768       DLOG(INFO, this) << "request invalid bodylen: content-length="
769                        << req_.fs.content_length
770                        << ", received=" << req_.recv_body_length;
771     }
772     return false;
773   }
774
775   return true;
776 }
777
778 bool Downstream::validate_response_recv_body_length() const {
779   if (!expect_response_body() || resp_.fs.content_length == -1) {
780     return true;
781   }
782
783   if (resp_.fs.content_length != resp_.recv_body_length) {
784     if (LOG_ENABLED(INFO)) {
785       DLOG(INFO, this) << "response invalid bodylen: content-length="
786                        << resp_.fs.content_length
787                        << ", received=" << resp_.recv_body_length;
788     }
789     return false;
790   }
791
792   return true;
793 }
794
795 void Downstream::check_upgrade_fulfilled_http2() {
796   // This handles nonzero req_.connect_proto and h1 frontend requests
797   // WebSocket upgrade.
798   upgraded_ = (req_.method == HTTP_CONNECT ||
799                req_.connect_proto == ConnectProto::WEBSOCKET) &&
800               resp_.http_status / 100 == 2;
801 }
802
803 void Downstream::check_upgrade_fulfilled_http1() {
804   if (req_.method == HTTP_CONNECT) {
805     if (req_.connect_proto == ConnectProto::WEBSOCKET) {
806       if (resp_.http_status != 101) {
807         return;
808       }
809
810       // This is done for HTTP/2 frontend only.
811       auto accept = resp_.fs.header(http2::HD_SEC_WEBSOCKET_ACCEPT);
812       if (!accept) {
813         return;
814       }
815
816       std::array<uint8_t, base64::encode_length(20)> accept_buf;
817       auto expected =
818           http2::make_websocket_accept_token(accept_buf.data(), ws_key_);
819
820       upgraded_ = expected != "" && expected == accept->value;
821     } else {
822       upgraded_ = resp_.http_status / 100 == 2;
823     }
824
825     return;
826   }
827
828   if (resp_.http_status == 101) {
829     // TODO Do more strict checking for upgrade headers
830     upgraded_ = req_.upgrade_request;
831
832     return;
833   }
834 }
835
836 void Downstream::inspect_http2_request() {
837   if (req_.method == HTTP_CONNECT) {
838     req_.upgrade_request = true;
839   }
840 }
841
842 void Downstream::inspect_http1_request() {
843   if (req_.method == HTTP_CONNECT) {
844     req_.upgrade_request = true;
845   } else if (req_.http_minor > 0) {
846     auto upgrade = req_.fs.header(http2::HD_UPGRADE);
847     if (upgrade) {
848       const auto &val = upgrade->value;
849       // TODO Perform more strict checking for upgrade headers
850       if (util::streq_l(NGHTTP2_CLEARTEXT_PROTO_VERSION_ID, val.c_str(),
851                         val.size())) {
852         req_.http2_upgrade_seen = true;
853       } else {
854         req_.upgrade_request = true;
855
856         // TODO Should we check Sec-WebSocket-Key, and
857         // Sec-WebSocket-Version as well?
858         if (util::strieq_l("websocket", val)) {
859           req_.connect_proto = ConnectProto::WEBSOCKET;
860         }
861       }
862     }
863   }
864   auto transfer_encoding = req_.fs.header(http2::HD_TRANSFER_ENCODING);
865   if (transfer_encoding) {
866     req_.fs.content_length = -1;
867     if (util::iends_with_l(transfer_encoding->value, "chunked")) {
868       chunked_request_ = true;
869     }
870   }
871
872   auto expect = req_.fs.header(http2::HD_EXPECT);
873   expect_100_continue_ =
874       expect &&
875       util::strieq(expect->value, StringRef::from_lit("100-continue"));
876 }
877
878 void Downstream::inspect_http1_response() {
879   auto transfer_encoding = resp_.fs.header(http2::HD_TRANSFER_ENCODING);
880   if (transfer_encoding) {
881     resp_.fs.content_length = -1;
882     if (util::iends_with_l(transfer_encoding->value, "chunked")) {
883       chunked_response_ = true;
884     }
885   }
886 }
887
888 void Downstream::reset_response() {
889   resp_.http_status = 0;
890   resp_.http_major = 1;
891   resp_.http_minor = 1;
892 }
893
894 bool Downstream::get_non_final_response() const {
895   return !upgraded_ && resp_.http_status / 100 == 1;
896 }
897
898 bool Downstream::supports_non_final_response() const {
899   return req_.http_major == 3 || req_.http_major == 2 ||
900          (req_.http_major == 1 && req_.http_minor == 1);
901 }
902
903 bool Downstream::get_upgraded() const { return upgraded_; }
904
905 bool Downstream::get_http2_upgrade_request() const {
906   return req_.http2_upgrade_seen && req_.fs.header(http2::HD_HTTP2_SETTINGS) &&
907          response_state_ == DownstreamState::INITIAL;
908 }
909
910 StringRef Downstream::get_http2_settings() const {
911   auto http2_settings = req_.fs.header(http2::HD_HTTP2_SETTINGS);
912   if (!http2_settings) {
913     return StringRef{};
914   }
915   return http2_settings->value;
916 }
917
918 void Downstream::set_downstream_stream_id(int64_t stream_id) {
919   downstream_stream_id_ = stream_id;
920 }
921
922 int64_t Downstream::get_downstream_stream_id() const {
923   return downstream_stream_id_;
924 }
925
926 uint32_t Downstream::get_response_rst_stream_error_code() const {
927   return response_rst_stream_error_code_;
928 }
929
930 void Downstream::set_response_rst_stream_error_code(uint32_t error_code) {
931   response_rst_stream_error_code_ = error_code;
932 }
933
934 void Downstream::set_expect_final_response(bool f) {
935   expect_final_response_ = f;
936 }
937
938 bool Downstream::get_expect_final_response() const {
939   return expect_final_response_;
940 }
941
942 bool Downstream::expect_response_body() const {
943   return !resp_.headers_only &&
944          http2::expect_response_body(req_.method, resp_.http_status);
945 }
946
947 bool Downstream::expect_response_trailer() const {
948   // In HTTP/2, if final response HEADERS does not bear END_STREAM it
949   // is possible trailer fields might come, regardless of request
950   // method or status code.
951   return !resp_.headers_only &&
952          (resp_.http_major == 3 || resp_.http_major == 2);
953 }
954
955 namespace {
956 void reset_timer(struct ev_loop *loop, ev_timer *w) { ev_timer_again(loop, w); }
957 } // namespace
958
959 namespace {
960 void try_reset_timer(struct ev_loop *loop, ev_timer *w) {
961   if (!ev_is_active(w)) {
962     return;
963   }
964   ev_timer_again(loop, w);
965 }
966 } // namespace
967
968 namespace {
969 void ensure_timer(struct ev_loop *loop, ev_timer *w) {
970   if (ev_is_active(w)) {
971     return;
972   }
973   ev_timer_again(loop, w);
974 }
975 } // namespace
976
977 namespace {
978 void disable_timer(struct ev_loop *loop, ev_timer *w) {
979   ev_timer_stop(loop, w);
980 }
981 } // namespace
982
983 void Downstream::reset_upstream_rtimer() {
984   if (get_config()->http2.timeout.stream_read == 0.) {
985     return;
986   }
987   auto loop = upstream_->get_client_handler()->get_loop();
988   reset_timer(loop, &upstream_rtimer_);
989 }
990
991 void Downstream::reset_upstream_wtimer() {
992   auto loop = upstream_->get_client_handler()->get_loop();
993   auto &timeoutconf = get_config()->http2.timeout;
994
995   if (timeoutconf.stream_write != 0.) {
996     reset_timer(loop, &upstream_wtimer_);
997   }
998   if (timeoutconf.stream_read != 0.) {
999     try_reset_timer(loop, &upstream_rtimer_);
1000   }
1001 }
1002
1003 void Downstream::ensure_upstream_wtimer() {
1004   if (get_config()->http2.timeout.stream_write == 0.) {
1005     return;
1006   }
1007   auto loop = upstream_->get_client_handler()->get_loop();
1008   ensure_timer(loop, &upstream_wtimer_);
1009 }
1010
1011 void Downstream::disable_upstream_rtimer() {
1012   if (get_config()->http2.timeout.stream_read == 0.) {
1013     return;
1014   }
1015   auto loop = upstream_->get_client_handler()->get_loop();
1016   disable_timer(loop, &upstream_rtimer_);
1017 }
1018
1019 void Downstream::disable_upstream_wtimer() {
1020   if (get_config()->http2.timeout.stream_write == 0.) {
1021     return;
1022   }
1023   auto loop = upstream_->get_client_handler()->get_loop();
1024   disable_timer(loop, &upstream_wtimer_);
1025 }
1026
1027 void Downstream::reset_downstream_rtimer() {
1028   if (get_config()->http2.timeout.stream_read == 0.) {
1029     return;
1030   }
1031   auto loop = upstream_->get_client_handler()->get_loop();
1032   reset_timer(loop, &downstream_rtimer_);
1033 }
1034
1035 void Downstream::reset_downstream_wtimer() {
1036   auto loop = upstream_->get_client_handler()->get_loop();
1037   auto &timeoutconf = get_config()->http2.timeout;
1038
1039   if (timeoutconf.stream_write != 0.) {
1040     reset_timer(loop, &downstream_wtimer_);
1041   }
1042   if (timeoutconf.stream_read != 0.) {
1043     try_reset_timer(loop, &downstream_rtimer_);
1044   }
1045 }
1046
1047 void Downstream::ensure_downstream_wtimer() {
1048   if (get_config()->http2.timeout.stream_write == 0.) {
1049     return;
1050   }
1051   auto loop = upstream_->get_client_handler()->get_loop();
1052   ensure_timer(loop, &downstream_wtimer_);
1053 }
1054
1055 void Downstream::disable_downstream_rtimer() {
1056   if (get_config()->http2.timeout.stream_read == 0.) {
1057     return;
1058   }
1059   auto loop = upstream_->get_client_handler()->get_loop();
1060   disable_timer(loop, &downstream_rtimer_);
1061 }
1062
1063 void Downstream::disable_downstream_wtimer() {
1064   if (get_config()->http2.timeout.stream_write == 0.) {
1065     return;
1066   }
1067   auto loop = upstream_->get_client_handler()->get_loop();
1068   disable_timer(loop, &downstream_wtimer_);
1069 }
1070
1071 bool Downstream::accesslog_ready() const {
1072   return !accesslog_written_ && resp_.http_status > 0;
1073 }
1074
1075 void Downstream::add_retry() { ++num_retry_; }
1076
1077 bool Downstream::no_more_retry() const { return num_retry_ > 50; }
1078
1079 void Downstream::set_request_downstream_host(const StringRef &host) {
1080   request_downstream_host_ = host;
1081 }
1082
1083 void Downstream::set_request_pending(bool f) { request_pending_ = f; }
1084
1085 bool Downstream::get_request_pending() const { return request_pending_; }
1086
1087 void Downstream::set_request_header_sent(bool f) { request_header_sent_ = f; }
1088
1089 bool Downstream::get_request_header_sent() const {
1090   return request_header_sent_;
1091 }
1092
1093 bool Downstream::request_submission_ready() const {
1094   return (request_state_ == DownstreamState::HEADER_COMPLETE ||
1095           request_state_ == DownstreamState::MSG_COMPLETE) &&
1096          (request_pending_ || !request_header_sent_) &&
1097          response_state_ == DownstreamState::INITIAL;
1098 }
1099
1100 DispatchState Downstream::get_dispatch_state() const { return dispatch_state_; }
1101
1102 void Downstream::set_dispatch_state(DispatchState s) { dispatch_state_ = s; }
1103
1104 void Downstream::attach_blocked_link(BlockedLink *l) {
1105   assert(!blocked_link_);
1106
1107   l->downstream = this;
1108   blocked_link_ = l;
1109 }
1110
1111 BlockedLink *Downstream::detach_blocked_link() {
1112   auto link = blocked_link_;
1113   blocked_link_ = nullptr;
1114   return link;
1115 }
1116
1117 bool Downstream::can_detach_downstream_connection() const {
1118   // We should check request and response buffer.  If request buffer
1119   // is not empty, then we might leave downstream connection in weird
1120   // state, especially for HTTP/1.1
1121   return dconn_ && response_state_ == DownstreamState::MSG_COMPLETE &&
1122          request_state_ == DownstreamState::MSG_COMPLETE && !upgraded_ &&
1123          !resp_.connection_close && request_buf_.rleft() == 0;
1124 }
1125
1126 DefaultMemchunks Downstream::pop_response_buf() {
1127   return std::move(response_buf_);
1128 }
1129
1130 void Downstream::set_assoc_stream_id(int64_t stream_id) {
1131   assoc_stream_id_ = stream_id;
1132 }
1133
1134 int64_t Downstream::get_assoc_stream_id() const { return assoc_stream_id_; }
1135
1136 BlockAllocator &Downstream::get_block_allocator() { return balloc_; }
1137
1138 void Downstream::add_rcbuf(nghttp2_rcbuf *rcbuf) {
1139   nghttp2_rcbuf_incref(rcbuf);
1140   rcbufs_.push_back(rcbuf);
1141 }
1142
1143 #ifdef ENABLE_HTTP3
1144 void Downstream::add_rcbuf(nghttp3_rcbuf *rcbuf) {
1145   nghttp3_rcbuf_incref(rcbuf);
1146   rcbufs3_.push_back(rcbuf);
1147 }
1148 #endif // ENABLE_HTTP3
1149
1150 void Downstream::set_downstream_addr_group(
1151     const std::shared_ptr<DownstreamAddrGroup> &group) {
1152   group_ = group;
1153 }
1154
1155 void Downstream::set_addr(const DownstreamAddr *addr) { addr_ = addr; }
1156
1157 const DownstreamAddr *Downstream::get_addr() const { return addr_; }
1158
1159 void Downstream::set_accesslog_written(bool f) { accesslog_written_ = f; }
1160
1161 void Downstream::renew_affinity_cookie(uint32_t h) {
1162   affinity_cookie_ = h;
1163   new_affinity_cookie_ = true;
1164 }
1165
1166 uint32_t Downstream::get_affinity_cookie_to_send() const {
1167   if (new_affinity_cookie_) {
1168     return affinity_cookie_;
1169   }
1170   return 0;
1171 }
1172
1173 DefaultMemchunks *Downstream::get_blocked_request_buf() {
1174   return &blocked_request_buf_;
1175 }
1176
1177 bool Downstream::get_blocked_request_data_eof() const {
1178   return blocked_request_data_eof_;
1179 }
1180
1181 void Downstream::set_blocked_request_data_eof(bool f) {
1182   blocked_request_data_eof_ = f;
1183 }
1184
1185 void Downstream::set_ws_key(const StringRef &key) { ws_key_ = key; }
1186
1187 bool Downstream::get_expect_100_continue() const {
1188   return expect_100_continue_;
1189 }
1190
1191 bool Downstream::get_stop_reading() const { return stop_reading_; }
1192
1193 void Downstream::set_stop_reading(bool f) { stop_reading_ = f; }
1194
1195 } // namespace shrpx