tizen 2.4 release
[external/nghttp2.git] / src / shrpx_spdy_upstream.cc
1 /*
2  * nghttp2 - HTTP/2 C Library
3  *
4  * Copyright (c) 2012 Tatsuhiro Tsujikawa
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining
7  * a copy of this software and associated documentation files (the
8  * "Software"), to deal in the Software without restriction, including
9  * without limitation the rights to use, copy, modify, merge, publish,
10  * distribute, sublicense, and/or sell copies of the Software, and to
11  * permit persons to whom the Software is furnished to do so, subject to
12  * the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be
15  * included in all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
21  * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
22  * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
23  * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24  */
25 #include "shrpx_spdy_upstream.h"
26
27 #include <netinet/tcp.h>
28 #include <assert.h>
29 #include <cerrno>
30 #include <sstream>
31
32 #include <nghttp2/nghttp2.h>
33
34 #include "shrpx_client_handler.h"
35 #include "shrpx_downstream.h"
36 #include "shrpx_downstream_connection.h"
37 #include "shrpx_config.h"
38 #include "shrpx_http.h"
39 #include "shrpx_worker_config.h"
40 #include "http2.h"
41 #include "util.h"
42 #include "template.h"
43
44 using namespace nghttp2;
45
46 namespace shrpx {
47
48 namespace {
49 ssize_t send_callback(spdylay_session *session, const uint8_t *data, size_t len,
50                       int flags, void *user_data) {
51   auto upstream = static_cast<SpdyUpstream *>(user_data);
52   auto handler = upstream->get_client_handler();
53   auto wb = handler->get_wb();
54
55   if (wb->wleft() == 0) {
56     return SPDYLAY_ERR_WOULDBLOCK;
57   }
58
59   auto nread = wb->write(data, len);
60
61   return nread;
62 }
63 } // namespace
64
65 namespace {
66 ssize_t recv_callback(spdylay_session *session, uint8_t *buf, size_t len,
67                       int flags, void *user_data) {
68   auto upstream = static_cast<SpdyUpstream *>(user_data);
69   auto handler = upstream->get_client_handler();
70   auto rb = handler->get_rb();
71
72   if (rb->rleft() == 0) {
73     return SPDYLAY_ERR_WOULDBLOCK;
74   }
75
76   auto nread = std::min(rb->rleft(), len);
77
78   memcpy(buf, rb->pos, nread);
79   rb->drain(nread);
80
81   return nread;
82 }
83 } // namespace
84
85 namespace {
86 void on_stream_close_callback(spdylay_session *session, int32_t stream_id,
87                               spdylay_status_code status_code,
88                               void *user_data) {
89   auto upstream = static_cast<SpdyUpstream *>(user_data);
90   if (LOG_ENABLED(INFO)) {
91     ULOG(INFO, upstream) << "Stream stream_id=" << stream_id
92                          << " is being closed";
93   }
94   auto downstream = upstream->find_downstream(stream_id);
95   if (!downstream) {
96     return;
97   }
98
99   upstream->consume(stream_id, downstream->get_request_datalen());
100
101   downstream->reset_request_datalen();
102
103   if (downstream->get_request_state() == Downstream::CONNECT_FAIL) {
104     upstream->remove_downstream(downstream);
105     // downstrea was deleted
106
107     return;
108   }
109
110   downstream->set_request_state(Downstream::STREAM_CLOSED);
111   if (downstream->get_response_state() == Downstream::MSG_COMPLETE) {
112     // At this point, downstream response was read
113     if (!downstream->get_upgraded() &&
114         !downstream->get_response_connection_close()) {
115       // Keep-alive
116       downstream->detach_downstream_connection();
117     }
118     upstream->remove_downstream(downstream);
119     // downstrea was deleted
120
121     return;
122   }
123
124   // At this point, downstream read may be paused.
125
126   // If shrpx_downstream::push_request_headers() failed, the
127   // error is handled here.
128   upstream->remove_downstream(downstream);
129   // downstrea was deleted
130
131   // How to test this case? Request sufficient large download
132   // and make client send RST_STREAM after it gets first DATA
133   // frame chunk.
134 }
135 } // namespace
136
137 namespace {
138 void on_ctrl_recv_callback(spdylay_session *session, spdylay_frame_type type,
139                            spdylay_frame *frame, void *user_data) {
140   auto upstream = static_cast<SpdyUpstream *>(user_data);
141   switch (type) {
142   case SPDYLAY_SYN_STREAM: {
143     if (LOG_ENABLED(INFO)) {
144       ULOG(INFO, upstream) << "Received upstream SYN_STREAM stream_id="
145                            << frame->syn_stream.stream_id;
146     }
147
148     auto downstream = upstream->add_pending_downstream(
149         frame->syn_stream.stream_id, frame->syn_stream.pri);
150
151     downstream->reset_upstream_rtimer();
152
153     auto nv = frame->syn_stream.nv;
154
155     if (LOG_ENABLED(INFO)) {
156       std::stringstream ss;
157       for (size_t i = 0; nv[i]; i += 2) {
158         ss << TTY_HTTP_HD << nv[i] << TTY_RST << ": " << nv[i + 1] << "\n";
159       }
160       ULOG(INFO, upstream) << "HTTP request headers. stream_id="
161                            << downstream->get_stream_id() << "\n" << ss.str();
162     }
163
164     for (size_t i = 0; nv[i]; i += 2) {
165       downstream->add_request_header(nv[i], nv[i + 1]);
166     }
167
168     if (downstream->index_request_headers() != 0) {
169       if (upstream->error_reply(downstream, 400) != 0) {
170         ULOG(FATAL, upstream) << "error_reply failed";
171       }
172       return;
173     }
174
175     auto path = downstream->get_request_header(http2::HD__PATH);
176     auto scheme = downstream->get_request_header(http2::HD__SCHEME);
177     auto host = downstream->get_request_header(http2::HD__HOST);
178     auto method = downstream->get_request_header(http2::HD__METHOD);
179
180     bool is_connect = method && "CONNECT" == method->value;
181     if (!path || !host || !method || !http2::non_empty_value(host) ||
182         !http2::non_empty_value(path) || !http2::non_empty_value(method) ||
183         (!is_connect && (!scheme || !http2::non_empty_value(scheme)))) {
184       upstream->rst_stream(downstream, SPDYLAY_INTERNAL_ERROR);
185       return;
186     }
187
188     downstream->set_request_method(method->value);
189     if (is_connect) {
190       downstream->set_request_http2_authority(path->value);
191     } else {
192       downstream->set_request_http2_scheme(scheme->value);
193       downstream->set_request_http2_authority(host->value);
194       downstream->set_request_path(path->value);
195     }
196
197     if (!(frame->syn_stream.hd.flags & SPDYLAY_CTRL_FLAG_FIN)) {
198       downstream->set_request_http2_expect_body(true);
199     }
200
201     downstream->inspect_http2_request();
202
203     downstream->set_request_state(Downstream::HEADER_COMPLETE);
204     if (frame->syn_stream.hd.flags & SPDYLAY_CTRL_FLAG_FIN) {
205       if (!downstream->validate_request_bodylen()) {
206         upstream->rst_stream(downstream, SPDYLAY_PROTOCOL_ERROR);
207         return;
208       }
209
210       downstream->disable_upstream_rtimer();
211       downstream->set_request_state(Downstream::MSG_COMPLETE);
212     }
213
214     upstream->start_downstream(downstream);
215
216     break;
217   }
218   default:
219     break;
220   }
221 }
222 } // namespace
223
224 void SpdyUpstream::start_downstream(Downstream *downstream) {
225   auto next_downstream =
226       downstream_queue_.pop_pending(downstream->get_stream_id());
227   assert(next_downstream);
228
229   if (downstream_queue_.can_activate(
230           downstream->get_request_http2_authority())) {
231     initiate_downstream(std::move(next_downstream));
232     return;
233   }
234
235   downstream_queue_.add_blocked(std::move(next_downstream));
236 }
237
238 void SpdyUpstream::initiate_downstream(std::unique_ptr<Downstream> downstream) {
239   int rv = downstream->attach_downstream_connection(
240       handler_->get_downstream_connection());
241   if (rv != 0) {
242     // If downstream connection fails, issue RST_STREAM.
243     rst_stream(downstream.get(), SPDYLAY_INTERNAL_ERROR);
244     downstream->set_request_state(Downstream::CONNECT_FAIL);
245
246     downstream_queue_.add_failure(std::move(downstream));
247
248     return;
249   }
250   rv = downstream->push_request_headers();
251   if (rv != 0) {
252     rst_stream(downstream.get(), SPDYLAY_INTERNAL_ERROR);
253
254     downstream_queue_.add_failure(std::move(downstream));
255
256     return;
257   }
258
259   downstream_queue_.add_active(std::move(downstream));
260 }
261
262 namespace {
263 void on_data_chunk_recv_callback(spdylay_session *session, uint8_t flags,
264                                  int32_t stream_id, const uint8_t *data,
265                                  size_t len, void *user_data) {
266   auto upstream = static_cast<SpdyUpstream *>(user_data);
267   auto downstream = upstream->find_downstream(stream_id);
268
269   if (!downstream) {
270     upstream->consume(stream_id, len);
271
272     return;
273   }
274
275   downstream->reset_upstream_rtimer();
276
277   if (downstream->push_upload_data_chunk(data, len) != 0) {
278     upstream->rst_stream(downstream, SPDYLAY_INTERNAL_ERROR);
279
280     upstream->consume(stream_id, len);
281
282     return;
283   }
284
285   if (!upstream->get_flow_control()) {
286     return;
287   }
288
289   // If connection-level window control is not enabled (e.g,
290   // spdy/3), spdylay_session_get_recv_data_length() is always
291   // returns 0.
292   if (spdylay_session_get_recv_data_length(session) >
293       std::max(SPDYLAY_INITIAL_WINDOW_SIZE,
294                1 << get_config()->http2_upstream_connection_window_bits)) {
295     if (LOG_ENABLED(INFO)) {
296       ULOG(INFO, upstream)
297           << "Flow control error on connection: "
298           << "recv_window_size="
299           << spdylay_session_get_recv_data_length(session) << ", window_size="
300           << (1 << get_config()->http2_upstream_connection_window_bits);
301     }
302     spdylay_session_fail_session(session, SPDYLAY_GOAWAY_PROTOCOL_ERROR);
303     return;
304   }
305   if (spdylay_session_get_stream_recv_data_length(session, stream_id) >
306       std::max(SPDYLAY_INITIAL_WINDOW_SIZE,
307                1 << get_config()->http2_upstream_window_bits)) {
308     if (LOG_ENABLED(INFO)) {
309       ULOG(INFO, upstream) << "Flow control error: recv_window_size="
310                            << spdylay_session_get_stream_recv_data_length(
311                                   session, stream_id)
312                            << ", initial_window_size="
313                            << (1 << get_config()->http2_upstream_window_bits);
314     }
315     upstream->rst_stream(downstream, SPDYLAY_FLOW_CONTROL_ERROR);
316     return;
317   }
318 }
319 } // namespace
320
321 namespace {
322 void on_data_recv_callback(spdylay_session *session, uint8_t flags,
323                            int32_t stream_id, int32_t length, void *user_data) {
324   auto upstream = static_cast<SpdyUpstream *>(user_data);
325   auto downstream = upstream->find_downstream(stream_id);
326   if (downstream && (flags & SPDYLAY_DATA_FLAG_FIN)) {
327     if (!downstream->validate_request_bodylen()) {
328       upstream->rst_stream(downstream, SPDYLAY_PROTOCOL_ERROR);
329       return;
330     }
331
332     downstream->disable_upstream_rtimer();
333     downstream->end_upload_data();
334     downstream->set_request_state(Downstream::MSG_COMPLETE);
335   }
336 }
337 } // namespace
338
339 namespace {
340 void on_ctrl_not_send_callback(spdylay_session *session,
341                                spdylay_frame_type type, spdylay_frame *frame,
342                                int error_code, void *user_data) {
343   auto upstream = static_cast<SpdyUpstream *>(user_data);
344   if (LOG_ENABLED(INFO)) {
345     ULOG(INFO, upstream) << "Failed to send control frame type=" << type
346                          << ", error_code=" << error_code << ":"
347                          << spdylay_strerror(error_code);
348   }
349   if (type == SPDYLAY_SYN_REPLY && error_code != SPDYLAY_ERR_STREAM_CLOSED &&
350       error_code != SPDYLAY_ERR_STREAM_CLOSING) {
351     // To avoid stream hanging around, issue RST_STREAM.
352     auto stream_id = frame->syn_reply.stream_id;
353     auto downstream = upstream->find_downstream(stream_id);
354     if (downstream) {
355       upstream->rst_stream(downstream, SPDYLAY_INTERNAL_ERROR);
356     }
357   }
358 }
359 } // namespace
360
361 namespace {
362 void on_ctrl_recv_parse_error_callback(spdylay_session *session,
363                                        spdylay_frame_type type,
364                                        const uint8_t *head, size_t headlen,
365                                        const uint8_t *payload,
366                                        size_t payloadlen, int error_code,
367                                        void *user_data) {
368   auto upstream = static_cast<SpdyUpstream *>(user_data);
369   if (LOG_ENABLED(INFO)) {
370     ULOG(INFO, upstream) << "Failed to parse received control frame. type="
371                          << type << ", error_code=" << error_code << ":"
372                          << spdylay_strerror(error_code);
373   }
374 }
375 } // namespace
376
377 namespace {
378 void on_unknown_ctrl_recv_callback(spdylay_session *session,
379                                    const uint8_t *head, size_t headlen,
380                                    const uint8_t *payload, size_t payloadlen,
381                                    void *user_data) {
382   auto upstream = static_cast<SpdyUpstream *>(user_data);
383   if (LOG_ENABLED(INFO)) {
384     ULOG(INFO, upstream) << "Received unknown control frame.";
385   }
386 }
387 } // namespace
388
389 namespace {
390 // Infer upstream RST_STREAM status code from downstream HTTP/2
391 // error code.
392 uint32_t infer_upstream_rst_stream_status_code(uint32_t downstream_error_code) {
393   // Only propagate *_REFUSED_STREAM so that upstream client can
394   // resend request.
395   if (downstream_error_code == NGHTTP2_REFUSED_STREAM) {
396     return SPDYLAY_REFUSED_STREAM;
397   } else {
398     return SPDYLAY_INTERNAL_ERROR;
399   }
400 }
401 } // namespace
402
403 SpdyUpstream::SpdyUpstream(uint16_t version, ClientHandler *handler)
404     : downstream_queue_(
405           get_config()->http2_proxy
406               ? get_config()->downstream_connections_per_host
407               : get_config()->downstream_proto == PROTO_HTTP
408                     ? get_config()->downstream_connections_per_frontend
409                     : 0,
410           !get_config()->http2_proxy),
411       handler_(handler), session_(nullptr) {
412   spdylay_session_callbacks callbacks;
413   memset(&callbacks, 0, sizeof(callbacks));
414   callbacks.send_callback = send_callback;
415   callbacks.recv_callback = recv_callback;
416   callbacks.on_stream_close_callback = on_stream_close_callback;
417   callbacks.on_ctrl_recv_callback = on_ctrl_recv_callback;
418   callbacks.on_data_chunk_recv_callback = on_data_chunk_recv_callback;
419   callbacks.on_data_recv_callback = on_data_recv_callback;
420   callbacks.on_ctrl_not_send_callback = on_ctrl_not_send_callback;
421   callbacks.on_ctrl_recv_parse_error_callback =
422       on_ctrl_recv_parse_error_callback;
423   callbacks.on_unknown_ctrl_recv_callback = on_unknown_ctrl_recv_callback;
424
425   int rv;
426   rv = spdylay_session_server_new(&session_, version, &callbacks, this);
427   assert(rv == 0);
428
429   if (version >= SPDYLAY_PROTO_SPDY3) {
430     int val = 1;
431     flow_control_ = true;
432     initial_window_size_ = 1 << get_config()->http2_upstream_window_bits;
433     rv = spdylay_session_set_option(
434         session_, SPDYLAY_OPT_NO_AUTO_WINDOW_UPDATE2, &val, sizeof(val));
435     assert(rv == 0);
436   } else {
437     flow_control_ = false;
438     initial_window_size_ = 0;
439   }
440   // TODO Maybe call from outside?
441   std::array<spdylay_settings_entry, 2> entry;
442   entry[0].settings_id = SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS;
443   entry[0].value = get_config()->http2_max_concurrent_streams;
444   entry[0].flags = SPDYLAY_ID_FLAG_SETTINGS_NONE;
445
446   entry[1].settings_id = SPDYLAY_SETTINGS_INITIAL_WINDOW_SIZE;
447   entry[1].value = initial_window_size_;
448   entry[1].flags = SPDYLAY_ID_FLAG_SETTINGS_NONE;
449
450   rv = spdylay_submit_settings(session_, SPDYLAY_FLAG_SETTINGS_NONE,
451                                entry.data(), entry.size());
452   assert(rv == 0);
453
454   if (version >= SPDYLAY_PROTO_SPDY3_1 &&
455       get_config()->http2_upstream_connection_window_bits > 16) {
456     int32_t delta = (1 << get_config()->http2_upstream_connection_window_bits) -
457                     SPDYLAY_INITIAL_WINDOW_SIZE;
458     rv = spdylay_submit_window_update(session_, 0, delta);
459     assert(rv == 0);
460   }
461
462   handler_->reset_upstream_read_timeout(
463       get_config()->http2_upstream_read_timeout);
464
465   handler_->signal_write();
466 }
467
468 SpdyUpstream::~SpdyUpstream() { spdylay_session_del(session_); }
469
470 int SpdyUpstream::on_read() {
471   int rv = 0;
472
473   rv = spdylay_session_recv(session_);
474   if (rv < 0) {
475     if (rv != SPDYLAY_ERR_EOF) {
476       ULOG(ERROR, this) << "spdylay_session_recv() returned error: "
477                         << spdylay_strerror(rv);
478     }
479     return rv;
480   }
481
482   handler_->signal_write();
483
484   return 0;
485 }
486
487 // After this function call, downstream may be deleted.
488 int SpdyUpstream::on_write() {
489   int rv = 0;
490
491   rv = spdylay_session_send(session_);
492   if (rv != 0) {
493     ULOG(ERROR, this) << "spdylay_session_send() returned error: "
494                       << spdylay_strerror(rv);
495     return rv;
496   }
497
498   if (spdylay_session_want_read(session_) == 0 &&
499       spdylay_session_want_write(session_) == 0 &&
500       handler_->get_wb()->rleft() == 0) {
501     if (LOG_ENABLED(INFO)) {
502       ULOG(INFO, this) << "No more read/write for this SPDY session";
503     }
504     return -1;
505   }
506   return 0;
507 }
508
509 ClientHandler *SpdyUpstream::get_client_handler() const { return handler_; }
510
511 int SpdyUpstream::downstream_read(DownstreamConnection *dconn) {
512   auto downstream = dconn->get_downstream();
513
514   if (downstream->get_request_state() == Downstream::STREAM_CLOSED) {
515     // If upstream SPDY stream was closed, we just close downstream,
516     // because there is no consumer now. Downstream connection is also
517     // closed in this case.
518     remove_downstream(downstream);
519     // downstrea was deleted
520
521     return 0;
522   }
523
524   if (downstream->get_response_state() == Downstream::MSG_RESET) {
525     // The downstream stream was reset (canceled). In this case,
526     // RST_STREAM to the upstream and delete downstream connection
527     // here. Deleting downstream will be taken place at
528     // on_stream_close_callback.
529     rst_stream(downstream,
530                infer_upstream_rst_stream_status_code(
531                    downstream->get_response_rst_stream_error_code()));
532     downstream->pop_downstream_connection();
533     dconn = nullptr;
534   } else if (downstream->get_response_state() == Downstream::MSG_BAD_HEADER) {
535     if (error_reply(downstream, 502) != 0) {
536       return -1;
537     }
538     downstream->pop_downstream_connection();
539     // dconn was deleted
540     dconn = nullptr;
541   } else {
542     auto rv = downstream->on_read();
543     if (rv == SHRPX_ERR_EOF) {
544       return downstream_eof(dconn);
545     }
546     if (rv != 0) {
547       if (rv != SHRPX_ERR_NETWORK) {
548         if (LOG_ENABLED(INFO)) {
549           DCLOG(INFO, dconn) << "HTTP parser failure";
550         }
551       }
552       return downstream_error(dconn, Downstream::EVENT_ERROR);
553     }
554     // Detach downstream connection early so that it could be reused
555     // without hitting server's request timeout.
556     if (downstream->get_response_state() == Downstream::MSG_COMPLETE &&
557         !downstream->get_response_connection_close()) {
558       // Keep-alive
559       downstream->detach_downstream_connection();
560     }
561   }
562
563   handler_->signal_write();
564   // At this point, downstream may be deleted.
565
566   return 0;
567 }
568
569 int SpdyUpstream::downstream_write(DownstreamConnection *dconn) {
570   int rv;
571   rv = dconn->on_write();
572   if (rv == SHRPX_ERR_NETWORK) {
573     return downstream_error(dconn, Downstream::EVENT_ERROR);
574   }
575   if (rv != 0) {
576     return -1;
577   }
578   return 0;
579 }
580
581 int SpdyUpstream::downstream_eof(DownstreamConnection *dconn) {
582   auto downstream = dconn->get_downstream();
583
584   if (LOG_ENABLED(INFO)) {
585     DCLOG(INFO, dconn) << "EOF. stream_id=" << downstream->get_stream_id();
586   }
587   if (downstream->get_request_state() == Downstream::STREAM_CLOSED) {
588     // If stream was closed already, we don't need to send reply at
589     // the first place. We can delete downstream.
590     remove_downstream(downstream);
591     // downstream was deleted
592
593     return 0;
594   }
595
596   // Delete downstream connection. If we don't delete it here, it will
597   // be pooled in on_stream_close_callback.
598   downstream->pop_downstream_connection();
599   // dconn was deleted
600   dconn = nullptr;
601   // downstream wil be deleted in on_stream_close_callback.
602   if (downstream->get_response_state() == Downstream::HEADER_COMPLETE) {
603     // Server may indicate the end of the request by EOF
604     if (LOG_ENABLED(INFO)) {
605       ULOG(INFO, this) << "Downstream body was ended by EOF";
606     }
607     downstream->set_response_state(Downstream::MSG_COMPLETE);
608
609     // For tunneled connection, MSG_COMPLETE signals
610     // downstream_data_read_callback to send RST_STREAM after pending
611     // response body is sent. This is needed to ensure that RST_STREAM
612     // is sent after all pending data are sent.
613     on_downstream_body_complete(downstream);
614   } else if (downstream->get_response_state() != Downstream::MSG_COMPLETE) {
615     // If stream was not closed, then we set MSG_COMPLETE and let
616     // on_stream_close_callback delete downstream.
617     if (error_reply(downstream, 502) != 0) {
618       return -1;
619     }
620   }
621   handler_->signal_write();
622   // At this point, downstream may be deleted.
623   return 0;
624 }
625
626 int SpdyUpstream::downstream_error(DownstreamConnection *dconn, int events) {
627   auto downstream = dconn->get_downstream();
628
629   if (LOG_ENABLED(INFO)) {
630     if (events & Downstream::EVENT_ERROR) {
631       DCLOG(INFO, dconn) << "Downstream network/general error";
632     } else {
633       DCLOG(INFO, dconn) << "Timeout";
634     }
635     if (downstream->get_upgraded()) {
636       DCLOG(INFO, dconn) << "Note: this is tunnel connection";
637     }
638   }
639
640   if (downstream->get_request_state() == Downstream::STREAM_CLOSED) {
641     remove_downstream(downstream);
642     // downstream was deleted
643
644     return 0;
645   }
646
647   // Delete downstream connection. If we don't delete it here, it will
648   // be pooled in on_stream_close_callback.
649   downstream->pop_downstream_connection();
650   // dconn was deleted
651   dconn = nullptr;
652
653   if (downstream->get_response_state() == Downstream::MSG_COMPLETE) {
654     // For SSL tunneling, we issue RST_STREAM. For other types of
655     // stream, we don't have to do anything since response was
656     // complete.
657     if (downstream->get_upgraded()) {
658       // We want "NO_ERROR" error code but SPDY does not have such
659       // code for RST_STREAM.
660       rst_stream(downstream, SPDYLAY_INTERNAL_ERROR);
661     }
662   } else {
663     if (downstream->get_response_state() == Downstream::HEADER_COMPLETE) {
664       if (downstream->get_upgraded()) {
665         on_downstream_body_complete(downstream);
666       } else {
667         rst_stream(downstream, SPDYLAY_INTERNAL_ERROR);
668       }
669     } else {
670       unsigned int status;
671       if (events & Downstream::EVENT_TIMEOUT) {
672         status = 504;
673       } else {
674         status = 502;
675       }
676       if (error_reply(downstream, status) != 0) {
677         return -1;
678       }
679     }
680     downstream->set_response_state(Downstream::MSG_COMPLETE);
681   }
682   handler_->signal_write();
683   // At this point, downstream may be deleted.
684   return 0;
685 }
686
687 int SpdyUpstream::rst_stream(Downstream *downstream, int status_code) {
688   if (LOG_ENABLED(INFO)) {
689     ULOG(INFO, this) << "RST_STREAM stream_id=" << downstream->get_stream_id();
690   }
691   int rv;
692   rv = spdylay_submit_rst_stream(session_, downstream->get_stream_id(),
693                                  status_code);
694   if (rv < SPDYLAY_ERR_FATAL) {
695     ULOG(FATAL, this) << "spdylay_submit_rst_stream() failed: "
696                       << spdylay_strerror(rv);
697     DIE();
698   }
699   return 0;
700 }
701
702 namespace {
703 ssize_t spdy_data_read_callback(spdylay_session *session, int32_t stream_id,
704                                 uint8_t *buf, size_t length, int *eof,
705                                 spdylay_data_source *source, void *user_data) {
706   auto downstream = static_cast<Downstream *>(source->ptr);
707   auto upstream = static_cast<SpdyUpstream *>(downstream->get_upstream());
708   auto body = downstream->get_response_buf();
709   assert(body);
710
711   auto dconn = downstream->get_downstream_connection();
712
713   if (body->rleft() == 0 && dconn &&
714       downstream->get_response_state() != Downstream::MSG_COMPLETE) {
715     // Try to read more if buffer is empty.  This will help small
716     // buffer and make priority handling a bit better.
717     if (upstream->downstream_read(dconn) != 0) {
718       return SPDYLAY_ERR_CALLBACK_FAILURE;
719     }
720   }
721
722   auto nread = body->remove(buf, length);
723   auto body_empty = body->rleft() == 0;
724
725   if (nread == 0 &&
726       downstream->get_response_state() == Downstream::MSG_COMPLETE) {
727     if (!downstream->get_upgraded()) {
728       *eof = 1;
729     } else {
730       // For tunneling, issue RST_STREAM to finish the stream.
731       if (LOG_ENABLED(INFO)) {
732         ULOG(INFO, upstream)
733             << "RST_STREAM to tunneled stream stream_id=" << stream_id;
734       }
735       upstream->rst_stream(
736           downstream, infer_upstream_rst_stream_status_code(
737                           downstream->get_response_rst_stream_error_code()));
738     }
739   }
740
741   if (body_empty) {
742     downstream->disable_upstream_wtimer();
743   } else {
744     downstream->reset_upstream_wtimer();
745   }
746
747   if (nread > 0 && downstream->resume_read(SHRPX_NO_BUFFER, nread) != 0) {
748     return SPDYLAY_ERR_CALLBACK_FAILURE;
749   }
750
751   if (nread == 0 && *eof != 1) {
752     return SPDYLAY_ERR_DEFERRED;
753   }
754
755   if (nread > 0) {
756     downstream->add_response_sent_bodylen(nread);
757   }
758
759   return nread;
760 }
761 } // namespace
762
763 int SpdyUpstream::error_reply(Downstream *downstream,
764                               unsigned int status_code) {
765   int rv;
766   auto html = http::create_error_html(status_code);
767   downstream->set_response_http_status(status_code);
768   auto body = downstream->get_response_buf();
769   body->append(html.c_str(), html.size());
770   downstream->set_response_state(Downstream::MSG_COMPLETE);
771
772   spdylay_data_provider data_prd;
773   data_prd.source.ptr = downstream;
774   data_prd.read_callback = spdy_data_read_callback;
775
776   std::string content_length = util::utos(html.size());
777   std::string status_string = http2::get_status_string(status_code);
778   const char *nv[] = {":status",        status_string.c_str(),
779                       ":version",       "http/1.1",
780                       "content-type",   "text/html; charset=UTF-8",
781                       "server",         get_config()->server_name,
782                       "content-length", content_length.c_str(),
783                       nullptr};
784
785   rv = spdylay_submit_response(session_, downstream->get_stream_id(), nv,
786                                &data_prd);
787   if (rv < SPDYLAY_ERR_FATAL) {
788     ULOG(FATAL, this) << "spdylay_submit_response() failed: "
789                       << spdylay_strerror(rv);
790     return -1;
791   }
792
793   return 0;
794 }
795
796 Downstream *SpdyUpstream::add_pending_downstream(int32_t stream_id,
797                                                  int32_t priority) {
798   auto downstream = make_unique<Downstream>(this, stream_id, priority);
799   auto res = downstream.get();
800
801   downstream_queue_.add_pending(std::move(downstream));
802
803   return res;
804 }
805
806 void SpdyUpstream::remove_downstream(Downstream *downstream) {
807   if (downstream->accesslog_ready()) {
808     handler_->write_accesslog(downstream);
809   }
810
811   auto next_downstream =
812       downstream_queue_.remove_and_pop_blocked(downstream->get_stream_id());
813
814   if (next_downstream) {
815     initiate_downstream(std::move(next_downstream));
816   }
817 }
818
819 Downstream *SpdyUpstream::find_downstream(int32_t stream_id) {
820   return downstream_queue_.find(stream_id);
821 }
822
823 spdylay_session *SpdyUpstream::get_http2_session() { return session_; }
824
825 // WARNING: Never call directly or indirectly spdylay_session_send or
826 // spdylay_session_recv. These calls may delete downstream.
827 int SpdyUpstream::on_downstream_header_complete(Downstream *downstream) {
828   if (downstream->get_non_final_response()) {
829     // SPDY does not support non-final response.  We could send it
830     // with HEADERS and final response in SYN_REPLY, but it is not
831     // official way.
832     downstream->clear_response_headers();
833
834     return 0;
835   }
836
837   if (LOG_ENABLED(INFO)) {
838     DLOG(INFO, downstream) << "HTTP response header completed";
839   }
840
841   if (!get_config()->http2_proxy && !get_config()->client_proxy &&
842       !get_config()->no_location_rewrite) {
843     downstream->rewrite_location_response_header(
844         downstream->get_request_http2_scheme());
845   }
846   size_t nheader = downstream->get_response_headers().size();
847   // 8 means server, :status, :version and possible via header field.
848   auto nv = make_unique<const char *[]>(
849       nheader * 2 + 8 + get_config()->add_response_headers.size() * 2 + 1);
850
851   size_t hdidx = 0;
852   std::string via_value;
853   std::string status_string =
854       http2::get_status_string(downstream->get_response_http_status());
855   nv[hdidx++] = ":status";
856   nv[hdidx++] = status_string.c_str();
857   nv[hdidx++] = ":version";
858   nv[hdidx++] = "HTTP/1.1";
859   for (auto &hd : downstream->get_response_headers()) {
860     if (hd.name.empty() || hd.name.c_str()[0] == ':') {
861       continue;
862     }
863     switch (hd.token) {
864     case http2::HD_CONNECTION:
865     case http2::HD_KEEP_ALIVE:
866     case http2::HD_PROXY_CONNECTION:
867     case http2::HD_TRANSFER_ENCODING:
868     case http2::HD_VIA:
869     case http2::HD_SERVER:
870       continue;
871     }
872
873     nv[hdidx++] = hd.name.c_str();
874     nv[hdidx++] = hd.value.c_str();
875   }
876
877   if (!get_config()->http2_proxy && !get_config()->client_proxy) {
878     nv[hdidx++] = "server";
879     nv[hdidx++] = get_config()->server_name;
880   } else {
881     auto server = downstream->get_response_header(http2::HD_SERVER);
882     if (server) {
883       nv[hdidx++] = "server";
884       nv[hdidx++] = server->value.c_str();
885     }
886   }
887
888   auto via = downstream->get_response_header(http2::HD_VIA);
889   if (get_config()->no_via) {
890     if (via) {
891       nv[hdidx++] = "via";
892       nv[hdidx++] = via->value.c_str();
893     }
894   } else {
895     if (via) {
896       via_value = via->value;
897       via_value += ", ";
898     }
899     via_value += http::create_via_header_value(
900         downstream->get_response_major(), downstream->get_response_minor());
901     nv[hdidx++] = "via";
902     nv[hdidx++] = via_value.c_str();
903   }
904
905   for (auto &p : get_config()->add_response_headers) {
906     nv[hdidx++] = p.first.c_str();
907     nv[hdidx++] = p.second.c_str();
908   }
909
910   nv[hdidx++] = 0;
911   if (LOG_ENABLED(INFO)) {
912     std::stringstream ss;
913     for (size_t i = 0; nv[i]; i += 2) {
914       ss << TTY_HTTP_HD << nv[i] << TTY_RST << ": " << nv[i + 1] << "\n";
915     }
916     ULOG(INFO, this) << "HTTP response headers. stream_id="
917                      << downstream->get_stream_id() << "\n" << ss.str();
918   }
919   spdylay_data_provider data_prd;
920   data_prd.source.ptr = downstream;
921   data_prd.read_callback = spdy_data_read_callback;
922
923   int rv;
924   rv = spdylay_submit_response(session_, downstream->get_stream_id(), nv.get(),
925                                &data_prd);
926   if (rv != 0) {
927     ULOG(FATAL, this) << "spdylay_submit_response() failed";
928     return -1;
929   }
930
931   return 0;
932 }
933
934 // WARNING: Never call directly or indirectly spdylay_session_send or
935 // spdylay_session_recv. These calls may delete downstream.
936 int SpdyUpstream::on_downstream_body(Downstream *downstream,
937                                      const uint8_t *data, size_t len,
938                                      bool flush) {
939   auto body = downstream->get_response_buf();
940   body->append(data, len);
941
942   if (flush) {
943     spdylay_session_resume_data(session_, downstream->get_stream_id());
944
945     downstream->ensure_upstream_wtimer();
946   }
947
948   return 0;
949 }
950
951 // WARNING: Never call directly or indirectly spdylay_session_send or
952 // spdylay_session_recv. These calls may delete downstream.
953 int SpdyUpstream::on_downstream_body_complete(Downstream *downstream) {
954   if (LOG_ENABLED(INFO)) {
955     DLOG(INFO, downstream) << "HTTP response completed";
956   }
957
958   if (!downstream->validate_response_bodylen()) {
959     rst_stream(downstream, SPDYLAY_PROTOCOL_ERROR);
960     downstream->set_response_connection_close(true);
961     return 0;
962   }
963
964   spdylay_session_resume_data(session_, downstream->get_stream_id());
965   downstream->ensure_upstream_wtimer();
966
967   return 0;
968 }
969
970 bool SpdyUpstream::get_flow_control() const { return flow_control_; }
971
972 void SpdyUpstream::pause_read(IOCtrlReason reason) {}
973
974 int SpdyUpstream::resume_read(IOCtrlReason reason, Downstream *downstream,
975                               size_t consumed) {
976   if (get_flow_control()) {
977     assert(downstream->get_request_datalen() >= consumed);
978
979     if (consume(downstream->get_stream_id(), consumed) != 0) {
980       return -1;
981     }
982
983     downstream->dec_request_datalen(consumed);
984   }
985
986   handler_->signal_write();
987   return 0;
988 }
989
990 int SpdyUpstream::on_downstream_abort_request(Downstream *downstream,
991                                               unsigned int status_code) {
992   int rv;
993
994   rv = error_reply(downstream, status_code);
995
996   if (rv != 0) {
997     return -1;
998   }
999
1000   handler_->signal_write();
1001   return 0;
1002 }
1003
1004 int SpdyUpstream::consume(int32_t stream_id, size_t len) {
1005   int rv;
1006
1007   rv = spdylay_session_consume(session_, stream_id, len);
1008
1009   if (rv != 0) {
1010     ULOG(WARN, this) << "spdylay_session_consume() returned error: "
1011                      << spdylay_strerror(rv);
1012     return -1;
1013   }
1014
1015   return 0;
1016 }
1017
1018 int SpdyUpstream::on_timeout(Downstream *downstream) {
1019   if (LOG_ENABLED(INFO)) {
1020     ULOG(INFO, this) << "Stream timeout stream_id="
1021                      << downstream->get_stream_id();
1022   }
1023
1024   rst_stream(downstream, SPDYLAY_INTERNAL_ERROR);
1025
1026   return 0;
1027 }
1028
1029 void SpdyUpstream::on_handler_delete() {
1030   for (auto &ent : downstream_queue_.get_active_downstreams()) {
1031     if (ent.second->accesslog_ready()) {
1032       handler_->write_accesslog(ent.second.get());
1033     }
1034   }
1035 }
1036
1037 int SpdyUpstream::on_downstream_reset(bool no_retry) {
1038   int rv;
1039
1040   for (auto &ent : downstream_queue_.get_active_downstreams()) {
1041     auto downstream = ent.second.get();
1042     if ((downstream->get_request_state() != Downstream::HEADER_COMPLETE &&
1043          downstream->get_request_state() != Downstream::MSG_COMPLETE) ||
1044         downstream->get_response_state() != Downstream::INITIAL) {
1045       rst_stream(downstream, SPDYLAY_INTERNAL_ERROR);
1046       downstream->pop_downstream_connection();
1047       continue;
1048     }
1049
1050     downstream->pop_downstream_connection();
1051
1052     downstream->add_retry();
1053
1054     if (no_retry || downstream->no_more_retry()) {
1055       if (on_downstream_abort_request(downstream, 503) != 0) {
1056         return -1;
1057       }
1058       return 0;
1059     }
1060
1061     // downstream connection is clean; we can retry with new
1062     // downstream connection.
1063
1064     rv = downstream->attach_downstream_connection(
1065         handler_->get_downstream_connection());
1066     if (rv != 0) {
1067       rst_stream(downstream, SPDYLAY_INTERNAL_ERROR);
1068       downstream->pop_downstream_connection();
1069       continue;
1070     }
1071   }
1072
1073   handler_->signal_write();
1074
1075   return 0;
1076 }
1077
1078 MemchunkPool *SpdyUpstream::get_mcpool() { return &mcpool_; }
1079
1080 } // namespace shrpx