From 8410ef95738e8cb55965e579fe6d586f3e96b785 Mon Sep 17 00:00:00 2001 From: Krisztian Litkey Date: Thu, 3 May 2012 03:02:27 +0300 Subject: [PATCH] common: receive-side buffering fixes, added tcp_resolve. --- src/common/tcp-transport.c | 150 ++++++++++++++++++++++++++------------------ src/common/tests/tcp-test.c | 46 +++++--------- 2 files changed, 105 insertions(+), 91 deletions(-) diff --git a/src/common/tcp-transport.c b/src/common/tcp-transport.c index d285432..c50486f 100644 --- a/src/common/tcp-transport.c +++ b/src/common/tcp-transport.c @@ -13,7 +13,7 @@ #include #include -#define DEFAULT_SIZE 1024 /* default input buffer size */ +#define DEFAULT_SIZE 128 /* default input buffer size */ typedef struct { MRP_TRANSPORT_PUBLIC_FIELDS; /* common transport fields */ @@ -33,6 +33,41 @@ static void tcp_recv_cb(mrp_mainloop_t *ml, mrp_io_watch_t *w, int fd, static int tcp_disconnect(mrp_transport_t *mt); static int open_socket(tcp_t *t, int family); +static socklen_t tcp_resolve(char *str, void *addr, socklen_t size) +{ + struct addrinfo *ai, hints; + char node[512], *port; + + mrp_clear(&hints); + hints.ai_family = AF_UNSPEC; + ai = NULL; + + if (!strncmp(str, "tcp:" , 4)) str += 4; + else if (!strncmp(str, "tcp4:", 5)) str += 5, hints.ai_family = AF_INET; + else if (!strncmp(str, "tcp6:", 5)) str += 5, hints.ai_family = AF_INET6; + + strncpy(node, str, sizeof(node) - 1); + node[sizeof(node) - 1] = '\0'; + if ((port = strrchr(node, ':')) == NULL) + return FALSE; + *port++ = '\0'; + + if (getaddrinfo(node, port, &hints, &ai) == 0) { + if (size >= ai->ai_addrlen) { + memcpy(addr, ai->ai_addr, ai->ai_addrlen); + size = ai->ai_addrlen; + } + else + size = 0; + freeaddrinfo(ai); + + return size; + } + else + return 0; +} + + static int tcp_open(mrp_transport_t *mt) { tcp_t *t = (tcp_t *)mt; @@ -112,40 +147,13 @@ static void tcp_close(mrp_transport_t *mt) } -static socklen_t getaddr(char *str, int *family, struct sockaddr *addr) -{ - struct addrinfo *ai; - char node[512], *port; - socklen_t len; - - ai = NULL; - strncpy(node, (char *)str, sizeof(node) - 1); - node[sizeof(node) - 1] = '\0'; - - if ((port = strrchr(node, ':')) == NULL) - return FALSE; - *port++ = '\0'; - - if (getaddrinfo(node, port, NULL, &ai) == 0) { - *family = ai->ai_family; - memcpy(addr, ai->ai_addr, ai->ai_addrlen); - len = ai->ai_addrlen; - freeaddrinfo(ai); - - return len; - } - else - return 0; -} - - static void tcp_recv_cb(mrp_mainloop_t *ml, mrp_io_watch_t *w, int fd, mrp_io_event_t events, void *user_data) { tcp_t *t = (tcp_t *)user_data; mrp_transport_t *mt = (mrp_transport_t *)t; uint32_t *sizep, size; - ssize_t n; + ssize_t n, space, left; void *data; int old, error; mrp_msg_t *msg; @@ -154,10 +162,16 @@ static void tcp_recv_cb(mrp_mainloop_t *ml, mrp_io_watch_t *w, int fd, MRP_UNUSED(w); if (events & MRP_IO_EVENT_IN) { + /* + * enlarge the buffer buddy-style if we're out of space + */ + realloc: if (t->idata == t->isize) { - if (t->isize != 0) { - old = t->isize; - t->isize *= 2; + if (t->isize > sizeof(size)) { + old = t->isize; + sizep = t->ibuf; + size = sizeof(size) + ntohl(*sizep); + t->isize = size; } else { old = 0; @@ -179,37 +193,49 @@ static void tcp_recv_cb(mrp_mainloop_t *ml, mrp_io_watch_t *w, int fd, } } - while ((n = read(fd, t->ibuf + t->idata, sizeof(size))) > 0) { + + space = t->isize - t->idata; + while ((n = read(fd, t->ibuf + t->idata, space)) > 0) { t->idata += n; - sizep = t->ibuf; - size = ntohl(*sizep); - while (t->idata >= sizeof(size) + size) { - data = t->ibuf + sizeof(size); - msg = mrp_msg_default_decode(data, size); - - if (msg != NULL) { - MRP_TRANSPORT_BUSY(mt, { - mt->evt.recv(mt, msg, mt->user_data); - }); - - mrp_msg_unref(msg); - - if (t->check_destroy(mt)) - return; - } - else { - error = EPROTO; - goto fatal_error; - } - - size = t->idata - (sizeof(size) + size); - memmove(t->ibuf, t->ibuf + sizeof(size) + size, size); - t->idata = size; - + if (t->idata >= sizeof(size)) { sizep = t->ibuf; size = ntohl(*sizep); + + while (t->idata >= sizeof(size) + size) { + data = t->ibuf + sizeof(size); + msg = mrp_msg_default_decode(data, size); + + if (msg != NULL) { + MRP_TRANSPORT_BUSY(mt, { + mt->evt.recv(mt, msg, mt->user_data); + }); + + mrp_msg_unref(msg); + + if (t->check_destroy(mt)) + return; + } + else { + error = EPROTO; + goto fatal_error; + } + + left = t->idata - (sizeof(size) + size); + memmove(t->ibuf, t->ibuf + sizeof(size) + size, left); + t->idata = left; + + if (t->idata >= sizeof(size)) { + sizep = t->ibuf; + size = ntohl(*sizep); + } + else + size = (uint32_t)-1; + } } + space = t->isize - t->idata; + if (space == 0) + goto realloc; } if (n < 0 && errno != EAGAIN) { @@ -252,14 +278,14 @@ static int tcp_connect(mrp_transport_t *mt, void *addrstr) tcp_t *t = (tcp_t *)mt; struct sockaddr addr; int addrlen; - int family, reuse; + int reuse; long nonblk; mrp_io_event_t events; - addrlen = getaddr(addrstr, &family, &addr); + addrlen = mrp_transport_resolve(mt, addrstr, &addr, sizeof(addr)); if (addrlen > 0) { - t->sock = socket(family, SOCK_STREAM, 0); + t->sock = socket(addr.sa_family, SOCK_STREAM, 0); if (t->sock < 0) return FALSE; @@ -346,7 +372,7 @@ static int tcp_send(mrp_transport_t *mt, mrp_msg_t *msg) } -MRP_REGISTER_TRANSPORT("tcp", tcp_t, NULL, +MRP_REGISTER_TRANSPORT("tcp", tcp_t, tcp_resolve, tcp_open, tcp_bind, tcp_accept, tcp_close, tcp_connect, tcp_disconnect, tcp_send, NULL); diff --git a/src/common/tests/tcp-test.c b/src/common/tests/tcp-test.c index 71a61d6..b33ddbf 100644 --- a/src/common/tests/tcp-test.c +++ b/src/common/tests/tcp-test.c @@ -20,31 +20,6 @@ typedef struct { } context_t; -static socklen_t getaddr(char *str, int *family, struct sockaddr *addr) -{ - struct addrinfo *ai; - char node[512], *port; - - ai = NULL; - strncpy(node, (char *)str, sizeof(node) - 1); - node[sizeof(node) - 1] = '\0'; - - if ((port = strrchr(node, ':')) == NULL) - return FALSE; - *port++ = '\0'; - - if (getaddrinfo(node, port, NULL, &ai) == 0) { - *family = ai->ai_family; - memcpy(addr, ai->ai_addr, ai->ai_addrlen); - freeaddrinfo(ai); - - return ai->ai_addrlen; - } - else - return 0; -} - - void recv_evt(mrp_transport_t *t, mrp_msg_t *msg, void *user_data) { context_t *c = (context_t *)user_data; @@ -56,6 +31,12 @@ void recv_evt(mrp_transport_t *t, mrp_msg_t *msg, void *user_data) mrp_msg_dump(msg, stdout); if (c->server) { + +#define REPLY_KEY "this_is_a_rather_long_reply_field_name_that_I_hope_will_cause_reallocation_of_the_message_receiving_buffer_on_the_server_side_and_we_will_see_if_it_can_automatically_readjust_its_buffers" +#define REPLY_VAL "and_this_is_the_rather_long_value_of_the_rather_long_field_name_that_we_hope_might_break_something_if_the_allocation_algorithm_has_horrible_easy_to_exploit_holes" + + mrp_msg_append(msg, REPLY_KEY, REPLY_VAL, strlen(REPLY_VAL) + 1); + if (mrp_transport_send(t, msg)) mrp_log_info("Reply successfully sent."); else @@ -115,15 +96,14 @@ void server_init(context_t *c) { struct sockaddr addr; socklen_t addrlen; - int family; mrp_io_event_t events; int reuse; long nonblk; - addrlen = getaddr(c->addr, &family, &addr); + addrlen = mrp_transport_resolve(NULL, c->addr, &addr, sizeof(addr)); if (addrlen > 0) { - c->sock = socket(family, SOCK_STREAM, 0); + c->sock = socket(addr.sa_family, SOCK_STREAM, 0); if (c->sock < 0) { mrp_log_error("Failed to create socket."); @@ -155,6 +135,10 @@ void server_init(context_t *c) exit(1); } } + else { + mrp_log_error("Failed to resolve address %s.", c->addr); + exit(1); + } } @@ -177,11 +161,15 @@ void send_cb(mrp_mainloop_t *ml, mrp_timer_t *t, void *user_data) len = snprintf(seq, sizeof(seq), "%d", seqno); +#define LONG_KEY "aaaaaaaaaaaallllllllllllloooooooooooonnnnnnnnnnngggggggggffffffffffffiiiiiiiiiiiiieeeeeeeeeeeelllllllllllllddddddddddddnnnnnnnnnnnnnnnaaaaaaaaaaaaaaaammmmmmmmmmmmmmmeeeeeeeeeeeeeeeeeeeeee" +#define LONG_VAL "aaaaaaaaaaallllllllllllllllloooooooooooonnnnnnnnngggggggggggvvvvvvvvvvvvaaaaaaaaaaaaaalllllllllluuuuuuuuuuuuuueeeeee" + if (!mrp_msg_append(msg, "seq", seq, len + 1) || !mrp_msg_append(msg, "foo", "bar", sizeof("bar")) || !mrp_msg_append(msg, "bar", "foo", sizeof("foo")) || !mrp_msg_append(msg, "foobar", "barfoo", sizeof("barfoo")) || - !mrp_msg_append(msg, "barfoo", "foobar", sizeof("foobar"))) { + !mrp_msg_append(msg, "barfoo", "foobar", sizeof("foobar")) || + !mrp_msg_append(msg, LONG_KEY, LONG_VAL, strlen(LONG_VAL) + 1)) { mrp_log_error("Failed to construct message #%d.", seqno); exit(1); } -- 2.7.4