Apply PIE to nghttpx
[platform/upstream/nghttp2.git] / src / shrpx_http_downstream_connection.cc
1 /*
2  * nghttp2 - HTTP/2 C Library
3  *
4  * Copyright (c) 2012 Tatsuhiro Tsujikawa
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining
7  * a copy of this software and associated documentation files (the
8  * "Software"), to deal in the Software without restriction, including
9  * without limitation the rights to use, copy, modify, merge, publish,
10  * distribute, sublicense, and/or sell copies of the Software, and to
11  * permit persons to whom the Software is furnished to do so, subject to
12  * the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be
15  * included in all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
21  * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
22  * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
23  * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24  */
25 #include "shrpx_http_downstream_connection.h"
26
27 #include "shrpx_client_handler.h"
28 #include "shrpx_upstream.h"
29 #include "shrpx_downstream.h"
30 #include "shrpx_config.h"
31 #include "shrpx_error.h"
32 #include "shrpx_http.h"
33 #include "shrpx_log_config.h"
34 #include "shrpx_connect_blocker.h"
35 #include "shrpx_downstream_connection_pool.h"
36 #include "shrpx_worker.h"
37 #include "shrpx_http2_session.h"
38 #include "shrpx_tls.h"
39 #include "shrpx_log.h"
40 #include "http2.h"
41 #include "util.h"
42 #include "ssl_compat.h"
43
44 using namespace nghttp2;
45
46 namespace shrpx {
47
48 namespace {
49 void timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
50   auto conn = static_cast<Connection *>(w->data);
51   auto dconn = static_cast<HttpDownstreamConnection *>(conn->data);
52
53   if (w == &conn->rt && !conn->expired_rt()) {
54     return;
55   }
56
57   if (LOG_ENABLED(INFO)) {
58     DCLOG(INFO, dconn) << "Time out";
59   }
60
61   auto downstream = dconn->get_downstream();
62   auto upstream = downstream->get_upstream();
63   auto handler = upstream->get_client_handler();
64   auto &resp = downstream->response();
65
66   // Do this so that dconn is not pooled
67   resp.connection_close = true;
68
69   if (upstream->downstream_error(dconn, Downstream::EVENT_TIMEOUT) != 0) {
70     delete handler;
71   }
72 }
73 } // namespace
74
75 namespace {
76 void retry_downstream_connection(Downstream *downstream,
77                                  unsigned int status_code) {
78   auto upstream = downstream->get_upstream();
79   auto handler = upstream->get_client_handler();
80
81   assert(!downstream->get_request_header_sent());
82
83   downstream->add_retry();
84
85   if (downstream->no_more_retry()) {
86     delete handler;
87     return;
88   }
89
90   downstream->pop_downstream_connection();
91   auto buf = downstream->get_request_buf();
92   buf->reset();
93
94   int rv;
95
96   for (;;) {
97     auto ndconn = handler->get_downstream_connection(rv, downstream);
98     if (!ndconn) {
99       break;
100     }
101     if (downstream->attach_downstream_connection(std::move(ndconn)) != 0) {
102       continue;
103     }
104     if (downstream->push_request_headers() == 0) {
105       return;
106     }
107   }
108
109   downstream->set_request_state(DownstreamState::CONNECT_FAIL);
110
111   if (rv == SHRPX_ERR_TLS_REQUIRED) {
112     rv = upstream->on_downstream_abort_request_with_https_redirect(downstream);
113   } else {
114     rv = upstream->on_downstream_abort_request(downstream, status_code);
115   }
116
117   if (rv != 0) {
118     delete handler;
119   }
120 }
121 } // namespace
122
123 namespace {
124 void connect_timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
125   auto conn = static_cast<Connection *>(w->data);
126   auto dconn = static_cast<HttpDownstreamConnection *>(conn->data);
127   auto addr = dconn->get_addr();
128   auto raddr = dconn->get_raddr();
129
130   DCLOG(WARN, dconn) << "Connect time out; addr="
131                      << util::to_numeric_addr(raddr);
132
133   downstream_failure(addr, raddr);
134
135   auto downstream = dconn->get_downstream();
136
137   retry_downstream_connection(downstream, 504);
138 }
139 } // namespace
140
141 namespace {
142 void backend_retry(Downstream *downstream) {
143   retry_downstream_connection(downstream, 502);
144 }
145 } // namespace
146
147 namespace {
148 void readcb(struct ev_loop *loop, ev_io *w, int revents) {
149   int rv;
150   auto conn = static_cast<Connection *>(w->data);
151   auto dconn = static_cast<HttpDownstreamConnection *>(conn->data);
152   auto downstream = dconn->get_downstream();
153   auto upstream = downstream->get_upstream();
154   auto handler = upstream->get_client_handler();
155
156   rv = upstream->downstream_read(dconn);
157   if (rv != 0) {
158     if (rv == SHRPX_ERR_RETRY) {
159       backend_retry(downstream);
160       return;
161     }
162
163     delete handler;
164   }
165 }
166 } // namespace
167
168 namespace {
169 void writecb(struct ev_loop *loop, ev_io *w, int revents) {
170   int rv;
171   auto conn = static_cast<Connection *>(w->data);
172   auto dconn = static_cast<HttpDownstreamConnection *>(conn->data);
173   auto downstream = dconn->get_downstream();
174   auto upstream = downstream->get_upstream();
175   auto handler = upstream->get_client_handler();
176
177   rv = upstream->downstream_write(dconn);
178   if (rv == SHRPX_ERR_RETRY) {
179     backend_retry(downstream);
180     return;
181   }
182
183   if (rv != 0) {
184     delete handler;
185   }
186 }
187 } // namespace
188
189 namespace {
190 void connectcb(struct ev_loop *loop, ev_io *w, int revents) {
191   auto conn = static_cast<Connection *>(w->data);
192   auto dconn = static_cast<HttpDownstreamConnection *>(conn->data);
193   auto downstream = dconn->get_downstream();
194   if (dconn->connected() != 0) {
195     backend_retry(downstream);
196     return;
197   }
198   writecb(loop, w, revents);
199 }
200 } // namespace
201
// Constructs a backend HTTP/1 connection object for |addr|, a member
// of |group|, running on |loop| and owned by |worker|.
//
// No socket is opened here: conn_ is created with fd -1, and
// initiate_connection() treats that value as "not yet connected".
// The read/write timeouts come from the group's shared address
// configuration.  connectcb, readcb and connect_timeoutcb are handed
// to conn_ as its initial callbacks (presumably the write-event,
// read-event and timer callbacks respectively -- confirm against
// Connection's constructor).  on_read_, on_write_ and signal_write_
// start out as noop.
HttpDownstreamConnection::HttpDownstreamConnection(
    const std::shared_ptr<DownstreamAddrGroup> &group, DownstreamAddr *addr,
    struct ev_loop *loop, Worker *worker)
    : conn_(loop, -1, nullptr, worker->get_mcpool(),
            group->shared_addr->timeout.write, group->shared_addr->timeout.read,
            {}, {}, connectcb, readcb, connect_timeoutcb, this,
            get_config()->tls.dyn_rec.warmup_threshold,
            get_config()->tls.dyn_rec.idle_timeout, Proto::HTTP1),
      on_read_(&HttpDownstreamConnection::noop),
      on_write_(&HttpDownstreamConnection::noop),
      signal_write_(&HttpDownstreamConnection::noop),
      worker_(worker),
      ssl_ctx_(worker->get_cl_ssl_ctx()),
      group_(group),
      addr_(addr),
      // Set by initiate_connection() once connect() has been issued.
      raddr_(nullptr),
      ioctrl_(&conn_.rlimit),
      response_htp_{0},
      first_write_done_(false),
      reusable_(true),
      request_header_written_(false) {}
223
224 HttpDownstreamConnection::~HttpDownstreamConnection() {
225   if (LOG_ENABLED(INFO)) {
226     DCLOG(INFO, this) << "Deleted";
227   }
228
229   if (dns_query_) {
230     auto dns_tracker = worker_->get_dns_tracker();
231     dns_tracker->cancel(dns_query_.get());
232   }
233 }
234
235 int HttpDownstreamConnection::attach_downstream(Downstream *downstream) {
236   int rv;
237
238   if (LOG_ENABLED(INFO)) {
239     DCLOG(INFO, this) << "Attaching to DOWNSTREAM:" << downstream;
240   }
241
242   downstream_ = downstream;
243
244   rv = initiate_connection();
245   if (rv != 0) {
246     downstream_ = nullptr;
247     return rv;
248   }
249
250   return 0;
251 }
252
namespace {
// Forward declarations of the llhttp response-parser callbacks.  They
// are referenced by the htp_hooks table below, which precedes their
// definitions later in this file.
int htp_msg_begincb(llhttp_t *htp);
int htp_hdr_keycb(llhttp_t *htp, const char *data, size_t len);
int htp_hdr_valcb(llhttp_t *htp, const char *data, size_t len);
int htp_hdrs_completecb(llhttp_t *htp);
int htp_bodycb(llhttp_t *htp, const char *data, size_t len);
int htp_msg_completecb(llhttp_t *htp);
} // namespace
261
namespace {
// Callback table passed to llhttp_init() for parsing backend
// responses.  The initializers are positional, so their order must
// match the member order of llhttp_settings_t noted on each line.
// Entries set to nullptr are callbacks this file does not use.
constexpr llhttp_settings_t htp_hooks = {
    htp_msg_begincb,     // llhttp_cb      on_message_begin;
    nullptr,             // llhttp_data_cb on_url;
    nullptr,             // llhttp_data_cb on_status;
    htp_hdr_keycb,       // llhttp_data_cb on_header_field;
    htp_hdr_valcb,       // llhttp_data_cb on_header_value;
    htp_hdrs_completecb, // llhttp_cb      on_headers_complete;
    htp_bodycb,          // llhttp_data_cb on_body;
    htp_msg_completecb,  // llhttp_cb      on_message_complete;
    nullptr,             // llhttp_cb      on_chunk_header
    nullptr,             // llhttp_cb      on_chunk_complete
};
} // namespace
276
// Prepares this connection for sending a request to the backend.
//
// When no socket exists yet (conn_.fd == -1) this resolves the backend
// address if the address is configured for DNS, creates a non-blocking
// socket, issues connect(), sets up TLS when the address requires it,
// and arms the write watcher and the connect timer.  When a socket
// already exists, the connection is being reused: the read timer and
// read callbacks are switched back from their idle variants and the
// per-request write state is reset.  In both cases the response parser
// is re-initialized and bound to downstream_.
//
// This function is re-entered from the DNS query callback once a
// pending resolution finishes; dns_query_ being non-null marks that
// case.
//
// Returns 0 on success, including the case where DNS resolution is
// still running.  Returns SHRPX_ERR_NETWORK when connections are
// blocked, DNS resolution failed, or socket()/connect() failed, and -1
// when the TLS object could not be created.
int HttpDownstreamConnection::initiate_connection() {
  int rv;

  // Worker-wide blocker: refuse early if backend connections are
  // currently blocked for the whole worker.
  auto worker_blocker = worker_->get_connect_blocker();
  if (worker_blocker->blocked()) {
    if (LOG_ENABLED(INFO)) {
      DCLOG(INFO, this)
          << "Worker wide backend connection was blocked temporarily";
    }
    return SHRPX_ERR_NETWORK;
  }

  auto &downstreamconf = *worker_->get_downstream_config();

  if (conn_.fd == -1) {
    // A stored DNS query means we were called back after an
    // asynchronous resolution; its result is examined below.
    auto check_dns_result = dns_query_.get() != nullptr;

    if (check_dns_result) {
      assert(addr_->dns);
    }

    // Per-address blocker.
    auto &connect_blocker = addr_->connect_blocker;

    if (connect_blocker->blocked()) {
      if (LOG_ENABLED(INFO)) {
        DCLOG(INFO, this) << "Backend server " << addr_->host << ":"
                          << addr_->port << " was not available temporarily";
      }

      return SHRPX_ERR_NETWORK;
    }

    Address *raddr;

    if (addr_->dns) {
      if (!check_dns_result) {
        // First pass: start a resolution.  The callback stores the
        // resolved address on success and then re-enters
        // initiate_connection() to continue from the else-branch below.
        auto dns_query = std::make_unique<DNSQuery>(
            addr_->host,
            [this](DNSResolverStatus status, const Address *result) {
              int rv;

              if (status == DNSResolverStatus::OK) {
                *this->resolved_addr_ = *result;
              }

              rv = this->initiate_connection();
              if (rv != 0) {
                // This callback destroys |this|.
                auto downstream = this->downstream_;
                backend_retry(downstream);
              }
            });

        auto dns_tracker = worker_->get_dns_tracker();

        if (!resolved_addr_) {
          resolved_addr_ = std::make_unique<Address>();
        }
        switch (dns_tracker->resolve(resolved_addr_.get(), dns_query.get())) {
        case DNSResolverStatus::ERROR:
          downstream_failure(addr_, nullptr);
          return SHRPX_ERR_NETWORK;
        case DNSResolverStatus::RUNNING:
          // Keep the query alive until its callback fires (or the
          // destructor cancels it) and report success for now.
          dns_query_ = std::move(dns_query);
          return 0;
        case DNSResolverStatus::OK:
          // Result was available immediately; fall through to connect.
          break;
        default:
          assert(0);
        }
      } else {
        // Second pass, entered from the DNS callback: consume the
        // stored query's final status.
        switch (dns_query_->status) {
        case DNSResolverStatus::ERROR:
          dns_query_.reset();
          downstream_failure(addr_, nullptr);
          return SHRPX_ERR_NETWORK;
        case DNSResolverStatus::OK:
          dns_query_.reset();
          break;
        default:
          assert(0);
        }
      }

      raddr = resolved_addr_.get();
      // Apply the configured backend port to the resolved address.
      util::set_port(*resolved_addr_, addr_->port);
    } else {
      raddr = &addr_->addr;
    }

    conn_.fd = util::create_nonblock_socket(raddr->su.storage.ss_family);

    if (conn_.fd == -1) {
      // Capture errno before logging can overwrite it.
      auto error = errno;
      DCLOG(WARN, this) << "socket() failed; addr="
                        << util::to_numeric_addr(raddr) << ", errno=" << error;

      // socket() failure is counted against the worker-wide blocker.
      worker_blocker->on_failure();

      return SHRPX_ERR_NETWORK;
    }

    worker_blocker->on_success();

    // The socket is non-blocking, so EINPROGRESS is the expected
    // "connection in progress" result and not an error.
    rv = connect(conn_.fd, &raddr->su.sa, raddr->len);
    if (rv != 0 && errno != EINPROGRESS) {
      auto error = errno;
      DCLOG(WARN, this) << "connect() failed; addr="
                        << util::to_numeric_addr(raddr) << ", errno=" << error;

      downstream_failure(addr_, raddr);

      return SHRPX_ERR_NETWORK;
    }

    if (LOG_ENABLED(INFO)) {
      DCLOG(INFO, this) << "Connecting to downstream server";
    }

    raddr_ = raddr;

    if (addr_->tls) {
      assert(ssl_ctx_);

      auto ssl = tls::create_ssl(ssl_ctx_);
      if (!ssl) {
        return -1;
      }

      tls::setup_downstream_http1_alpn(ssl);

      conn_.set_ssl(ssl);
      conn_.tls.client_session_cache = &addr_->tls_session_cache;

      // Use the configured SNI name if present, otherwise the backend
      // host.  No SNI is sent when the name is a numeric address.
      auto sni_name =
          addr_->sni.empty() ? StringRef{addr_->host} : StringRef{addr_->sni};
      if (!util::numeric_host(sni_name.c_str())) {
        SSL_set_tlsext_host_name(conn_.tls.ssl, sni_name.c_str());
      }

      // Offer a cached TLS session for resumption when one is
      // available.  The local reference is released right after it has
      // been handed to SSL_set_session().
      auto session = tls::reuse_tls_session(addr_->tls_session_cache);
      if (session) {
        SSL_set_session(conn_.tls.ssl, session);
        SSL_SESSION_free(session);
      }

      conn_.prepare_client_handshake();
    }

    ev_io_set(&conn_.wev, conn_.fd, EV_WRITE);
    ev_io_set(&conn_.rev, conn_.fd, EV_READ);

    conn_.wlimit.startw();

    // Use the write timer with the connect timeout while connecting.
    conn_.wt.repeat = downstreamconf.timeout.connect;
    ev_timer_again(conn_.loop, &conn_.wt);
  } else {
    // we may set read timer cb to idle_timeoutcb.  Reset again.
    ev_set_cb(&conn_.rt, timeoutcb);
    if (conn_.read_timeout < group_->shared_addr->timeout.read) {
      conn_.read_timeout = group_->shared_addr->timeout.read;
      conn_.last_read = ev_now(conn_.loop);
    } else {
      conn_.again_rt(group_->shared_addr->timeout.read);
    }

    // detach_downstream() installed idle_readcb; restore readcb.
    ev_set_cb(&conn_.rev, readcb);

    // Reset per-request write state for the new request.
    on_write_ = &HttpDownstreamConnection::write_first;
    first_write_done_ = false;
    request_header_written_ = false;
  }

  // Fresh response parser for the attached Downstream; the parser
  // callbacks retrieve it from the data pointer.
  llhttp_init(&response_htp_, HTTP_RESPONSE, &htp_hooks);
  response_htp_.data = downstream_;

  return 0;
}
455
// Serializes the request line and header fields of the attached
// Downstream as an HTTP/1.1 request into its request buffer.
//
// If the headers were already written for this request, only
// signal_write() is called.  Otherwise the request line, Host, the
// forwarded request headers, and the proxy-added headers (Cookie,
// Transfer-Encoding, upgrade-related fields, Early-Data, Forwarded,
// X-Forwarded-For, X-Forwarded-Proto, Via, and configured additional
// headers) are appended in that order, followed by the terminating
// empty line.  signal_write() is called at the end only when no
// request body is expected to follow soon (see the comment near the
// end).
//
// Always returns 0.
int HttpDownstreamConnection::push_request_headers() {
  if (request_header_written_) {
    signal_write();
    return 0;
  }

  const auto &downstream_hostport = addr_->hostport;
  const auto &req = downstream_->request();

  auto &balloc = downstream_->get_block_allocator();

  auto connect_method = req.regular_connect_method();

  auto config = get_config();
  auto &httpconf = config->http;

  request_header_written_ = true;

  // For HTTP/1.0 request, there is no authority in request.  In that
  // case, we use backend server's host nonetheless.
  auto authority = StringRef(downstream_hostport);
  auto no_host_rewrite =
      httpconf.no_host_rewrite || config->http2_proxy || connect_method;

  if (no_host_rewrite && !req.authority.empty()) {
    authority = req.authority;
  }

  downstream_->set_request_downstream_host(authority);

  auto buf = downstream_->get_request_buf();

  // Assume that method and request path do not contain \r\n.
  // A WebSocket CONNECT-protocol request is sent to the backend as GET.
  auto meth = http2::to_method_string(
      req.connect_proto == ConnectProto::WEBSOCKET ? HTTP_GET : req.method);
  buf->append(meth);
  buf->append(' ');

  // Request target: authority-form for CONNECT, absolute-form when
  // acting as a forward proxy, "*" for server-wide OPTIONS, otherwise
  // the request path.
  if (connect_method) {
    buf->append(authority);
  } else if (config->http2_proxy) {
    // Construct absolute-form request target because we are going to
    // send a request to a HTTP/1 proxy.
    assert(!req.scheme.empty());
    buf->append(req.scheme);
    buf->append("://");
    buf->append(authority);
    buf->append(req.path);
  } else if (req.method == HTTP_OPTIONS && req.path.empty()) {
    // Server-wide OPTIONS
    buf->append("*");
  } else {
    buf->append(req.path);
  }
  buf->append(" HTTP/1.1\r\nHost: ");
  buf->append(authority);
  buf->append("\r\n");

  auto &fwdconf = httpconf.forwarded;
  auto &xffconf = httpconf.xff;
  auto &xfpconf = httpconf.xfp;
  auto &earlydataconf = httpconf.early_data;

  // Flags passed to the header builder, derived from the
  // strip_incoming settings; Sec-WebSocket-Key is stripped for HTTP/2
  // requests (a fresh key is generated below for that case).
  uint32_t build_flags =
      (fwdconf.strip_incoming ? http2::HDOP_STRIP_FORWARDED : 0) |
      (xffconf.strip_incoming ? http2::HDOP_STRIP_X_FORWARDED_FOR : 0) |
      (xfpconf.strip_incoming ? http2::HDOP_STRIP_X_FORWARDED_PROTO : 0) |
      (earlydataconf.strip_incoming ? http2::HDOP_STRIP_EARLY_DATA : 0) |
      (req.http_major == 2 ? http2::HDOP_STRIP_SEC_WEBSOCKET_KEY : 0);

  http2::build_http1_headers_from_headers(buf, req.fs.headers(), build_flags);

  auto cookie = downstream_->assemble_request_cookie();
  if (!cookie.empty()) {
    buf->append("Cookie: ");
    buf->append(cookie);
    buf->append("\r\n");
  }

  // set transfer-encoding only when content-length is unknown and
  // request body is expected.
  if (req.method != HTTP_CONNECT && req.http2_expect_body &&
      req.fs.content_length == -1) {
    downstream_->set_chunked_request(true);
    buf->append("Transfer-Encoding: chunked\r\n");
  }

  if (req.connect_proto == ConnectProto::WEBSOCKET) {
    if (req.http_major == 2) {
      // Generate a random 16-byte nonce, base64-encode it, remember it
      // on the Downstream, and send it as Sec-Websocket-Key.
      std::array<uint8_t, 16> nonce;
      util::random_bytes(std::begin(nonce), std::end(nonce),
                         worker_->get_randgen());
      auto iov = make_byte_ref(balloc, base64::encode_length(nonce.size()) + 1);
      auto p = base64::encode(std::begin(nonce), std::end(nonce), iov.base);
      *p = '\0';
      auto key = StringRef{iov.base, p};
      downstream_->set_ws_key(key);

      buf->append("Sec-Websocket-Key: ");
      buf->append(key);
      buf->append("\r\n");
    }

    buf->append("Upgrade: websocket\r\nConnection: Upgrade\r\n");
  } else if (!connect_method && req.upgrade_request) {
    // Pass the client's Connection and Upgrade fields through.
    auto connection = req.fs.header(http2::HD_CONNECTION);
    if (connection) {
      buf->append("Connection: ");
      buf->append((*connection).value);
      buf->append("\r\n");
    }

    auto upgrade = req.fs.header(http2::HD_UPGRADE);
    if (upgrade) {
      buf->append("Upgrade: ");
      buf->append((*upgrade).value);
      buf->append("\r\n");
    }
  } else if (req.connection_close) {
    buf->append("Connection: close\r\n");
  }

  auto upstream = downstream_->get_upstream();
  auto handler = upstream->get_client_handler();

#if OPENSSL_1_1_1_API
  auto conn = handler->get_connection();

  // The frontend TLS handshake has not finished yet, so tell the
  // backend with an Early-Data header field.
  if (conn->tls.ssl && !SSL_is_init_finished(conn->tls.ssl)) {
    buf->append("Early-Data: 1\r\n");
  }
#endif // OPENSSL_1_1_1_API

  // Incoming Forwarded value to pass along, unless it is to be
  // stripped.
  auto fwd =
      fwdconf.strip_incoming ? nullptr : req.fs.header(http2::HD_FORWARDED);

  if (fwdconf.params) {
    auto params = fwdconf.params;

    // The proto parameter is omitted for forward-proxy and CONNECT
    // requests.
    if (config->http2_proxy || connect_method) {
      params &= ~FORWARDED_PROTO;
    }

    auto value = http::create_forwarded(
        balloc, params, handler->get_forwarded_by(),
        handler->get_forwarded_for(), req.authority, req.scheme);

    // Emit "<incoming>, <ours>", or whichever of the two exists.
    if (fwd || !value.empty()) {
      buf->append("Forwarded: ");
      if (fwd) {
        buf->append(fwd->value);

        if (!value.empty()) {
          buf->append(", ");
        }
      }
      buf->append(value);
      buf->append("\r\n");
    }
  } else if (fwd) {
    buf->append("Forwarded: ");
    buf->append(fwd->value);
    buf->append("\r\n");
  }

  auto xff = xffconf.strip_incoming ? nullptr
                                    : req.fs.header(http2::HD_X_FORWARDED_FOR);

  if (xffconf.add) {
    // Append the client's address to any incoming value.
    buf->append("X-Forwarded-For: ");
    if (xff) {
      buf->append((*xff).value);
      buf->append(", ");
    }
    buf->append(client_handler_->get_ipaddr());
    buf->append("\r\n");
  } else if (xff) {
    buf->append("X-Forwarded-For: ");
    buf->append((*xff).value);
    buf->append("\r\n");
  }
  // X-Forwarded-Proto is not sent for forward-proxy and CONNECT
  // requests.
  if (!config->http2_proxy && !connect_method) {
    auto xfp = xfpconf.strip_incoming
                   ? nullptr
                   : req.fs.header(http2::HD_X_FORWARDED_PROTO);

    if (xfpconf.add) {
      buf->append("X-Forwarded-Proto: ");
      if (xfp) {
        buf->append((*xfp).value);
        buf->append(", ");
      }
      assert(!req.scheme.empty());
      buf->append(req.scheme);
      buf->append("\r\n");
    } else if (xfp) {
      buf->append("X-Forwarded-Proto: ");
      buf->append((*xfp).value);
      buf->append("\r\n");
    }
  }
  auto via = req.fs.header(http2::HD_VIA);
  if (httpconf.no_via) {
    // Only forward the incoming Via value, do not add our own.
    if (via) {
      buf->append("Via: ");
      buf->append((*via).value);
      buf->append("\r\n");
    }
  } else {
    buf->append("Via: ");
    if (via) {
      buf->append((*via).value);
      buf->append(", ");
    }
    // create_via_header_value() writes into viabuf and returns the end
    // of the written value.
    std::array<char, 16> viabuf;
    auto end = http::create_via_header_value(viabuf.data(), req.http_major,
                                             req.http_minor);
    buf->append(viabuf.data(), end - viabuf.data());
    buf->append("\r\n");
  }

  // Header fields configured to be added to every request.
  for (auto &p : httpconf.add_request_headers) {
    buf->append(p.name);
    buf->append(": ");
    buf->append(p.value);
    buf->append("\r\n");
  }

  // End of the header section.
  buf->append("\r\n");

  if (LOG_ENABLED(INFO)) {
    // Flatten the buffer chunks into one string for logging.
    std::string nhdrs;
    for (auto chunk = buf->head; chunk; chunk = chunk->next) {
      nhdrs.append(chunk->pos, chunk->last);
    }
    if (log_config()->errorlog_tty) {
      nhdrs = http::colorizeHeaders(nhdrs.c_str());
    }
    DCLOG(INFO, this) << "HTTP request headers. stream_id="
                      << downstream_->get_stream_id() << "\n"
                      << nhdrs;
  }

  // Don't call signal_write() if we anticipate request body.  We call
  // signal_write() when we received request body chunk, and it
  // enables us to send headers and data in one writev system call.
  if (req.method == HTTP_CONNECT ||
      downstream_->get_blocked_request_buf()->rleft() ||
      (!req.http2_expect_body && req.fs.content_length == 0) ||
      downstream_->get_expect_100_continue()) {
    signal_write();
  }

  return 0;
}
711
712 int HttpDownstreamConnection::process_blocked_request_buf() {
713   auto src = downstream_->get_blocked_request_buf();
714
715   if (src->rleft()) {
716     auto dest = downstream_->get_request_buf();
717     auto chunked = downstream_->get_chunked_request();
718     if (chunked) {
719       auto chunk_size_hex = util::utox(src->rleft());
720       dest->append(chunk_size_hex);
721       dest->append("\r\n");
722     }
723
724     src->copy(*dest);
725
726     if (chunked) {
727       dest->append("\r\n");
728     }
729   }
730
731   if (downstream_->get_blocked_request_data_eof() &&
732       downstream_->get_chunked_request()) {
733     end_upload_data_chunk();
734   }
735
736   return 0;
737 }
738
739 int HttpDownstreamConnection::push_upload_data_chunk(const uint8_t *data,
740                                                      size_t datalen) {
741   if (!downstream_->get_request_header_sent()) {
742     auto output = downstream_->get_blocked_request_buf();
743     auto &req = downstream_->request();
744     output->append(data, datalen);
745     req.unconsumed_body_length += datalen;
746     if (request_header_written_) {
747       signal_write();
748     }
749     return 0;
750   }
751
752   auto chunked = downstream_->get_chunked_request();
753   auto output = downstream_->get_request_buf();
754
755   if (chunked) {
756     auto chunk_size_hex = util::utox(datalen);
757     output->append(chunk_size_hex);
758     output->append("\r\n");
759   }
760
761   output->append(data, datalen);
762
763   if (chunked) {
764     output->append("\r\n");
765   }
766
767   signal_write();
768
769   return 0;
770 }
771
772 int HttpDownstreamConnection::end_upload_data() {
773   if (!downstream_->get_request_header_sent()) {
774     downstream_->set_blocked_request_data_eof(true);
775     if (request_header_written_) {
776       signal_write();
777     }
778     return 0;
779   }
780
781   signal_write();
782
783   if (!downstream_->get_chunked_request()) {
784     return 0;
785   }
786
787   end_upload_data_chunk();
788
789   return 0;
790 }
791
792 void HttpDownstreamConnection::end_upload_data_chunk() {
793   const auto &req = downstream_->request();
794
795   auto output = downstream_->get_request_buf();
796   const auto &trailers = req.fs.trailers();
797   if (trailers.empty()) {
798     output->append("0\r\n\r\n");
799   } else {
800     output->append("0\r\n");
801     http2::build_http1_headers_from_headers(output, trailers,
802                                             http2::HDOP_STRIP_ALL);
803     output->append("\r\n");
804   }
805 }
806
807 namespace {
808 void remove_from_pool(HttpDownstreamConnection *dconn) {
809   auto addr = dconn->get_addr();
810   auto &dconn_pool = addr->dconn_pool;
811   dconn_pool->remove_downstream_connection(dconn);
812 }
813 } // namespace
814
815 namespace {
816 void idle_readcb(struct ev_loop *loop, ev_io *w, int revents) {
817   auto conn = static_cast<Connection *>(w->data);
818   auto dconn = static_cast<HttpDownstreamConnection *>(conn->data);
819   if (LOG_ENABLED(INFO)) {
820     DCLOG(INFO, dconn) << "Idle connection EOF";
821   }
822
823   remove_from_pool(dconn);
824   // dconn was deleted
825 }
826 } // namespace
827
828 namespace {
829 void idle_timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
830   auto conn = static_cast<Connection *>(w->data);
831   auto dconn = static_cast<HttpDownstreamConnection *>(conn->data);
832
833   if (w == &conn->rt && !conn->expired_rt()) {
834     return;
835   }
836
837   if (LOG_ENABLED(INFO)) {
838     DCLOG(INFO, dconn) << "Idle connection timeout";
839   }
840
841   remove_from_pool(dconn);
842   // dconn was deleted
843 }
844 } // namespace
845
846 void HttpDownstreamConnection::detach_downstream(Downstream *downstream) {
847   if (LOG_ENABLED(INFO)) {
848     DCLOG(INFO, this) << "Detaching from DOWNSTREAM:" << downstream;
849   }
850   downstream_ = nullptr;
851
852   ev_set_cb(&conn_.rev, idle_readcb);
853   ioctrl_.force_resume_read();
854
855   auto &downstreamconf = *worker_->get_downstream_config();
856
857   ev_set_cb(&conn_.rt, idle_timeoutcb);
858   if (conn_.read_timeout < downstreamconf.timeout.idle_read) {
859     conn_.read_timeout = downstreamconf.timeout.idle_read;
860     conn_.last_read = ev_now(conn_.loop);
861   } else {
862     conn_.again_rt(downstreamconf.timeout.idle_read);
863   }
864
865   conn_.wlimit.stopw();
866   ev_timer_stop(conn_.loop, &conn_.wt);
867 }
868
869 void HttpDownstreamConnection::pause_read(IOCtrlReason reason) {
870   ioctrl_.pause_read(reason);
871 }
872
873 int HttpDownstreamConnection::resume_read(IOCtrlReason reason,
874                                           size_t consumed) {
875   auto &downstreamconf = *worker_->get_downstream_config();
876
877   if (downstream_->get_response_buf()->rleft() <=
878       downstreamconf.request_buffer_size / 2) {
879     ioctrl_.resume_read(reason);
880   }
881
882   return 0;
883 }
884
885 void HttpDownstreamConnection::force_resume_read() {
886   ioctrl_.force_resume_read();
887 }
888
889 namespace {
890 int htp_msg_begincb(llhttp_t *htp) {
891   auto downstream = static_cast<Downstream *>(htp->data);
892
893   if (downstream->get_response_state() != DownstreamState::INITIAL) {
894     llhttp_set_error_reason(htp, "HTTP message started when it shouldn't");
895     return HPE_USER;
896   }
897
898   return 0;
899 }
900 } // namespace
901
902 namespace {
903 int htp_hdrs_completecb(llhttp_t *htp) {
904   auto downstream = static_cast<Downstream *>(htp->data);
905   auto upstream = downstream->get_upstream();
906   auto handler = upstream->get_client_handler();
907   const auto &req = downstream->request();
908   auto &resp = downstream->response();
909   int rv;
910
911   auto config = get_config();
912   auto &loggingconf = config->logging;
913
914   resp.http_status = htp->status_code;
915   resp.http_major = htp->http_major;
916   resp.http_minor = htp->http_minor;
917
918   if (resp.http_major > 1 || req.http_minor > 1) {
919     resp.http_major = 1;
920     resp.http_minor = 1;
921     return -1;
922   }
923
924   auto dconn = downstream->get_downstream_connection();
925
926   downstream->set_downstream_addr_group(dconn->get_downstream_addr_group());
927   downstream->set_addr(dconn->get_addr());
928
929   // Server MUST NOT send Transfer-Encoding with a status code 1xx or
930   // 204.  Also server MUST NOT send Transfer-Encoding with a status
931   // code 2xx to a CONNECT request.  Same holds true with
932   // Content-Length.
933   if (resp.http_status == 204) {
934     if (resp.fs.header(http2::HD_TRANSFER_ENCODING)) {
935       return -1;
936     }
937     // Some server send content-length: 0 for 204.  Until they get
938     // fixed, we accept, but ignore it.
939
940     // Calling parse_content_length() detects duplicated
941     // content-length header fields.
942     if (resp.fs.parse_content_length() != 0) {
943       return -1;
944     }
945     if (resp.fs.content_length == 0) {
946       resp.fs.erase_content_length_and_transfer_encoding();
947     } else if (resp.fs.content_length != -1) {
948       return -1;
949     }
950   } else if (resp.http_status / 100 == 1 ||
951              (resp.http_status / 100 == 2 && req.method == HTTP_CONNECT)) {
952     // Server MUST NOT send Content-Length and Transfer-Encoding in
953     // these responses.
954     resp.fs.erase_content_length_and_transfer_encoding();
955   } else if (resp.fs.parse_content_length() != 0) {
956     downstream->set_response_state(DownstreamState::MSG_BAD_HEADER);
957     return -1;
958   }
959
960   // Check upgrade before processing non-final response, since if
961   // upgrade succeeded, 101 response is treated as final in nghttpx.
962   downstream->check_upgrade_fulfilled_http1();
963
964   if (downstream->get_non_final_response()) {
965     // Reset content-length because we reuse same Downstream for the
966     // next response.
967     resp.fs.content_length = -1;
968     // For non-final response code, we just call
969     // on_downstream_header_complete() without changing response
970     // state.
971     rv = upstream->on_downstream_header_complete(downstream);
972
973     if (rv != 0) {
974       return -1;
975     }
976
977     // Ignore response body for non-final response.
978     return 1;
979   }
980
981   resp.connection_close = !llhttp_should_keep_alive(htp);
982   downstream->set_response_state(DownstreamState::HEADER_COMPLETE);
983   downstream->inspect_http1_response();
984   if (downstream->get_upgraded()) {
985     // content-length must be ignored for upgraded connection.
986     resp.fs.content_length = -1;
987     resp.connection_close = true;
988     // transfer-encoding not applied to upgraded connection
989     downstream->set_chunked_response(false);
990   } else if (http2::legacy_http1(req.http_major, req.http_minor)) {
991     if (resp.fs.content_length == -1) {
992       resp.connection_close = true;
993     }
994     downstream->set_chunked_response(false);
995   } else if (!downstream->expect_response_body()) {
996     downstream->set_chunked_response(false);
997   }
998
999   if (loggingconf.access.write_early && downstream->accesslog_ready()) {
1000     handler->write_accesslog(downstream);
1001     downstream->set_accesslog_written(true);
1002   }
1003
1004   if (upstream->on_downstream_header_complete(downstream) != 0) {
1005     return -1;
1006   }
1007
1008   if (downstream->get_upgraded()) {
1009     // Upgrade complete, read until EOF in both ends
1010     if (upstream->resume_read(SHRPX_NO_BUFFER, downstream, 0) != 0) {
1011       return -1;
1012     }
1013     downstream->set_request_state(DownstreamState::HEADER_COMPLETE);
1014     if (LOG_ENABLED(INFO)) {
1015       LOG(INFO) << "HTTP upgrade success. stream_id="
1016                 << downstream->get_stream_id();
1017     }
1018   }
1019
1020   // Ignore the response body. HEAD response may contain
1021   // Content-Length or Transfer-Encoding: chunked.  Some server send
1022   // 304 status code with nonzero Content-Length, but without response
1023   // body. See
1024   // https://tools.ietf.org/html/rfc7230#section-3.3
1025
1026   // TODO It seems that the cases other than HEAD are handled by
1027   // llhttp.  Need test.
1028   return !http2::expect_response_body(req.method, resp.http_status);
1029 }
1030 } // namespace
1031
1032 namespace {
1033 int ensure_header_field_buffer(const Downstream *downstream,
1034                                const HttpConfig &httpconf, size_t len) {
1035   auto &resp = downstream->response();
1036
1037   if (resp.fs.buffer_size() + len > httpconf.response_header_field_buffer) {
1038     if (LOG_ENABLED(INFO)) {
1039       DLOG(INFO, downstream) << "Too large header header field size="
1040                              << resp.fs.buffer_size() + len;
1041     }
1042     return -1;
1043   }
1044
1045   return 0;
1046 }
1047 } // namespace
1048
1049 namespace {
1050 int ensure_max_header_fields(const Downstream *downstream,
1051                              const HttpConfig &httpconf) {
1052   auto &resp = downstream->response();
1053
1054   if (resp.fs.num_fields() >= httpconf.max_response_header_fields) {
1055     if (LOG_ENABLED(INFO)) {
1056       DLOG(INFO, downstream)
1057           << "Too many header field num=" << resp.fs.num_fields() + 1;
1058     }
1059     return -1;
1060   }
1061
1062   return 0;
1063 }
1064 } // namespace
1065
1066 namespace {
1067 int htp_hdr_keycb(llhttp_t *htp, const char *data, size_t len) {
1068   auto downstream = static_cast<Downstream *>(htp->data);
1069   auto &resp = downstream->response();
1070   auto &httpconf = get_config()->http;
1071
1072   if (ensure_header_field_buffer(downstream, httpconf, len) != 0) {
1073     return -1;
1074   }
1075
1076   if (downstream->get_response_state() == DownstreamState::INITIAL) {
1077     if (resp.fs.header_key_prev()) {
1078       resp.fs.append_last_header_key(data, len);
1079     } else {
1080       if (ensure_max_header_fields(downstream, httpconf) != 0) {
1081         return -1;
1082       }
1083       resp.fs.alloc_add_header_name(StringRef{data, len});
1084     }
1085   } else {
1086     // trailer part
1087     if (resp.fs.trailer_key_prev()) {
1088       resp.fs.append_last_trailer_key(data, len);
1089     } else {
1090       if (ensure_max_header_fields(downstream, httpconf) != 0) {
1091         // Could not ignore this trailer field easily, since we may
1092         // get its value in htp_hdr_valcb, and it will be added to
1093         // wrong place or crash if trailer fields are currently empty.
1094         return -1;
1095       }
1096       resp.fs.alloc_add_trailer_name(StringRef{data, len});
1097     }
1098   }
1099   return 0;
1100 }
1101 } // namespace
1102
1103 namespace {
1104 int htp_hdr_valcb(llhttp_t *htp, const char *data, size_t len) {
1105   auto downstream = static_cast<Downstream *>(htp->data);
1106   auto &resp = downstream->response();
1107   auto &httpconf = get_config()->http;
1108
1109   if (ensure_header_field_buffer(downstream, httpconf, len) != 0) {
1110     return -1;
1111   }
1112
1113   if (downstream->get_response_state() == DownstreamState::INITIAL) {
1114     resp.fs.append_last_header_value(data, len);
1115   } else {
1116     resp.fs.append_last_trailer_value(data, len);
1117   }
1118   return 0;
1119 }
1120 } // namespace
1121
1122 namespace {
1123 int htp_bodycb(llhttp_t *htp, const char *data, size_t len) {
1124   auto downstream = static_cast<Downstream *>(htp->data);
1125   auto &resp = downstream->response();
1126
1127   resp.recv_body_length += len;
1128
1129   return downstream->get_upstream()->on_downstream_body(
1130       downstream, reinterpret_cast<const uint8_t *>(data), len, true);
1131 }
1132 } // namespace
1133
1134 namespace {
1135 int htp_msg_completecb(llhttp_t *htp) {
1136   auto downstream = static_cast<Downstream *>(htp->data);
1137
1138   // llhttp does not treat "200 connection established" response
1139   // against CONNECT request, and in that case, this function is not
1140   // called.  But if HTTP Upgrade is made (e.g., WebSocket), this
1141   // function is called, and llhttp_execute() returns just after that.
1142   if (downstream->get_upgraded()) {
1143     return 0;
1144   }
1145
1146   if (downstream->get_non_final_response()) {
1147     downstream->reset_response();
1148
1149     return 0;
1150   }
1151
1152   downstream->set_response_state(DownstreamState::MSG_COMPLETE);
1153   // Block reading another response message from (broken?)
1154   // server. This callback is not called if the connection is
1155   // tunneled.
1156   downstream->pause_read(SHRPX_MSG_BLOCK);
1157   return downstream->get_upstream()->on_downstream_body_complete(downstream);
1158 }
1159 } // namespace
1160
1161 int HttpDownstreamConnection::write_first() {
1162   int rv;
1163
1164   process_blocked_request_buf();
1165
1166   if (conn_.tls.ssl) {
1167     rv = write_tls();
1168   } else {
1169     rv = write_clear();
1170   }
1171
1172   if (rv != 0) {
1173     return SHRPX_ERR_RETRY;
1174   }
1175
1176   if (conn_.tls.ssl) {
1177     on_write_ = &HttpDownstreamConnection::write_tls;
1178   } else {
1179     on_write_ = &HttpDownstreamConnection::write_clear;
1180   }
1181
1182   first_write_done_ = true;
1183   downstream_->set_request_header_sent(true);
1184
1185   auto buf = downstream_->get_blocked_request_buf();
1186   buf->reset();
1187
1188   // upstream->resume_read() might be called in
1189   // write_tls()/write_clear(), but before blocked_request_buf_ is
1190   // reset.  So upstream read might still be blocked.  Let's do it
1191   // again here.
1192   auto input = downstream_->get_request_buf();
1193   if (input->rleft() == 0) {
1194     auto upstream = downstream_->get_upstream();
1195     auto &req = downstream_->request();
1196
1197     upstream->resume_read(SHRPX_NO_BUFFER, downstream_,
1198                           req.unconsumed_body_length);
1199   }
1200
1201   return 0;
1202 }
1203
1204 int HttpDownstreamConnection::read_clear() {
1205   conn_.last_read = ev_now(conn_.loop);
1206
1207   std::array<uint8_t, 16_k> buf;
1208   int rv;
1209
1210   for (;;) {
1211     auto nread = conn_.read_clear(buf.data(), buf.size());
1212     if (nread == 0) {
1213       return 0;
1214     }
1215
1216     if (nread < 0) {
1217       return nread;
1218     }
1219
1220     rv = process_input(buf.data(), nread);
1221     if (rv != 0) {
1222       return rv;
1223     }
1224
1225     if (!ev_is_active(&conn_.rev)) {
1226       return 0;
1227     }
1228   }
1229 }
1230
1231 int HttpDownstreamConnection::write_clear() {
1232   conn_.last_read = ev_now(conn_.loop);
1233
1234   auto upstream = downstream_->get_upstream();
1235   auto input = downstream_->get_request_buf();
1236
1237   std::array<struct iovec, MAX_WR_IOVCNT> iov;
1238
1239   while (input->rleft() > 0) {
1240     auto iovcnt = input->riovec(iov.data(), iov.size());
1241
1242     auto nwrite = conn_.writev_clear(iov.data(), iovcnt);
1243
1244     if (nwrite == 0) {
1245       return 0;
1246     }
1247
1248     if (nwrite < 0) {
1249       if (!first_write_done_) {
1250         return nwrite;
1251       }
1252       // We may have pending data in receive buffer which may contain
1253       // part of response body.  So keep reading.  Invoke read event
1254       // to get read(2) error just in case.
1255       ev_feed_event(conn_.loop, &conn_.rev, EV_READ);
1256       on_write_ = &HttpDownstreamConnection::noop;
1257       reusable_ = false;
1258       break;
1259     }
1260
1261     input->drain(nwrite);
1262   }
1263
1264   conn_.wlimit.stopw();
1265   ev_timer_stop(conn_.loop, &conn_.wt);
1266
1267   if (input->rleft() == 0) {
1268     auto &req = downstream_->request();
1269
1270     upstream->resume_read(SHRPX_NO_BUFFER, downstream_,
1271                           req.unconsumed_body_length);
1272   }
1273
1274   return 0;
1275 }
1276
1277 int HttpDownstreamConnection::tls_handshake() {
1278   ERR_clear_error();
1279
1280   conn_.last_read = ev_now(conn_.loop);
1281
1282   auto rv = conn_.tls_handshake();
1283   if (rv == SHRPX_ERR_INPROGRESS) {
1284     return 0;
1285   }
1286
1287   if (rv < 0) {
1288     downstream_failure(addr_, raddr_);
1289
1290     return rv;
1291   }
1292
1293   if (LOG_ENABLED(INFO)) {
1294     DCLOG(INFO, this) << "SSL/TLS handshake completed";
1295   }
1296
1297   if (!get_config()->tls.insecure &&
1298       tls::check_cert(conn_.tls.ssl, addr_, raddr_) != 0) {
1299     downstream_failure(addr_, raddr_);
1300
1301     return -1;
1302   }
1303
1304   auto &connect_blocker = addr_->connect_blocker;
1305
1306   signal_write_ = &HttpDownstreamConnection::actual_signal_write;
1307
1308   connect_blocker->on_success();
1309
1310   ev_set_cb(&conn_.rt, timeoutcb);
1311   ev_set_cb(&conn_.wt, timeoutcb);
1312
1313   on_read_ = &HttpDownstreamConnection::read_tls;
1314   on_write_ = &HttpDownstreamConnection::write_first;
1315
1316   // TODO Check negotiated ALPN
1317
1318   return on_write();
1319 }
1320
1321 int HttpDownstreamConnection::read_tls() {
1322   conn_.last_read = ev_now(conn_.loop);
1323
1324   ERR_clear_error();
1325
1326   std::array<uint8_t, 16_k> buf;
1327   int rv;
1328
1329   for (;;) {
1330     auto nread = conn_.read_tls(buf.data(), buf.size());
1331     if (nread == 0) {
1332       return 0;
1333     }
1334
1335     if (nread < 0) {
1336       return nread;
1337     }
1338
1339     rv = process_input(buf.data(), nread);
1340     if (rv != 0) {
1341       return rv;
1342     }
1343
1344     if (!ev_is_active(&conn_.rev)) {
1345       return 0;
1346     }
1347   }
1348 }
1349
1350 int HttpDownstreamConnection::write_tls() {
1351   conn_.last_read = ev_now(conn_.loop);
1352
1353   ERR_clear_error();
1354
1355   auto upstream = downstream_->get_upstream();
1356   auto input = downstream_->get_request_buf();
1357
1358   struct iovec iov;
1359
1360   while (input->rleft() > 0) {
1361     auto iovcnt = input->riovec(&iov, 1);
1362     if (iovcnt != 1) {
1363       assert(0);
1364       return -1;
1365     }
1366     auto nwrite = conn_.write_tls(iov.iov_base, iov.iov_len);
1367
1368     if (nwrite == 0) {
1369       return 0;
1370     }
1371
1372     if (nwrite < 0) {
1373       if (!first_write_done_) {
1374         return nwrite;
1375       }
1376       // We may have pending data in receive buffer which may contain
1377       // part of response body.  So keep reading.  Invoke read event
1378       // to get read(2) error just in case.
1379       ev_feed_event(conn_.loop, &conn_.rev, EV_READ);
1380       on_write_ = &HttpDownstreamConnection::noop;
1381       reusable_ = false;
1382       break;
1383     }
1384
1385     input->drain(nwrite);
1386   }
1387
1388   conn_.wlimit.stopw();
1389   ev_timer_stop(conn_.loop, &conn_.wt);
1390
1391   if (input->rleft() == 0) {
1392     auto &req = downstream_->request();
1393
1394     upstream->resume_read(SHRPX_NO_BUFFER, downstream_,
1395                           req.unconsumed_body_length);
1396   }
1397
1398   return 0;
1399 }
1400
1401 int HttpDownstreamConnection::process_input(const uint8_t *data,
1402                                             size_t datalen) {
1403   int rv;
1404
1405   if (downstream_->get_upgraded()) {
1406     // For upgraded connection, just pass data to the upstream.
1407     rv = downstream_->get_upstream()->on_downstream_body(downstream_, data,
1408                                                          datalen, true);
1409     if (rv != 0) {
1410       return rv;
1411     }
1412
1413     if (downstream_->response_buf_full()) {
1414       downstream_->pause_read(SHRPX_NO_BUFFER);
1415       return 0;
1416     }
1417
1418     return 0;
1419   }
1420
1421   auto htperr = llhttp_execute(&response_htp_,
1422                                reinterpret_cast<const char *>(data), datalen);
1423   auto nproc =
1424       htperr == HPE_OK
1425           ? datalen
1426           : static_cast<size_t>(reinterpret_cast<const uint8_t *>(
1427                                     llhttp_get_error_pos(&response_htp_)) -
1428                                 data);
1429
1430   if (htperr != HPE_OK &&
1431       (!downstream_->get_upgraded() || htperr != HPE_PAUSED_UPGRADE)) {
1432     // Handling early return (in other words, response was hijacked by
1433     // mruby scripting).
1434     if (downstream_->get_response_state() == DownstreamState::MSG_COMPLETE) {
1435       return SHRPX_ERR_DCONN_CANCELED;
1436     }
1437
1438     if (LOG_ENABLED(INFO)) {
1439       DCLOG(INFO, this) << "HTTP parser failure: "
1440                         << "(" << llhttp_errno_name(htperr) << ") "
1441                         << llhttp_get_error_reason(&response_htp_);
1442     }
1443
1444     return -1;
1445   }
1446
1447   if (downstream_->get_upgraded()) {
1448     if (nproc < datalen) {
1449       // Data from data + nproc are for upgraded protocol.
1450       rv = downstream_->get_upstream()->on_downstream_body(
1451           downstream_, data + nproc, datalen - nproc, true);
1452       if (rv != 0) {
1453         return rv;
1454       }
1455
1456       if (downstream_->response_buf_full()) {
1457         downstream_->pause_read(SHRPX_NO_BUFFER);
1458         return 0;
1459       }
1460     }
1461     return 0;
1462   }
1463
1464   if (downstream_->response_buf_full()) {
1465     downstream_->pause_read(SHRPX_NO_BUFFER);
1466     return 0;
1467   }
1468
1469   return 0;
1470 }
1471
1472 int HttpDownstreamConnection::connected() {
1473   auto &connect_blocker = addr_->connect_blocker;
1474
1475   auto sock_error = util::get_socket_error(conn_.fd);
1476   if (sock_error != 0) {
1477     conn_.wlimit.stopw();
1478
1479     DCLOG(WARN, this) << "Backend connect failed; addr="
1480                       << util::to_numeric_addr(raddr_)
1481                       << ": errno=" << sock_error;
1482
1483     downstream_failure(addr_, raddr_);
1484
1485     return -1;
1486   }
1487
1488   if (LOG_ENABLED(INFO)) {
1489     DCLOG(INFO, this) << "Connected to downstream host";
1490   }
1491
1492   // Reset timeout for write.  Previously, we set timeout for connect.
1493   conn_.wt.repeat = group_->shared_addr->timeout.write;
1494   ev_timer_again(conn_.loop, &conn_.wt);
1495
1496   conn_.rlimit.startw();
1497   conn_.again_rt();
1498
1499   ev_set_cb(&conn_.wev, writecb);
1500
1501   if (conn_.tls.ssl) {
1502     on_read_ = &HttpDownstreamConnection::tls_handshake;
1503     on_write_ = &HttpDownstreamConnection::tls_handshake;
1504
1505     return 0;
1506   }
1507
1508   signal_write_ = &HttpDownstreamConnection::actual_signal_write;
1509
1510   connect_blocker->on_success();
1511
1512   ev_set_cb(&conn_.rt, timeoutcb);
1513   ev_set_cb(&conn_.wt, timeoutcb);
1514
1515   on_read_ = &HttpDownstreamConnection::read_clear;
1516   on_write_ = &HttpDownstreamConnection::write_first;
1517
1518   return 0;
1519 }
1520
1521 int HttpDownstreamConnection::on_read() { return on_read_(*this); }
1522
1523 int HttpDownstreamConnection::on_write() { return on_write_(*this); }
1524
1525 void HttpDownstreamConnection::on_upstream_change(Upstream *upstream) {}
1526
1527 void HttpDownstreamConnection::signal_write() { signal_write_(*this); }
1528
1529 int HttpDownstreamConnection::actual_signal_write() {
1530   ev_feed_event(conn_.loop, &conn_.wev, EV_WRITE);
1531   return 0;
1532 }
1533
1534 int HttpDownstreamConnection::noop() { return 0; }
1535
1536 const std::shared_ptr<DownstreamAddrGroup> &
1537 HttpDownstreamConnection::get_downstream_addr_group() const {
1538   return group_;
1539 }
1540
1541 DownstreamAddr *HttpDownstreamConnection::get_addr() const { return addr_; }
1542
1543 bool HttpDownstreamConnection::poolable() const {
1544   return !group_->retired && reusable_;
1545 }
1546
1547 const Address *HttpDownstreamConnection::get_raddr() const { return raddr_; }
1548
1549 } // namespace shrpx