Imported Upstream version 1.0.0
[platform/upstream/nghttp2.git] / src / shrpx_http_downstream_connection.cc
1 /*
2  * nghttp2 - HTTP/2 C Library
3  *
4  * Copyright (c) 2012 Tatsuhiro Tsujikawa
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining
7  * a copy of this software and associated documentation files (the
8  * "Software"), to deal in the Software without restriction, including
9  * without limitation the rights to use, copy, modify, merge, publish,
10  * distribute, sublicense, and/or sell copies of the Software, and to
11  * permit persons to whom the Software is furnished to do so, subject to
12  * the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be
15  * included in all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
21  * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
22  * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
23  * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24  */
25 #include "shrpx_http_downstream_connection.h"
26
27 #include "shrpx_client_handler.h"
28 #include "shrpx_upstream.h"
29 #include "shrpx_downstream.h"
30 #include "shrpx_config.h"
31 #include "shrpx_error.h"
32 #include "shrpx_http.h"
33 #include "shrpx_log_config.h"
34 #include "shrpx_connect_blocker.h"
35 #include "shrpx_downstream_connection_pool.h"
36 #include "shrpx_worker.h"
37 #include "http2.h"
38 #include "util.h"
39
40 using namespace nghttp2;
41
42 namespace shrpx {
43
44 namespace {
45 void timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
46   auto conn = static_cast<Connection *>(w->data);
47   auto dconn = static_cast<HttpDownstreamConnection *>(conn->data);
48
49   if (LOG_ENABLED(INFO)) {
50     DCLOG(INFO, dconn) << "Time out";
51   }
52
53   auto downstream = dconn->get_downstream();
54   auto upstream = downstream->get_upstream();
55   auto handler = upstream->get_client_handler();
56
57   // Do this so that dconn is not pooled
58   downstream->set_response_connection_close(true);
59
60   if (upstream->downstream_error(dconn, Downstream::EVENT_TIMEOUT) != 0) {
61     delete handler;
62   }
63 }
64 } // namespace
65
66 namespace {
67 void readcb(struct ev_loop *loop, ev_io *w, int revents) {
68   auto conn = static_cast<Connection *>(w->data);
69   auto dconn = static_cast<HttpDownstreamConnection *>(conn->data);
70   auto downstream = dconn->get_downstream();
71   auto upstream = downstream->get_upstream();
72   auto handler = upstream->get_client_handler();
73
74   if (upstream->downstream_read(dconn) != 0) {
75     delete handler;
76   }
77 }
78 } // namespace
79
80 namespace {
81 void writecb(struct ev_loop *loop, ev_io *w, int revents) {
82   auto conn = static_cast<Connection *>(w->data);
83   auto dconn = static_cast<HttpDownstreamConnection *>(conn->data);
84   auto downstream = dconn->get_downstream();
85   auto upstream = downstream->get_upstream();
86   auto handler = upstream->get_client_handler();
87
88   if (upstream->downstream_write(dconn) != 0) {
89     delete handler;
90   }
91 }
92 } // namespace
93
94 namespace {
95 void connectcb(struct ev_loop *loop, ev_io *w, int revents) {
96   auto conn = static_cast<Connection *>(w->data);
97   auto dconn = static_cast<HttpDownstreamConnection *>(conn->data);
98   auto downstream = dconn->get_downstream();
99   auto upstream = downstream->get_upstream();
100   auto handler = upstream->get_client_handler();
101   if (dconn->on_connect() != 0) {
102     if (upstream->on_downstream_abort_request(downstream, 503) != 0) {
103       delete handler;
104     }
105     return;
106   }
107   writecb(loop, w, revents);
108 }
109 } // namespace
110
111 HttpDownstreamConnection::HttpDownstreamConnection(
112     DownstreamConnectionPool *dconn_pool, struct ev_loop *loop)
113     : DownstreamConnection(dconn_pool),
114       conn_(loop, -1, nullptr, get_config()->downstream_write_timeout,
115             get_config()->downstream_read_timeout, 0, 0, 0, 0, connectcb,
116             readcb, timeoutcb, this),
117       ioctrl_(&conn_.rlimit), response_htp_{0}, addr_idx_(0),
118       connected_(false) {}
119
120 HttpDownstreamConnection::~HttpDownstreamConnection() {
121   // Downstream and DownstreamConnection may be deleted
122   // asynchronously.
123   if (downstream_) {
124     downstream_->release_downstream_connection();
125   }
126 }
127
128 int HttpDownstreamConnection::attach_downstream(Downstream *downstream) {
129   if (LOG_ENABLED(INFO)) {
130     DCLOG(INFO, this) << "Attaching to DOWNSTREAM:" << downstream;
131   }
132
133   if (conn_.fd == -1) {
134     auto connect_blocker = client_handler_->get_connect_blocker();
135
136     if (connect_blocker->blocked()) {
137       if (LOG_ENABLED(INFO)) {
138         DCLOG(INFO, this)
139             << "Downstream connection was blocked by connect_blocker";
140       }
141       return -1;
142     }
143
144     auto worker = client_handler_->get_worker();
145     auto worker_stat = worker->get_worker_stat();
146     auto end = worker_stat->next_downstream;
147     for (;;) {
148       auto i = worker_stat->next_downstream;
149       ++worker_stat->next_downstream;
150       worker_stat->next_downstream %= get_config()->downstream_addrs.size();
151
152       conn_.fd = util::create_nonblock_socket(
153           get_config()->downstream_addrs[i].addr.storage.ss_family);
154
155       if (conn_.fd == -1) {
156         auto error = errno;
157         DCLOG(WARN, this) << "socket() failed; errno=" << error;
158
159         connect_blocker->on_failure();
160
161         return SHRPX_ERR_NETWORK;
162       }
163
164       int rv;
165       rv = connect(conn_.fd, &get_config()->downstream_addrs[i].addr.sa,
166                    get_config()->downstream_addrs[i].addrlen);
167       if (rv != 0 && errno != EINPROGRESS) {
168         auto error = errno;
169         DCLOG(WARN, this) << "connect() failed; errno=" << error;
170
171         connect_blocker->on_failure();
172         close(conn_.fd);
173         conn_.fd = -1;
174
175         if (end == worker_stat->next_downstream) {
176           return SHRPX_ERR_NETWORK;
177         }
178
179         // Try again with the next downstream server
180         continue;
181       }
182
183       if (LOG_ENABLED(INFO)) {
184         DCLOG(INFO, this) << "Connecting to downstream server";
185       }
186
187       addr_idx_ = i;
188
189       ev_io_set(&conn_.wev, conn_.fd, EV_WRITE);
190       ev_io_set(&conn_.rev, conn_.fd, EV_READ);
191
192       conn_.wlimit.startw();
193
194       break;
195     }
196   }
197
198   downstream_ = downstream;
199
200   http_parser_init(&response_htp_, HTTP_RESPONSE);
201   response_htp_.data = downstream_;
202
203   ev_set_cb(&conn_.rev, readcb);
204
205   conn_.rt.repeat = get_config()->downstream_read_timeout;
206   ev_timer_again(conn_.loop, &conn_.rt);
207   // TODO we should have timeout for connection establishment
208   ev_timer_again(conn_.loop, &conn_.wt);
209
210   return 0;
211 }
212
213 int HttpDownstreamConnection::push_request_headers() {
214   const char *authority = nullptr, *host = nullptr;
215   auto downstream_hostport =
216       get_config()->downstream_addrs[addr_idx_].hostport.get();
217
218   if (!get_config()->no_host_rewrite && !get_config()->http2_proxy &&
219       !get_config()->client_proxy &&
220       downstream_->get_request_method() != "CONNECT") {
221     if (!downstream_->get_request_http2_authority().empty()) {
222       authority = downstream_hostport;
223     }
224     if (downstream_->get_request_header(http2::HD_HOST)) {
225       host = downstream_hostport;
226     }
227   } else {
228     if (!downstream_->get_request_http2_authority().empty()) {
229       authority = downstream_->get_request_http2_authority().c_str();
230     }
231     auto h = downstream_->get_request_header(http2::HD_HOST);
232     if (h) {
233       host = h->value.c_str();
234     }
235   }
236
237   if (!authority && !host) {
238     // upstream is HTTP/1.0.  We use backend server's host
239     // nonetheless.
240     host = downstream_hostport;
241   }
242
243   if (authority) {
244     downstream_->set_request_downstream_host(authority);
245   } else {
246     downstream_->set_request_downstream_host(host);
247   }
248
249   downstream_->assemble_request_cookie();
250
251   // Assume that method and request path do not contain \r\n.
252   std::string hdrs = downstream_->get_request_method();
253   hdrs += ' ';
254   if (downstream_->get_request_method() == "CONNECT") {
255     if (authority) {
256       hdrs += authority;
257     } else {
258       hdrs += downstream_->get_request_path();
259     }
260   } else if (get_config()->http2_proxy || get_config()->client_proxy) {
261     // Construct absolute-form request target because we are going to
262     // send a request to a HTTP/1 proxy.
263     if (downstream_->get_request_http2_scheme().empty()) {
264       // this comes from HTTP/1 upstream, use http scheme.  We don't
265       // support https forward link yet.
266       hdrs += "http://";
267     } else {
268       hdrs += downstream_->get_request_http2_scheme();
269       hdrs += "://";
270     }
271     if (authority) {
272       hdrs += authority;
273     } else {
274       hdrs += host;
275     }
276
277     // Server-wide OPTIONS takes following form in proxy request:
278     //
279     // OPTIONS http://example.org HTTP/1.1
280     //
281     // Notice that no slash after authority. See
282     // http://tools.ietf.org/html/rfc7230#section-5.3.4
283     if (downstream_->get_request_path() != "*") {
284       hdrs += downstream_->get_request_path();
285     }
286   } else {
287     // No proxy case.
288     hdrs += downstream_->get_request_path();
289   }
290   hdrs += " HTTP/1.1\r\nHost: ";
291   if (authority) {
292     hdrs += authority;
293   } else {
294     hdrs += host;
295   }
296   hdrs += "\r\n";
297
298   http2::build_http1_headers_from_headers(hdrs,
299                                           downstream_->get_request_headers());
300
301   if (!downstream_->get_assembled_request_cookie().empty()) {
302     hdrs += "Cookie: ";
303     hdrs += downstream_->get_assembled_request_cookie();
304     hdrs += "\r\n";
305   }
306
307   if (downstream_->get_request_method() != "CONNECT" &&
308       downstream_->get_request_http2_expect_body() &&
309       !downstream_->get_request_header(http2::HD_CONTENT_LENGTH)) {
310
311     downstream_->set_chunked_request(true);
312     hdrs += "Transfer-Encoding: chunked\r\n";
313   }
314
315   if (downstream_->get_request_connection_close()) {
316     hdrs += "Connection: close\r\n";
317   }
318   auto xff = downstream_->get_request_header(http2::HD_X_FORWARDED_FOR);
319   if (get_config()->add_x_forwarded_for) {
320     hdrs += "X-Forwarded-For: ";
321     if (xff && !get_config()->strip_incoming_x_forwarded_for) {
322       hdrs += (*xff).value;
323       hdrs += ", ";
324     }
325     hdrs += client_handler_->get_ipaddr();
326     hdrs += "\r\n";
327   } else if (xff && !get_config()->strip_incoming_x_forwarded_for) {
328     hdrs += "X-Forwarded-For: ";
329     hdrs += (*xff).value;
330     hdrs += "\r\n";
331   }
332   if (!get_config()->http2_proxy && !get_config()->client_proxy &&
333       downstream_->get_request_method() != "CONNECT") {
334     hdrs += "X-Forwarded-Proto: ";
335     if (!downstream_->get_request_http2_scheme().empty()) {
336       hdrs += downstream_->get_request_http2_scheme();
337       hdrs += "\r\n";
338     } else if (client_handler_->get_ssl()) {
339       hdrs += "https\r\n";
340     } else {
341       hdrs += "http\r\n";
342     }
343   }
344   auto expect = downstream_->get_request_header(http2::HD_EXPECT);
345   if (expect && !util::strifind((*expect).value.c_str(), "100-continue")) {
346     hdrs += "Expect: ";
347     hdrs += (*expect).value;
348     hdrs += "\r\n";
349   }
350   auto via = downstream_->get_request_header(http2::HD_VIA);
351   if (get_config()->no_via) {
352     if (via) {
353       hdrs += "Via: ";
354       hdrs += (*via).value;
355       hdrs += "\r\n";
356     }
357   } else {
358     hdrs += "Via: ";
359     if (via) {
360       hdrs += (*via).value;
361       hdrs += ", ";
362     }
363     hdrs += http::create_via_header_value(downstream_->get_request_major(),
364                                           downstream_->get_request_minor());
365     hdrs += "\r\n";
366   }
367
368   hdrs += "\r\n";
369   if (LOG_ENABLED(INFO)) {
370     const char *hdrp;
371     std::string nhdrs;
372     if (log_config()->errorlog_tty) {
373       nhdrs = http::colorizeHeaders(hdrs.c_str());
374       hdrp = nhdrs.c_str();
375     } else {
376       hdrp = hdrs.c_str();
377     }
378     DCLOG(INFO, this) << "HTTP request headers. stream_id="
379                       << downstream_->get_stream_id() << "\n" << hdrp;
380   }
381   auto output = downstream_->get_request_buf();
382   output->append(hdrs.c_str(), hdrs.size());
383
384   signal_write();
385
386   return 0;
387 }
388
389 int HttpDownstreamConnection::push_upload_data_chunk(const uint8_t *data,
390                                                      size_t datalen) {
391   auto chunked = downstream_->get_chunked_request();
392   auto output = downstream_->get_request_buf();
393
394   if (chunked) {
395     auto chunk_size_hex = util::utox(datalen);
396     output->append(chunk_size_hex.c_str(), chunk_size_hex.size());
397     output->append("\r\n");
398   }
399
400   output->append(data, datalen);
401
402   if (chunked) {
403     output->append("\r\n");
404   }
405
406   signal_write();
407
408   return 0;
409 }
410
411 int HttpDownstreamConnection::end_upload_data() {
412   if (!downstream_->get_chunked_request()) {
413     return 0;
414   }
415
416   auto output = downstream_->get_request_buf();
417   auto &trailers = downstream_->get_request_trailers();
418   if (trailers.empty()) {
419     output->append("0\r\n\r\n");
420   } else {
421     output->append("0\r\n");
422     std::string trailer_part;
423     http2::build_http1_headers_from_headers(trailer_part, trailers);
424     output->append(trailer_part.c_str(), trailer_part.size());
425     output->append("\r\n");
426   }
427
428   signal_write();
429
430   return 0;
431 }
432
433 namespace {
434 void idle_readcb(struct ev_loop *loop, ev_io *w, int revents) {
435   auto conn = static_cast<Connection *>(w->data);
436   auto dconn = static_cast<HttpDownstreamConnection *>(conn->data);
437   if (LOG_ENABLED(INFO)) {
438     DCLOG(INFO, dconn) << "Idle connection EOF";
439   }
440   auto dconn_pool = dconn->get_dconn_pool();
441   dconn_pool->remove_downstream_connection(dconn);
442   // dconn was deleted
443 }
444 } // namespace
445
446 namespace {
447 void idle_timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
448   auto conn = static_cast<Connection *>(w->data);
449   auto dconn = static_cast<HttpDownstreamConnection *>(conn->data);
450   if (LOG_ENABLED(INFO)) {
451     DCLOG(INFO, dconn) << "Idle connection timeout";
452   }
453   auto dconn_pool = dconn->get_dconn_pool();
454   dconn_pool->remove_downstream_connection(dconn);
455   // dconn was deleted
456 }
457 } // namespace
458
459 void HttpDownstreamConnection::detach_downstream(Downstream *downstream) {
460   if (LOG_ENABLED(INFO)) {
461     DCLOG(INFO, this) << "Detaching from DOWNSTREAM:" << downstream;
462   }
463   downstream_ = nullptr;
464   ioctrl_.force_resume_read();
465
466   conn_.rlimit.startw();
467   conn_.wlimit.stopw();
468
469   ev_set_cb(&conn_.rev, idle_readcb);
470
471   ev_timer_stop(conn_.loop, &conn_.wt);
472
473   conn_.rt.repeat = get_config()->downstream_idle_read_timeout;
474   ev_set_cb(&conn_.rt, idle_timeoutcb);
475   ev_timer_again(conn_.loop, &conn_.rt);
476 }
477
478 void HttpDownstreamConnection::pause_read(IOCtrlReason reason) {
479   ioctrl_.pause_read(reason);
480 }
481
482 int HttpDownstreamConnection::resume_read(IOCtrlReason reason,
483                                           size_t consumed) {
484   if (!downstream_->response_buf_full()) {
485     ioctrl_.resume_read(reason);
486   }
487
488   return 0;
489 }
490
491 void HttpDownstreamConnection::force_resume_read() {
492   ioctrl_.force_resume_read();
493 }
494
495 namespace {
496 int htp_msg_begincb(http_parser *htp) {
497   auto downstream = static_cast<Downstream *>(htp->data);
498
499   if (downstream->get_response_state() != Downstream::INITIAL) {
500     return -1;
501   }
502
503   return 0;
504 }
505 } // namespace
506
507 namespace {
508 int htp_hdrs_completecb(http_parser *htp) {
509   auto downstream = static_cast<Downstream *>(htp->data);
510   auto upstream = downstream->get_upstream();
511   int rv;
512
513   downstream->set_response_http_status(htp->status_code);
514   downstream->set_response_major(htp->http_major);
515   downstream->set_response_minor(htp->http_minor);
516
517   if (downstream->index_response_headers() != 0) {
518     downstream->set_response_state(Downstream::MSG_BAD_HEADER);
519     return -1;
520   }
521
522   if (downstream->get_non_final_response()) {
523     // Reset content-length because we reuse same Downstream for the
524     // next response.
525     downstream->set_response_content_length(-1);
526     // For non-final response code, we just call
527     // on_downstream_header_complete() without changing response
528     // state.
529     rv = upstream->on_downstream_header_complete(downstream);
530
531     if (rv != 0) {
532       return -1;
533     }
534
535     return 0;
536   }
537
538   downstream->set_response_connection_close(!http_should_keep_alive(htp));
539   downstream->set_response_state(Downstream::HEADER_COMPLETE);
540   downstream->inspect_http1_response();
541   downstream->check_upgrade_fulfilled();
542   if (downstream->get_upgraded()) {
543     // content-length must be ignored for upgraded connection.
544     downstream->set_response_content_length(-1);
545     downstream->set_response_connection_close(true);
546     // transfer-encoding not applied to upgraded connection
547     downstream->set_chunked_response(false);
548   }
549   if (upstream->on_downstream_header_complete(downstream) != 0) {
550     return -1;
551   }
552
553   if (downstream->get_upgraded()) {
554     // Upgrade complete, read until EOF in both ends
555     if (upstream->resume_read(SHRPX_NO_BUFFER, downstream, 0) != 0) {
556       return -1;
557     }
558     downstream->set_request_state(Downstream::HEADER_COMPLETE);
559     if (LOG_ENABLED(INFO)) {
560       LOG(INFO) << "HTTP upgrade success. stream_id="
561                 << downstream->get_stream_id();
562     }
563   }
564
565   unsigned int status = downstream->get_response_http_status();
566   // Ignore the response body. HEAD response may contain
567   // Content-Length or Transfer-Encoding: chunked.  Some server send
568   // 304 status code with nonzero Content-Length, but without response
569   // body. See
570   // https://tools.ietf.org/html/rfc7230#section-3.3
571
572   // TODO It seems that the cases other than HEAD are handled by
573   // http-parser.  Need test.
574   return downstream->get_request_method() == "HEAD" ||
575                  (100 <= status && status <= 199) || status == 204 ||
576                  status == 304
577              ? 1
578              : 0;
579 }
580 } // namespace
581
582 namespace {
583 int htp_hdr_keycb(http_parser *htp, const char *data, size_t len) {
584   auto downstream = static_cast<Downstream *>(htp->data);
585
586   if (downstream->get_response_headers_sum() + len >
587       get_config()->header_field_buffer) {
588     if (LOG_ENABLED(INFO)) {
589       DLOG(INFO, downstream) << "Too large header block size="
590                              << downstream->get_response_headers_sum() + len;
591     }
592     return -1;
593   }
594
595   if (downstream->get_response_state() == Downstream::INITIAL) {
596     if (downstream->get_response_header_key_prev()) {
597       downstream->append_last_response_header_key(data, len);
598     } else {
599       if (downstream->get_response_headers().size() >=
600           get_config()->max_header_fields) {
601         if (LOG_ENABLED(INFO)) {
602           DLOG(INFO, downstream)
603               << "Too many header field num="
604               << downstream->get_response_headers().size() + 1;
605         }
606         return -1;
607       }
608       downstream->add_response_header(std::string(data, len), "");
609     }
610   } else {
611     // trailer part
612     if (downstream->get_response_trailer_key_prev()) {
613       downstream->append_last_response_trailer_key(data, len);
614     } else {
615       if (downstream->get_response_headers().size() >=
616           get_config()->max_header_fields) {
617         if (LOG_ENABLED(INFO)) {
618           DLOG(INFO, downstream)
619               << "Too many header field num="
620               << downstream->get_response_headers().size() + 1;
621         }
622         return -1;
623       }
624       downstream->add_response_trailer(std::string(data, len), "");
625     }
626   }
627   return 0;
628 }
629 } // namespace
630
631 namespace {
632 int htp_hdr_valcb(http_parser *htp, const char *data, size_t len) {
633   auto downstream = static_cast<Downstream *>(htp->data);
634   if (downstream->get_response_headers_sum() + len >
635       get_config()->header_field_buffer) {
636     if (LOG_ENABLED(INFO)) {
637       DLOG(INFO, downstream) << "Too large header block size="
638                              << downstream->get_response_headers_sum() + len;
639     }
640     return -1;
641   }
642   if (downstream->get_response_state() == Downstream::INITIAL) {
643     if (downstream->get_response_header_key_prev()) {
644       downstream->set_last_response_header_value(data, len);
645     } else {
646       downstream->append_last_response_header_value(data, len);
647     }
648   } else {
649     if (downstream->get_response_trailer_key_prev()) {
650       downstream->set_last_response_trailer_value(data, len);
651     } else {
652       downstream->append_last_response_trailer_value(data, len);
653     }
654   }
655   return 0;
656 }
657 } // namespace
658
659 namespace {
660 int htp_bodycb(http_parser *htp, const char *data, size_t len) {
661   auto downstream = static_cast<Downstream *>(htp->data);
662
663   downstream->add_response_bodylen(len);
664
665   return downstream->get_upstream()->on_downstream_body(
666       downstream, reinterpret_cast<const uint8_t *>(data), len, true);
667 }
668 } // namespace
669
670 namespace {
671 int htp_msg_completecb(http_parser *htp) {
672   auto downstream = static_cast<Downstream *>(htp->data);
673
674   if (downstream->get_non_final_response()) {
675     downstream->reset_response();
676
677     return 0;
678   }
679
680   downstream->set_response_state(Downstream::MSG_COMPLETE);
681   // Block reading another response message from (broken?)
682   // server. This callback is not called if the connection is
683   // tunneled.
684   downstream->pause_read(SHRPX_MSG_BLOCK);
685   return downstream->get_upstream()->on_downstream_body_complete(downstream);
686 }
687 } // namespace
688
689 namespace {
690 http_parser_settings htp_hooks = {
691     htp_msg_begincb,     // http_cb on_message_begin;
692     nullptr,             // http_data_cb on_url;
693     nullptr,             // http_data_cb on_status;
694     htp_hdr_keycb,       // http_data_cb on_header_field;
695     htp_hdr_valcb,       // http_data_cb on_header_value;
696     htp_hdrs_completecb, // http_cb      on_headers_complete;
697     htp_bodycb,          // http_data_cb on_body;
698     htp_msg_completecb   // http_cb      on_message_complete;
699 };
700 } // namespace
701
702 int HttpDownstreamConnection::on_read() {
703   if (!connected_) {
704     return 0;
705   }
706
707   ev_timer_again(conn_.loop, &conn_.rt);
708   std::array<uint8_t, 8192> buf;
709   int rv;
710
711   if (downstream_->get_upgraded()) {
712     // For upgraded connection, just pass data to the upstream.
713     for (;;) {
714       auto nread = conn_.read_clear(buf.data(), buf.size());
715
716       if (nread == 0) {
717         return 0;
718       }
719
720       if (nread < 0) {
721         return nread;
722       }
723
724       rv = downstream_->get_upstream()->on_downstream_body(
725           downstream_, buf.data(), nread, true);
726       if (rv != 0) {
727         return rv;
728       }
729
730       if (downstream_->response_buf_full()) {
731         downstream_->pause_read(SHRPX_NO_BUFFER);
732         return 0;
733       }
734     }
735   }
736
737   for (;;) {
738     auto nread = conn_.read_clear(buf.data(), buf.size());
739
740     if (nread == 0) {
741       return 0;
742     }
743
744     if (nread < 0) {
745       return nread;
746     }
747
748     auto nproc =
749         http_parser_execute(&response_htp_, &htp_hooks,
750                             reinterpret_cast<char *>(buf.data()), nread);
751
752     auto htperr = HTTP_PARSER_ERRNO(&response_htp_);
753
754     if (htperr != HPE_OK) {
755       if (LOG_ENABLED(INFO)) {
756         DCLOG(INFO, this) << "HTTP parser failure: "
757                           << "(" << http_errno_name(htperr) << ") "
758                           << http_errno_description(htperr);
759       }
760
761       return -1;
762     }
763
764     if (nproc != static_cast<size_t>(nread)) {
765       if (LOG_ENABLED(INFO)) {
766         DCLOG(INFO, this) << "nproc != nread";
767       }
768       return -1;
769     }
770
771     if (downstream_->response_buf_full()) {
772       downstream_->pause_read(SHRPX_NO_BUFFER);
773       return 0;
774     }
775   }
776 }
777
778 int HttpDownstreamConnection::on_write() {
779   if (!connected_) {
780     return 0;
781   }
782
783   ev_timer_again(conn_.loop, &conn_.rt);
784
785   auto upstream = downstream_->get_upstream();
786   auto input = downstream_->get_request_buf();
787
788   std::array<struct iovec, MAX_WR_IOVCNT> iov;
789
790   while (input->rleft() > 0) {
791     auto iovcnt = input->riovec(iov.data(), iov.size());
792
793     auto nwrite = conn_.writev_clear(iov.data(), iovcnt);
794
795     if (nwrite == 0) {
796       return 0;
797     }
798
799     if (nwrite < 0) {
800       return nwrite;
801     }
802
803     input->drain(nwrite);
804   }
805
806   conn_.wlimit.stopw();
807   ev_timer_stop(conn_.loop, &conn_.wt);
808
809   if (input->rleft() == 0) {
810     upstream->resume_read(SHRPX_NO_BUFFER, downstream_,
811                           downstream_->get_request_datalen());
812   }
813
814   return 0;
815 }
816
817 int HttpDownstreamConnection::on_connect() {
818   auto connect_blocker = client_handler_->get_connect_blocker();
819
820   if (!util::check_socket_connected(conn_.fd)) {
821     conn_.wlimit.stopw();
822
823     if (LOG_ENABLED(INFO)) {
824       DLOG(INFO, this) << "downstream connect failed";
825     }
826
827     return -1;
828   }
829
830   connected_ = true;
831
832   connect_blocker->on_success();
833
834   conn_.rlimit.startw();
835   ev_set_cb(&conn_.wev, writecb);
836
837   return 0;
838 }
839
840 void HttpDownstreamConnection::on_upstream_change(Upstream *upstream) {}
841
842 void HttpDownstreamConnection::signal_write() { conn_.wlimit.startw(); }
843
844 } // namespace shrpx