tizen 2.4 release
[external/nghttp2.git] / src / shrpx_http_downstream_connection.cc
1 /*
2  * nghttp2 - HTTP/2 C Library
3  *
4  * Copyright (c) 2012 Tatsuhiro Tsujikawa
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining
7  * a copy of this software and associated documentation files (the
8  * "Software"), to deal in the Software without restriction, including
9  * without limitation the rights to use, copy, modify, merge, publish,
10  * distribute, sublicense, and/or sell copies of the Software, and to
11  * permit persons to whom the Software is furnished to do so, subject to
12  * the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be
15  * included in all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
21  * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
22  * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
23  * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24  */
25 #include "shrpx_http_downstream_connection.h"
26
27 #include "shrpx_client_handler.h"
28 #include "shrpx_upstream.h"
29 #include "shrpx_downstream.h"
30 #include "shrpx_config.h"
31 #include "shrpx_error.h"
32 #include "shrpx_http.h"
33 #include "shrpx_worker_config.h"
34 #include "shrpx_connect_blocker.h"
35 #include "shrpx_downstream_connection_pool.h"
36 #include "shrpx_worker.h"
37 #include "http2.h"
38 #include "util.h"
39
40 using namespace nghttp2;
41
42 namespace shrpx {
43
44 namespace {
45 void timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
46   auto conn = static_cast<Connection *>(w->data);
47   auto dconn = static_cast<HttpDownstreamConnection *>(conn->data);
48
49   if (LOG_ENABLED(INFO)) {
50     DCLOG(INFO, dconn) << "Time out";
51   }
52
53   auto downstream = dconn->get_downstream();
54   auto upstream = downstream->get_upstream();
55   auto handler = upstream->get_client_handler();
56
57   // Do this so that dconn is not pooled
58   downstream->set_response_connection_close(true);
59
60   if (upstream->downstream_error(dconn, Downstream::EVENT_TIMEOUT) != 0) {
61     delete handler;
62   }
63 }
64 } // namespace
65
66 namespace {
67 void readcb(struct ev_loop *loop, ev_io *w, int revents) {
68   auto conn = static_cast<Connection *>(w->data);
69   auto dconn = static_cast<HttpDownstreamConnection *>(conn->data);
70   auto downstream = dconn->get_downstream();
71   auto upstream = downstream->get_upstream();
72   auto handler = upstream->get_client_handler();
73
74   if (upstream->downstream_read(dconn) != 0) {
75     delete handler;
76   }
77 }
78 } // namespace
79
80 namespace {
81 void writecb(struct ev_loop *loop, ev_io *w, int revents) {
82   auto conn = static_cast<Connection *>(w->data);
83   auto dconn = static_cast<HttpDownstreamConnection *>(conn->data);
84   auto downstream = dconn->get_downstream();
85   auto upstream = downstream->get_upstream();
86   auto handler = upstream->get_client_handler();
87
88   if (upstream->downstream_write(dconn) != 0) {
89     delete handler;
90   }
91 }
92 } // namespace
93
94 namespace {
95 void connectcb(struct ev_loop *loop, ev_io *w, int revents) {
96   auto conn = static_cast<Connection *>(w->data);
97   auto dconn = static_cast<HttpDownstreamConnection *>(conn->data);
98   auto downstream = dconn->get_downstream();
99   auto upstream = downstream->get_upstream();
100   auto handler = upstream->get_client_handler();
101   if (dconn->on_connect() != 0) {
102     if (upstream->on_downstream_abort_request(downstream, 503) != 0) {
103       delete handler;
104     }
105     return;
106   }
107   writecb(loop, w, revents);
108 }
109 } // namespace
110
111 HttpDownstreamConnection::HttpDownstreamConnection(
112     DownstreamConnectionPool *dconn_pool, struct ev_loop *loop)
113     : DownstreamConnection(dconn_pool),
114       conn_(loop, -1, nullptr, get_config()->downstream_write_timeout,
115             get_config()->downstream_read_timeout, 0, 0, 0, 0, connectcb,
116             readcb, timeoutcb, this),
117       ioctrl_(&conn_.rlimit), response_htp_{0}, addr_idx_(0) {}
118
119 HttpDownstreamConnection::~HttpDownstreamConnection() {
120   // Downstream and DownstreamConnection may be deleted
121   // asynchronously.
122   if (downstream_) {
123     downstream_->release_downstream_connection();
124   }
125 }
126
127 int HttpDownstreamConnection::attach_downstream(Downstream *downstream) {
128   if (LOG_ENABLED(INFO)) {
129     DCLOG(INFO, this) << "Attaching to DOWNSTREAM:" << downstream;
130   }
131
132   if (conn_.fd == -1) {
133     auto connect_blocker = client_handler_->get_http1_connect_blocker();
134
135     if (connect_blocker->blocked()) {
136       if (LOG_ENABLED(INFO)) {
137         DCLOG(INFO, this)
138             << "Downstream connection was blocked by connect_blocker";
139       }
140       return -1;
141     }
142
143     auto worker_stat = client_handler_->get_worker_stat();
144     auto end = worker_stat->next_downstream;
145     for (;;) {
146       auto i = worker_stat->next_downstream;
147       ++worker_stat->next_downstream;
148       worker_stat->next_downstream %= get_config()->downstream_addrs.size();
149
150       conn_.fd = util::create_nonblock_socket(
151           get_config()->downstream_addrs[i].addr.storage.ss_family);
152
153       if (conn_.fd == -1) {
154         auto error = errno;
155         DCLOG(WARN, this) << "socket() failed; errno=" << error;
156
157         connect_blocker->on_failure();
158
159         return SHRPX_ERR_NETWORK;
160       }
161
162       int rv;
163       rv = connect(conn_.fd, const_cast<sockaddr *>(
164                                  &get_config()->downstream_addrs[i].addr.sa),
165                    get_config()->downstream_addrs[i].addrlen);
166       if (rv != 0 && errno != EINPROGRESS) {
167         auto error = errno;
168         DCLOG(WARN, this) << "connect() failed; errno=" << error;
169
170         connect_blocker->on_failure();
171         close(conn_.fd);
172         conn_.fd = -1;
173
174         if (end == worker_stat->next_downstream) {
175           return SHRPX_ERR_NETWORK;
176         }
177
178         // Try again with the next downstream server
179         continue;
180       }
181
182       if (LOG_ENABLED(INFO)) {
183         DCLOG(INFO, this) << "Connecting to downstream server";
184       }
185
186       addr_idx_ = i;
187
188       ev_io_set(&conn_.wev, conn_.fd, EV_WRITE);
189       ev_io_set(&conn_.rev, conn_.fd, EV_READ);
190
191       conn_.wlimit.startw();
192
193       break;
194     }
195   }
196
197   downstream_ = downstream;
198
199   http_parser_init(&response_htp_, HTTP_RESPONSE);
200   response_htp_.data = downstream_;
201
202   ev_set_cb(&conn_.rev, readcb);
203
204   conn_.rt.repeat = get_config()->downstream_read_timeout;
205   ev_timer_again(conn_.loop, &conn_.rt);
206   // TODO we should have timeout for connection establishment
207   ev_timer_again(conn_.loop, &conn_.wt);
208
209   return 0;
210 }
211
212 int HttpDownstreamConnection::push_request_headers() {
213   const char *authority = nullptr, *host = nullptr;
214   if (!get_config()->no_host_rewrite && !get_config()->http2_proxy &&
215       !get_config()->client_proxy) {
216     if (!downstream_->get_request_http2_authority().empty()) {
217       authority = get_config()->downstream_addrs[addr_idx_].hostport.get();
218     }
219     if (downstream_->get_request_header(http2::HD_HOST)) {
220       host = get_config()->downstream_addrs[addr_idx_].hostport.get();
221     }
222   } else {
223     if (!downstream_->get_request_http2_authority().empty()) {
224       authority = downstream_->get_request_http2_authority().c_str();
225     }
226     auto h = downstream_->get_request_header(http2::HD_HOST);
227     if (h) {
228       host = h->value.c_str();
229     }
230   }
231
232   if (!authority && !host) {
233     // upstream is HTTP/1.0.  We use backend server's host
234     // nonetheless.
235     host = get_config()->downstream_addrs[addr_idx_].hostport.get();
236   }
237
238   if (authority) {
239     downstream_->set_request_downstream_host(authority);
240   } else {
241     downstream_->set_request_downstream_host(host);
242   }
243
244   downstream_->assemble_request_cookie();
245
246   // Assume that method and request path do not contain \r\n.
247   std::string hdrs = downstream_->get_request_method();
248   hdrs += " ";
249   if (downstream_->get_request_method() == "CONNECT") {
250     if (authority) {
251       hdrs += authority;
252     } else {
253       hdrs += downstream_->get_request_path();
254     }
255   } else if (get_config()->http2_proxy &&
256              !downstream_->get_request_http2_scheme().empty() && authority &&
257              (downstream_->get_request_path().c_str()[0] == '/' ||
258               downstream_->get_request_path() == "*")) {
259     // Construct absolute-form request target because we are going to
260     // send a request to a HTTP/1 proxy.
261     hdrs += downstream_->get_request_http2_scheme();
262     hdrs += "://";
263     hdrs += authority;
264
265     // Server-wide OPTIONS takes following form in proxy request:
266     //
267     // OPTIONS http://example.org HTTP/1.1
268     //
269     // Notice that no slash after authority. See
270     // http://tools.ietf.org/html/rfc7230#section-5.3.4
271     if (downstream_->get_request_path() != "*") {
272       hdrs += downstream_->get_request_path();
273     }
274   } else {
275     // No proxy case. get_request_path() may be absolute-form but we
276     // don't care.
277     hdrs += downstream_->get_request_path();
278   }
279   hdrs += " HTTP/1.1\r\nHost: ";
280   if (authority) {
281     hdrs += authority;
282   } else {
283     hdrs += host;
284   }
285   hdrs += "\r\n";
286
287   http2::build_http1_headers_from_headers(hdrs,
288                                           downstream_->get_request_headers());
289
290   if (!downstream_->get_assembled_request_cookie().empty()) {
291     hdrs += "Cookie: ";
292     hdrs += downstream_->get_assembled_request_cookie();
293     hdrs += "\r\n";
294   }
295
296   if (downstream_->get_request_method() != "CONNECT" &&
297       downstream_->get_request_http2_expect_body() &&
298       !downstream_->get_request_header(http2::HD_CONTENT_LENGTH)) {
299
300     downstream_->set_chunked_request(true);
301     hdrs += "Transfer-Encoding: chunked\r\n";
302   }
303
304   if (downstream_->get_request_connection_close()) {
305     hdrs += "Connection: close\r\n";
306   }
307   auto xff = downstream_->get_request_header(http2::HD_X_FORWARDED_FOR);
308   if (get_config()->add_x_forwarded_for) {
309     hdrs += "X-Forwarded-For: ";
310     if (xff && !get_config()->strip_incoming_x_forwarded_for) {
311       hdrs += (*xff).value;
312       hdrs += ", ";
313     }
314     hdrs += client_handler_->get_ipaddr();
315     hdrs += "\r\n";
316   } else if (xff && !get_config()->strip_incoming_x_forwarded_for) {
317     hdrs += "X-Forwarded-For: ";
318     hdrs += (*xff).value;
319     hdrs += "\r\n";
320   }
321   if (!get_config()->http2_proxy && !get_config()->client_proxy &&
322       downstream_->get_request_method() != "CONNECT") {
323     hdrs += "X-Forwarded-Proto: ";
324     if (!downstream_->get_request_http2_scheme().empty()) {
325       hdrs += downstream_->get_request_http2_scheme();
326       hdrs += "\r\n";
327     } else if (client_handler_->get_ssl()) {
328       hdrs += "https\r\n";
329     } else {
330       hdrs += "http\r\n";
331     }
332   }
333   auto expect = downstream_->get_request_header(http2::HD_EXPECT);
334   if (expect && !util::strifind((*expect).value.c_str(), "100-continue")) {
335     hdrs += "Expect: ";
336     hdrs += (*expect).value;
337     hdrs += "\r\n";
338   }
339   auto via = downstream_->get_request_header(http2::HD_VIA);
340   if (get_config()->no_via) {
341     if (via) {
342       hdrs += "Via: ";
343       hdrs += (*via).value;
344       hdrs += "\r\n";
345     }
346   } else {
347     hdrs += "Via: ";
348     if (via) {
349       hdrs += (*via).value;
350       hdrs += ", ";
351     }
352     hdrs += http::create_via_header_value(downstream_->get_request_major(),
353                                           downstream_->get_request_minor());
354     hdrs += "\r\n";
355   }
356
357   hdrs += "\r\n";
358   if (LOG_ENABLED(INFO)) {
359     const char *hdrp;
360     std::string nhdrs;
361     if (worker_config->errorlog_tty) {
362       nhdrs = http::colorizeHeaders(hdrs.c_str());
363       hdrp = nhdrs.c_str();
364     } else {
365       hdrp = hdrs.c_str();
366     }
367     DCLOG(INFO, this) << "HTTP request headers. stream_id="
368                       << downstream_->get_stream_id() << "\n" << hdrp;
369   }
370   auto output = downstream_->get_request_buf();
371   output->append(hdrs.c_str(), hdrs.size());
372
373   signal_write();
374
375   return 0;
376 }
377
378 int HttpDownstreamConnection::push_upload_data_chunk(const uint8_t *data,
379                                                      size_t datalen) {
380   auto chunked = downstream_->get_chunked_request();
381   auto output = downstream_->get_request_buf();
382
383   if (chunked) {
384     auto chunk_size_hex = util::utox(datalen);
385     output->append(chunk_size_hex.c_str(), chunk_size_hex.size());
386     output->append("\r\n");
387   }
388
389   output->append(data, datalen);
390
391   if (chunked) {
392     output->append("\r\n");
393   }
394
395   signal_write();
396
397   return 0;
398 }
399
400 int HttpDownstreamConnection::end_upload_data() {
401   if (!downstream_->get_chunked_request()) {
402     return 0;
403   }
404
405   auto output = downstream_->get_request_buf();
406   output->append("0\r\n\r\n");
407
408   signal_write();
409
410   return 0;
411 }
412
413 namespace {
414 void idle_readcb(struct ev_loop *loop, ev_io *w, int revents) {
415   auto conn = static_cast<Connection *>(w->data);
416   auto dconn = static_cast<HttpDownstreamConnection *>(conn->data);
417   if (LOG_ENABLED(INFO)) {
418     DCLOG(INFO, dconn) << "Idle connection EOF";
419   }
420   auto dconn_pool = dconn->get_dconn_pool();
421   dconn_pool->remove_downstream_connection(dconn);
422   // dconn was deleted
423 }
424 } // namespace
425
426 namespace {
427 void idle_timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
428   auto conn = static_cast<Connection *>(w->data);
429   auto dconn = static_cast<HttpDownstreamConnection *>(conn->data);
430   if (LOG_ENABLED(INFO)) {
431     DCLOG(INFO, dconn) << "Idle connection timeout";
432   }
433   auto dconn_pool = dconn->get_dconn_pool();
434   dconn_pool->remove_downstream_connection(dconn);
435   // dconn was deleted
436 }
437 } // namespace
438
439 void HttpDownstreamConnection::detach_downstream(Downstream *downstream) {
440   if (LOG_ENABLED(INFO)) {
441     DCLOG(INFO, this) << "Detaching from DOWNSTREAM:" << downstream;
442   }
443   downstream_ = nullptr;
444   ioctrl_.force_resume_read();
445
446   conn_.rlimit.startw();
447   conn_.wlimit.stopw();
448
449   ev_set_cb(&conn_.rev, idle_readcb);
450
451   ev_timer_stop(conn_.loop, &conn_.wt);
452
453   conn_.rt.repeat = get_config()->downstream_idle_read_timeout;
454   ev_set_cb(&conn_.rt, idle_timeoutcb);
455   ev_timer_again(conn_.loop, &conn_.rt);
456 }
457
458 void HttpDownstreamConnection::pause_read(IOCtrlReason reason) {
459   ioctrl_.pause_read(reason);
460 }
461
462 int HttpDownstreamConnection::resume_read(IOCtrlReason reason,
463                                           size_t consumed) {
464   if (!downstream_->response_buf_full()) {
465     ioctrl_.resume_read(reason);
466   }
467
468   return 0;
469 }
470
471 void HttpDownstreamConnection::force_resume_read() {
472   ioctrl_.force_resume_read();
473 }
474
475 namespace {
476 int htp_msg_begincb(http_parser *htp) {
477   auto downstream = static_cast<Downstream *>(htp->data);
478
479   if (downstream->get_response_state() != Downstream::INITIAL) {
480     return -1;
481   }
482
483   return 0;
484 }
485 } // namespace
486
487 namespace {
488 int htp_hdrs_completecb(http_parser *htp) {
489   auto downstream = static_cast<Downstream *>(htp->data);
490   auto upstream = downstream->get_upstream();
491   int rv;
492
493   downstream->set_response_http_status(htp->status_code);
494   downstream->set_response_major(htp->http_major);
495   downstream->set_response_minor(htp->http_minor);
496
497   if (downstream->index_response_headers() != 0) {
498     downstream->set_response_state(Downstream::MSG_BAD_HEADER);
499     return -1;
500   }
501
502   if (downstream->get_non_final_response()) {
503     // For non-final response code, we just call
504     // on_downstream_header_complete() without changing response
505     // state.
506     rv = upstream->on_downstream_header_complete(downstream);
507
508     if (rv != 0) {
509       return -1;
510     }
511
512     return 0;
513   }
514
515   downstream->set_response_connection_close(!http_should_keep_alive(htp));
516   downstream->set_response_state(Downstream::HEADER_COMPLETE);
517   downstream->inspect_http1_response();
518   downstream->check_upgrade_fulfilled();
519   if (downstream->get_upgraded()) {
520     downstream->set_response_connection_close(true);
521   }
522   if (upstream->on_downstream_header_complete(downstream) != 0) {
523     return -1;
524   }
525
526   if (downstream->get_upgraded()) {
527     // Upgrade complete, read until EOF in both ends
528     if (upstream->resume_read(SHRPX_MSG_BLOCK, downstream, 0) != 0) {
529       return -1;
530     }
531     downstream->set_request_state(Downstream::HEADER_COMPLETE);
532     if (LOG_ENABLED(INFO)) {
533       LOG(INFO) << "HTTP upgrade success. stream_id="
534                 << downstream->get_stream_id();
535     }
536   }
537
538   unsigned int status = downstream->get_response_http_status();
539   // Ignore the response body. HEAD response may contain
540   // Content-Length or Transfer-Encoding: chunked.  Some server send
541   // 304 status code with nonzero Content-Length, but without response
542   // body. See
543   // http://tools.ietf.org/html/draft-ietf-httpbis-p1-messaging-20#section-3.3
544
545   // TODO It seems that the cases other than HEAD are handled by
546   // http-parser.  Need test.
547   return downstream->get_request_method() == "HEAD" ||
548                  (100 <= status && status <= 199) || status == 204 ||
549                  status == 304
550              ? 1
551              : 0;
552 }
553 } // namespace
554
555 namespace {
556 int htp_hdr_keycb(http_parser *htp, const char *data, size_t len) {
557   auto downstream = static_cast<Downstream *>(htp->data);
558   if (downstream->get_response_state() != Downstream::INITIAL) {
559     // ignore trailers
560     return 0;
561   }
562   if (downstream->get_response_header_key_prev()) {
563     downstream->append_last_response_header_key(data, len);
564   } else {
565     downstream->add_response_header(std::string(data, len), "");
566   }
567   if (downstream->get_response_headers_sum() > Downstream::MAX_HEADERS_SUM) {
568     if (LOG_ENABLED(INFO)) {
569       DLOG(INFO, downstream) << "Too large header block size="
570                              << downstream->get_response_headers_sum();
571     }
572     return -1;
573   }
574   return 0;
575 }
576 } // namespace
577
578 namespace {
579 int htp_hdr_valcb(http_parser *htp, const char *data, size_t len) {
580   auto downstream = static_cast<Downstream *>(htp->data);
581   if (downstream->get_response_state() != Downstream::INITIAL) {
582     // ignore trailers
583     return 0;
584   }
585   if (downstream->get_response_header_key_prev()) {
586     downstream->set_last_response_header_value(std::string(data, len));
587   } else {
588     downstream->append_last_response_header_value(data, len);
589   }
590   if (downstream->get_response_headers_sum() > Downstream::MAX_HEADERS_SUM) {
591     if (LOG_ENABLED(INFO)) {
592       DLOG(INFO, downstream) << "Too large header block size="
593                              << downstream->get_response_headers_sum();
594     }
595     return -1;
596   }
597   return 0;
598 }
599 } // namespace
600
601 namespace {
602 int htp_bodycb(http_parser *htp, const char *data, size_t len) {
603   auto downstream = static_cast<Downstream *>(htp->data);
604
605   downstream->add_response_bodylen(len);
606
607   return downstream->get_upstream()->on_downstream_body(
608       downstream, reinterpret_cast<const uint8_t *>(data), len, true);
609 }
610 } // namespace
611
612 namespace {
613 int htp_msg_completecb(http_parser *htp) {
614   auto downstream = static_cast<Downstream *>(htp->data);
615
616   if (downstream->get_non_final_response()) {
617     downstream->reset_response();
618
619     return 0;
620   }
621
622   downstream->set_response_state(Downstream::MSG_COMPLETE);
623   // Block reading another response message from (broken?)
624   // server. This callback is not called if the connection is
625   // tunneled.
626   downstream->pause_read(SHRPX_MSG_BLOCK);
627   return downstream->get_upstream()->on_downstream_body_complete(downstream);
628 }
629 } // namespace
630
631 namespace {
632 http_parser_settings htp_hooks = {
633     htp_msg_begincb,     // http_cb on_message_begin;
634     nullptr,             // http_data_cb on_url;
635     nullptr,             // http_data_cb on_status;
636     htp_hdr_keycb,       // http_data_cb on_header_field;
637     htp_hdr_valcb,       // http_data_cb on_header_value;
638     htp_hdrs_completecb, // http_cb      on_headers_complete;
639     htp_bodycb,          // http_data_cb on_body;
640     htp_msg_completecb   // http_cb      on_message_complete;
641 };
642 } // namespace
643
644 int HttpDownstreamConnection::on_read() {
645   ev_timer_again(conn_.loop, &conn_.rt);
646   std::array<uint8_t, 8192> buf;
647   int rv;
648
649   if (downstream_->get_upgraded()) {
650     // For upgraded connection, just pass data to the upstream.
651     for (;;) {
652       auto nread = conn_.read_clear(buf.data(), buf.size());
653
654       if (nread == 0) {
655         return 0;
656       }
657
658       if (nread < 0) {
659         return nread;
660       }
661
662       rv = downstream_->get_upstream()->on_downstream_body(
663           downstream_, buf.data(), nread, true);
664       if (rv != 0) {
665         return rv;
666       }
667
668       if (downstream_->response_buf_full()) {
669         downstream_->pause_read(SHRPX_NO_BUFFER);
670         return 0;
671       }
672     }
673   }
674
675   for (;;) {
676     auto nread = conn_.read_clear(buf.data(), buf.size());
677
678     if (nread == 0) {
679       return 0;
680     }
681
682     if (nread < 0) {
683       return nread;
684     }
685
686     auto nproc =
687         http_parser_execute(&response_htp_, &htp_hooks,
688                             reinterpret_cast<char *>(buf.data()), nread);
689
690     if (nproc != static_cast<size_t>(nread)) {
691       if (LOG_ENABLED(INFO)) {
692         DCLOG(INFO, this) << "nproc != nread";
693       }
694       return -1;
695     }
696
697     auto htperr = HTTP_PARSER_ERRNO(&response_htp_);
698
699     if (htperr != HPE_OK) {
700       if (LOG_ENABLED(INFO)) {
701         DCLOG(INFO, this) << "HTTP parser failure: "
702                           << "(" << http_errno_name(htperr) << ") "
703                           << http_errno_description(htperr);
704       }
705
706       return -1;
707     }
708
709     if (downstream_->response_buf_full()) {
710       downstream_->pause_read(SHRPX_NO_BUFFER);
711       return 0;
712     }
713   }
714 }
715
716 int HttpDownstreamConnection::on_write() {
717   ev_timer_again(conn_.loop, &conn_.rt);
718
719   auto upstream = downstream_->get_upstream();
720   auto input = downstream_->get_request_buf();
721
722   std::array<struct iovec, MAX_WR_IOVCNT> iov;
723
724   while (input->rleft() > 0) {
725     auto iovcnt = input->riovec(iov.data(), iov.size());
726
727     auto nwrite = conn_.writev_clear(iov.data(), iovcnt);
728
729     if (nwrite == 0) {
730       return 0;
731     }
732
733     if (nwrite < 0) {
734       return nwrite;
735     }
736
737     input->drain(nwrite);
738   }
739
740   conn_.wlimit.stopw();
741   ev_timer_stop(conn_.loop, &conn_.wt);
742
743   if (input->rleft() == 0) {
744     upstream->resume_read(SHRPX_NO_BUFFER, downstream_,
745                           downstream_->get_request_datalen());
746   }
747
748   return 0;
749 }
750
751 int HttpDownstreamConnection::on_connect() {
752   auto connect_blocker = client_handler_->get_http1_connect_blocker();
753
754   if (!util::check_socket_connected(conn_.fd)) {
755     conn_.wlimit.stopw();
756
757     if (LOG_ENABLED(INFO)) {
758       DLOG(INFO, this) << "downstream connect failed";
759     }
760     connect_blocker->on_failure();
761     return -1;
762   }
763
764   connect_blocker->on_success();
765
766   conn_.rlimit.startw();
767   ev_set_cb(&conn_.wev, writecb);
768
769   return 0;
770 }
771
772 void HttpDownstreamConnection::on_upstream_change(Upstream *upstream) {}
773
774 void HttpDownstreamConnection::signal_write() { conn_.wlimit.startw(); }
775
776 } // namespace shrpx