2 * nghttp2 - HTTP/2 C Library
4 * Copyright (c) 2015 Tatsuhiro Tsujikawa
6 * Permission is hereby granted, free of charge, to any person obtaining
7 * a copy of this software and associated documentation files (the
8 * "Software"), to deal in the Software without restriction, including
9 * without limitation the rights to use, copy, modify, merge, publish,
10 * distribute, sublicense, and/or sell copies of the Software, and to
11 * permit persons to whom the Software is furnished to do so, subject to
12 * the following conditions:
14 * The above copyright notice and this permission notice shall be
15 * included in all copies or substantial portions of the Software.
17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
21 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
22 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
23 * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
25 #include "shrpx_memcached_connection.h"
32 #include "shrpx_memcached_request.h"
33 #include "shrpx_memcached_result.h"
34 #include "shrpx_config.h"
35 #include "shrpx_tls.h"
36 #include "shrpx_log.h"
42 void timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
43 auto conn = static_cast<Connection *>(w->data);
44 auto mconn = static_cast<MemcachedConnection *>(conn->data);
46 if (w == &conn->rt && !conn->expired_rt()) {
50 if (LOG_ENABLED(INFO)) {
51 MCLOG(INFO, mconn) << "Time out";
59 void readcb(struct ev_loop *loop, ev_io *w, int revents) {
60 auto conn = static_cast<Connection *>(w->data);
61 auto mconn = static_cast<MemcachedConnection *>(conn->data);
63 if (mconn->on_read() != 0) {
64 mconn->reconnect_or_fail();
71 void writecb(struct ev_loop *loop, ev_io *w, int revents) {
72 auto conn = static_cast<Connection *>(w->data);
73 auto mconn = static_cast<MemcachedConnection *>(conn->data);
75 if (mconn->on_write() != 0) {
76 mconn->reconnect_or_fail();
83 void connectcb(struct ev_loop *loop, ev_io *w, int revents) {
84 auto conn = static_cast<Connection *>(w->data);
85 auto mconn = static_cast<MemcachedConnection *>(conn->data);
87 if (mconn->connected() != 0) {
92 writecb(loop, w, revents);
96 constexpr auto write_timeout = 10_s;
97 constexpr auto read_timeout = 10_s;
99 MemcachedConnection::MemcachedConnection(const Address *addr,
100 struct ev_loop *loop, SSL_CTX *ssl_ctx,
101 const StringRef &sni_name,
102 MemchunkPool *mcpool,
104 : conn_(loop, -1, nullptr, mcpool, write_timeout, read_timeout, {}, {},
105 connectcb, readcb, timeoutcb, this, 0, 0., Proto::MEMCACHED),
106 do_read_(&MemcachedConnection::noop),
107 do_write_(&MemcachedConnection::noop),
110 gen, loop, [] {}, [] {}),
118 MemcachedConnection::~MemcachedConnection() { conn_.disconnect(); }
121 void clear_request(std::deque<std::unique_ptr<MemcachedRequest>> &q) {
122 for (auto &req : q) {
125 MemcachedResult(MemcachedStatusCode::EXT_NETWORK_ERROR));
132 void MemcachedConnection::disconnect() {
133 clear_request(recvq_);
134 clear_request(sendq_);
145 assert(recvbuf_.rleft() == 0);
148 do_read_ = do_write_ = &MemcachedConnection::noop;
151 int MemcachedConnection::initiate_connection() {
152 assert(conn_.fd == -1);
155 auto ssl = tls::create_ssl(ssl_ctx_);
160 conn_.tls.client_session_cache = &tls_session_cache_;
163 conn_.fd = util::create_nonblock_socket(addr_->su.storage.ss_family);
165 if (conn_.fd == -1) {
167 MCLOG(WARN, this) << "socket() failed; errno=" << error;
173 rv = connect(conn_.fd, &addr_->su.sa, addr_->len);
174 if (rv != 0 && errno != EINPROGRESS) {
176 MCLOG(WARN, this) << "connect() failed; errno=" << error;
185 if (!util::numeric_host(sni_name_.c_str())) {
186 SSL_set_tlsext_host_name(conn_.tls.ssl, sni_name_.c_str());
189 auto session = tls::reuse_tls_session(tls_session_cache_);
191 SSL_set_session(conn_.tls.ssl, session);
192 SSL_SESSION_free(session);
195 conn_.prepare_client_handshake();
198 if (LOG_ENABLED(INFO)) {
199 MCLOG(INFO, this) << "Connecting to memcached server";
202 ev_io_set(&conn_.wev, conn_.fd, EV_WRITE);
203 ev_io_set(&conn_.rev, conn_.fd, EV_READ);
205 ev_set_cb(&conn_.wev, connectcb);
207 conn_.wlimit.startw();
208 ev_timer_again(conn_.loop, &conn_.wt);
213 int MemcachedConnection::connected() {
214 auto sock_error = util::get_socket_error(conn_.fd);
215 if (sock_error != 0) {
216 MCLOG(WARN, this) << "memcached connect failed; addr="
217 << util::to_numeric_addr(addr_)
218 << ": errno=" << sock_error;
220 connect_blocker_.on_failure();
222 conn_.wlimit.stopw();
227 if (LOG_ENABLED(INFO)) {
228 MCLOG(INFO, this) << "connected to memcached server";
231 conn_.rlimit.startw();
233 ev_set_cb(&conn_.wev, writecb);
238 do_read_ = &MemcachedConnection::tls_handshake;
239 do_write_ = &MemcachedConnection::tls_handshake;
244 ev_timer_stop(conn_.loop, &conn_.wt);
248 connect_blocker_.on_success();
250 do_read_ = &MemcachedConnection::read_clear;
251 do_write_ = &MemcachedConnection::write_clear;
256 int MemcachedConnection::on_write() { return do_write_(*this); }
257 int MemcachedConnection::on_read() { return do_read_(*this); }
259 int MemcachedConnection::tls_handshake() {
262 conn_.last_read = ev_now(conn_.loop);
264 auto rv = conn_.tls_handshake();
265 if (rv == SHRPX_ERR_INPROGRESS) {
270 connect_blocker_.on_failure();
274 if (LOG_ENABLED(INFO)) {
275 LOG(INFO) << "SSL/TLS handshake completed";
278 auto &tlsconf = get_config()->tls;
280 if (!tlsconf.insecure &&
281 tls::check_cert(conn_.tls.ssl, addr_, sni_name_) != 0) {
282 connect_blocker_.on_failure();
286 ev_timer_stop(conn_.loop, &conn_.rt);
287 ev_timer_stop(conn_.loop, &conn_.wt);
291 connect_blocker_.on_success();
293 do_read_ = &MemcachedConnection::read_tls;
294 do_write_ = &MemcachedConnection::write_tls;
299 int MemcachedConnection::write_tls() {
304 conn_.last_read = ev_now(conn_.loop);
306 std::array<struct iovec, MAX_WR_IOVCNT> iov;
307 std::array<uint8_t, 16_k> buf;
309 for (; !sendq_.empty();) {
310 auto iovcnt = fill_request_buffer(iov.data(), iov.size());
311 auto p = std::begin(buf);
312 for (size_t i = 0; i < iovcnt; ++i) {
314 auto n = std::min(static_cast<size_t>(std::end(buf) - p), v.iov_len);
315 p = std::copy_n(static_cast<uint8_t *>(v.iov_base), n, p);
316 if (p == std::end(buf)) {
321 auto nwrite = conn_.write_tls(buf.data(), p - std::begin(buf));
329 drain_send_queue(nwrite);
332 conn_.wlimit.stopw();
333 ev_timer_stop(conn_.loop, &conn_.wt);
338 int MemcachedConnection::read_tls() {
343 conn_.last_read = ev_now(conn_.loop);
346 auto nread = conn_.read_tls(recvbuf_.last, recvbuf_.wleft());
356 recvbuf_.write(nread);
358 if (parse_packet() != 0) {
366 int MemcachedConnection::write_clear() {
371 conn_.last_read = ev_now(conn_.loop);
373 std::array<struct iovec, MAX_WR_IOVCNT> iov;
375 for (; !sendq_.empty();) {
376 auto iovcnt = fill_request_buffer(iov.data(), iov.size());
377 auto nwrite = conn_.writev_clear(iov.data(), iovcnt);
385 drain_send_queue(nwrite);
388 conn_.wlimit.stopw();
389 ev_timer_stop(conn_.loop, &conn_.wt);
394 int MemcachedConnection::read_clear() {
399 conn_.last_read = ev_now(conn_.loop);
402 auto nread = conn_.read_clear(recvbuf_.last, recvbuf_.wleft());
412 recvbuf_.write(nread);
414 if (parse_packet() != 0) {
422 int MemcachedConnection::parse_packet() {
423 auto in = recvbuf_.pos;
428 switch (parse_state_.state) {
429 case MemcachedParseState::HEADER24: {
430 if (recvbuf_.last - in < 24) {
431 recvbuf_.drain_reset(in - recvbuf_.pos);
435 if (recvq_.empty()) {
437 << "Response received, but there is no in-flight request.";
441 auto &req = recvq_.front();
443 if (*in != MEMCACHED_RES_MAGIC) {
444 MCLOG(WARN, this) << "Response has bad magic: "
445 << static_cast<uint32_t>(*in);
450 parse_state_.op = static_cast<MemcachedOp>(*in++);
451 parse_state_.keylen = util::get_uint16(in);
453 parse_state_.extralen = *in++;
454 // skip 1 byte reserved data type
456 parse_state_.status_code =
457 static_cast<MemcachedStatusCode>(util::get_uint16(in));
459 parse_state_.totalbody = util::get_uint32(in);
461 // skip 4 bytes opaque
463 parse_state_.cas = util::get_uint64(in);
466 if (req->op != parse_state_.op) {
468 << "opcode in response does not match to the request: want "
469 << static_cast<uint32_t>(req->op) << ", got "
470 << static_cast<uint32_t>(parse_state_.op);
474 if (parse_state_.keylen != 0) {
475 MCLOG(WARN, this) << "zero length keylen expected: got "
476 << parse_state_.keylen;
480 if (parse_state_.totalbody > 16_k) {
481 MCLOG(WARN, this) << "totalbody is too large: got "
482 << parse_state_.totalbody;
486 if (parse_state_.op == MemcachedOp::GET &&
487 parse_state_.status_code == MemcachedStatusCode::NO_ERROR &&
488 parse_state_.extralen == 0) {
489 MCLOG(WARN, this) << "response for GET does not have extra";
493 if (parse_state_.totalbody <
494 parse_state_.keylen + parse_state_.extralen) {
495 MCLOG(WARN, this) << "totalbody is too short: totalbody "
496 << parse_state_.totalbody << ", want min "
497 << parse_state_.keylen + parse_state_.extralen;
501 if (parse_state_.extralen) {
502 parse_state_.state = MemcachedParseState::EXTRA;
503 parse_state_.read_left = parse_state_.extralen;
505 parse_state_.state = MemcachedParseState::VALUE;
506 parse_state_.read_left = parse_state_.totalbody - parse_state_.keylen -
507 parse_state_.extralen;
512 case MemcachedParseState::EXTRA: {
513 // We don't use extra for now. Just read and forget.
514 auto n = std::min(static_cast<size_t>(recvbuf_.last - in),
515 parse_state_.read_left);
517 parse_state_.read_left -= n;
519 if (parse_state_.read_left) {
523 parse_state_.state = MemcachedParseState::VALUE;
524 // since we require keylen == 0, totalbody - extralen ==
526 parse_state_.read_left =
527 parse_state_.totalbody - parse_state_.keylen - parse_state_.extralen;
531 case MemcachedParseState::VALUE: {
532 auto n = std::min(static_cast<size_t>(recvbuf_.last - in),
533 parse_state_.read_left);
535 parse_state_.value.insert(std::end(parse_state_.value), in, in + n);
537 parse_state_.read_left -= n;
539 if (parse_state_.read_left) {
544 if (LOG_ENABLED(INFO)) {
545 if (parse_state_.status_code != MemcachedStatusCode::NO_ERROR) {
546 MCLOG(INFO, this) << "response returned error status: "
547 << static_cast<uint16_t>(parse_state_.status_code);
551 // We require at least one complete response to clear try count.
554 auto req = std::move(recvq_.front());
557 if (sendq_.empty() && recvq_.empty()) {
558 ev_timer_stop(conn_.loop, &conn_.rt);
561 if (!req->canceled && req->cb) {
562 req->cb(req.get(), MemcachedResult(parse_state_.status_code,
563 std::move(parse_state_.value)));
571 if (!busy && in == recvbuf_.last) {
576 assert(in == recvbuf_.last);
// Upper bound on the number of iovec entries gathered per write. The default
// of 128 is reduced to the platform's IOV_MAX when that limit is smaller.
// DEFAULT_WR_IOVCNT is #undef'd first in case an earlier header defined it.
#undef DEFAULT_WR_IOVCNT
#define DEFAULT_WR_IOVCNT 128

#if !defined(IOV_MAX) || IOV_MAX >= DEFAULT_WR_IOVCNT
#  define MAX_WR_IOVCNT DEFAULT_WR_IOVCNT
#else // defined(IOV_MAX) && IOV_MAX < DEFAULT_WR_IOVCNT
#  define MAX_WR_IOVCNT IOV_MAX
#endif // defined(IOV_MAX) && IOV_MAX < DEFAULT_WR_IOVCNT
591 size_t MemcachedConnection::fill_request_buffer(struct iovec *iov,
594 for (auto &req : sendq_) {
598 if (serialized_size(req.get()) + sendsum_ > 1300) {
601 sendbufv_.emplace_back();
602 sendbufv_.back().req = req.get();
603 make_request(&sendbufv_.back(), req.get());
604 sendsum_ += sendbufv_.back().left();
614 for (auto &buf : sendbufv_) {
615 if (iovcnt + 2 > iovlen) {
620 if (buf.headbuf.rleft()) {
621 iov[iovcnt++] = {buf.headbuf.pos, buf.headbuf.rleft()};
623 if (buf.send_value_left) {
624 iov[iovcnt++] = {req->value.data() + req->value.size() -
626 buf.send_value_left};
633 void MemcachedConnection::drain_send_queue(size_t nwrite) {
637 auto &buf = sendbufv_.front();
638 auto &req = sendq_.front();
643 assert(buf.req == req.get());
644 auto n = std::min(static_cast<size_t>(nwrite), buf.headbuf.rleft());
645 buf.headbuf.drain(n);
647 n = std::min(static_cast<size_t>(nwrite), buf.send_value_left);
648 buf.send_value_left -= n;
651 if (buf.headbuf.rleft() || buf.send_value_left) {
654 sendbufv_.pop_front();
655 recvq_.push_back(std::move(sendq_.front()));
659 // start read timer only when we wait for responses.
660 if (recvq_.empty()) {
661 ev_timer_stop(conn_.loop, &conn_.rt);
662 } else if (!ev_is_active(&conn_.rt)) {
667 size_t MemcachedConnection::serialized_size(MemcachedRequest *req) {
669 case MemcachedOp::GET:
670 return 24 + req->key.size();
671 case MemcachedOp::ADD:
673 return 24 + 8 + req->key.size() + req->value.size();
677 void MemcachedConnection::make_request(MemcachedSendbuf *sendbuf,
678 MemcachedRequest *req) {
679 auto &headbuf = sendbuf->headbuf;
681 std::fill(std::begin(headbuf.buf), std::end(headbuf.buf), 0);
683 headbuf[0] = MEMCACHED_REQ_MAGIC;
684 headbuf[1] = static_cast<uint8_t>(req->op);
686 case MemcachedOp::GET:
687 util::put_uint16be(&headbuf[2], req->key.size());
688 util::put_uint32be(&headbuf[8], req->key.size());
691 case MemcachedOp::ADD:
692 util::put_uint16be(&headbuf[2], req->key.size());
694 util::put_uint32be(&headbuf[8], 8 + req->key.size() + req->value.size());
695 util::put_uint32be(&headbuf[28], req->expiry);
700 headbuf.write(req->key.c_str(), req->key.size());
702 sendbuf->send_value_left = req->value.size();
705 int MemcachedConnection::add_request(std::unique_ptr<MemcachedRequest> req) {
706 if (connect_blocker_.blocked()) {
710 sendq_.push_back(std::move(req));
717 if (conn_.fd == -1 && initiate_connection() != 0) {
718 connect_blocker_.on_failure();
726 // TODO should we start write timer too?
727 void MemcachedConnection::signal_write() { conn_.wlimit.startw(); }
729 int MemcachedConnection::noop() { return 0; }
731 void MemcachedConnection::reconnect_or_fail() {
732 if (!connected_ || (recvq_.empty() && sendq_.empty())) {
737 constexpr size_t MAX_TRY_COUNT = 3;
739 if (++try_count_ >= MAX_TRY_COUNT) {
740 if (LOG_ENABLED(INFO)) {
741 MCLOG(INFO, this) << "Tried " << MAX_TRY_COUNT
742 << " times, and all failed. Aborting";
749 std::vector<std::unique_ptr<MemcachedRequest>> q;
750 q.reserve(recvq_.size() + sendq_.size());
752 if (LOG_ENABLED(INFO)) {
753 MCLOG(INFO, this) << "Retry connection, enqueue "
754 << recvq_.size() + sendq_.size() << " request(s) again";
757 q.insert(std::end(q), std::make_move_iterator(std::begin(recvq_)),
758 std::make_move_iterator(std::end(recvq_)));
759 q.insert(std::end(q), std::make_move_iterator(std::begin(sendq_)),
760 std::make_move_iterator(std::end(sendq_)));
767 sendq_.insert(std::end(sendq_), std::make_move_iterator(std::begin(q)),
768 std::make_move_iterator(std::end(q)));
770 if (initiate_connection() != 0) {
771 connect_blocker_.on_failure();