#include "log.h"
#include "proto.h"
-#define SEND_TIMEOUT_MSEC 1000
-#define SEND_PACKET_MAX 8192
-
struct recv_info {
int fd;
+ uint16_t seq;
recv_callback callback;
void *user_data;
enum message_type type;
- uint8_t *data;
int32_t len;
int32_t recved;
+ uint8_t *data;
+ uint8_t _data[0];
};
static GList *recv_list;
static pthread_mutex_t recv_lock = PTHREAD_MUTEX_INITIALIZER;
+static uint16_t __seq;
-static struct recv_info *find_rif(int fd)
+static struct recv_info *find_rif(int fd, uint16_t seq)
{
GList *l;
+ struct recv_info *rif;
for (l = recv_list; l; l = g_list_next(l)) {
- if (((struct recv_info *)l->data)->fd == fd)
- return l->data;
+ rif = l->data;
+ if (rif->fd == fd && rif->seq == seq)
+ return rif;
}
return NULL;
}
-static int recv_first(int fd, recv_callback callback, void *user_data)
+static void remove_rif(struct recv_info *rif)
+{
+ GList *l;
+
+ if (!rif)
+ return;
+
+ l = g_list_find(recv_list, rif);
+ if (l)
+ recv_list = g_list_delete_link(recv_list, l);
+
+ bxt_dbg("rif %p seq %u type %d len %d/%d freed",
+ rif, rif->seq, rif->type, rif->recved, rif->len);
+ free(rif);
+}
+
+static struct recv_info *add_rif(int fd, struct header *hdr,
+ recv_callback callback, void *user_data)
{
- int r;
struct recv_info *rif;
- uint32_t hdr;
+ struct recv_info *f;
- rif = calloc(1, sizeof(*rif));
+ assert(callback);
+ assert(hdr);
+
+ /* 8 is pad for alignment */
+ rif = malloc(sizeof(*rif) + hdr->total + 8);
if (!rif)
- return -1;
+ return NULL;
+
+ memset(rif, 0, sizeof(*rif));
rif->fd = fd;
+ rif->seq = hdr->seq;
+
rif->callback = callback;
rif->user_data = user_data;
- r = recv(fd, &hdr, sizeof(uint32_t), 0);
- if (r <= 0) {
- free(rif);
- if (r == 0) {
- bxt_dbg("recv: fd %d closed", fd);
- } else {
- if (errno == EAGAIN)
- return 0;
+ rif->type = hdr->mtype;
+ rif->len = hdr->total;
- bxt_err("recv: fd %d errno %d", fd, errno);
- }
+ /* 8 byte alignment */
+ rif->data = (uint8_t *)(((uintptr_t)rif->_data + 0x7) & ~0x7);
- return -1;
+ f = find_rif(rif->fd, rif->seq);
+ if (f) {
+ bxt_err("rif %p seq %u already exists in list", f, f->seq);
+ remove_rif(f);
}
- rif->type = hdr >> 24;
- rif->len = hdr & 0xffffff;
- if (rif->len == 0) {
- free(rif);
- bxt_err("recv: fd %d invalid header %x", fd, hdr);
- return -1;
- }
+ recv_list = g_list_append(recv_list, rif);
- rif->data = malloc(rif->len);
- if (!rif->data) {
- free(rif);
- return -1;
- }
+ bxt_dbg("rif %p seq %u type %d len %d added",
+ rif, rif->seq, rif->type, rif->len);
- recv_list = g_list_append(recv_list, rif);
+ return rif;
+}
- bxt_dbg("rif %p type %d len %d added", rif, rif->type, rif->len);
+static void flush_data(int fd, uint32_t len)
+{
+ int r;
+ uint32_t s;
+ char buf[4096];
- return 0;
+ while (len > 0) {
+ s = len > sizeof(buf) ? sizeof(buf) : len;
+
+ r = recv(fd, buf, s, 0);
+ if (r == -1)
+ break;
+ len -= r;
+ }
}
-static void remove_rif(struct recv_info *rif)
+static int proto_recv_single(int fd, recv_callback callback, void *user_data)
{
- if (!rif)
- return;
+ int r;
+ enum message_type type;
+ uint8_t *data;
+ int32_t len;
- recv_list = g_list_remove(recv_list, rif);
- free(rif->data);
- free(rif);
+ assert(fd >= 0);
+ assert(callback);
+
+ r = proto_recv(fd, &type, &data, &len);
+ if (r == -1)
+ return -1;
+
+ callback(user_data, type, data, len);
+ free(data);
+
+ return 0;
}
-static int recv_cont(struct recv_info *rif)
+static int recv_data(struct recv_info *rif, uint32_t len)
{
int r;
+ struct header *hdr;
+ uint8_t buf[MSG_MTU];
assert(rif);
- r = recv(rif->fd, &rif->data[rif->recved], rif->len - rif->recved, 0);
+ r = recv(rif->fd, buf, sizeof(*hdr) + len, 0);
if (r <= 0) {
if (r == 0) {
bxt_dbg("recv: fd %d closed", rif->fd);
bxt_err("recv: fd %d errno %d", rif->fd, errno);
}
- remove_rif(rif);
return -1;
}
- rif->recved += r;
- if (rif->recved > rif->len) {
- bxt_err("recv: fd %d expected %d > received %d", rif->fd,
- rif->len, rif->recved);
- remove_rif(rif);
+ if (r < sizeof(*hdr) + len) {
+ bxt_err("recv: fd %d expect %d > received %d",
+ rif->fd, sizeof(*hdr) + len, r);
return -1;
}
- if (rif->recved == rif->len) {
- bxt_dbg("rif %p received %d / %d", rif, rif->recved, rif->len);
+ hdr = (struct header *)buf;
+ memcpy(&rif->data[rif->recved], hdr->data, len);
+ rif->recved += len;
- assert(rif->callback);
+ return 0;
+}
+
+static int proto_recv_frag(int fd, struct header *hdr,
+ recv_callback callback, void *user_data)
+{
+ int r;
+ struct recv_info *rif;
+
+ assert(fd >= 0);
+ assert(hdr);
+ assert(callback);
+
+ pthread_mutex_lock(&recv_lock);
+ switch (hdr->type) {
+ case MSG_FIRST:
+ rif = add_rif(fd, hdr, callback, user_data);
+ if (!rif) {
+ flush_data(fd, sizeof(*hdr) + hdr->len);
+ pthread_mutex_unlock(&recv_lock);
+ return -1;
+ }
+ r = recv_data(rif, hdr->len);
+ break;
+ case MSG_MIDDLE:
+ case MSG_LAST:
+ rif = find_rif(fd, hdr->seq);
+ if (!rif) {
+ bxt_err("recv: fd %d seq %u not exist", fd, hdr->seq);
+ flush_data(fd, sizeof(*hdr) + hdr->len);
+ pthread_mutex_unlock(&recv_lock);
+ return -1;
+ }
+ r = recv_data(rif, hdr->len);
+ if (hdr->type == MSG_LAST && r == 0 && rif->recved < rif->len) {
+ bxt_err("recv: fd %d packet lost %d/%d",
+ fd, rif->recved, rif->len);
+ r = -1;
+ }
+ break;
+ default:
+ bxt_err("recv: fd %d unknown type %x", fd, hdr->type);
+ rif = NULL;
+ r = -1;
+ break;
+ }
+
+ if (r == -1) {
+ remove_rif(rif);
+ pthread_mutex_unlock(&recv_lock);
+ return -1;
+ }
+
+ if (hdr->type == MSG_LAST) {
recv_list = g_list_remove(recv_list, rif);
pthread_mutex_unlock(&recv_lock);
+
+ assert(rif->callback);
rif->callback(rif->user_data, rif->type, rif->data, rif->len);
- pthread_mutex_lock(&recv_lock);
remove_rif(rif);
+ } else {
+ pthread_mutex_unlock(&recv_lock);
}
return 0;
}
-int proto_recv_frag(int fd, recv_callback callback, void *user_data)
+int proto_recv_async(int fd, recv_callback callback, void *user_data)
{
int r;
- struct recv_info *rif;
+ struct header hdr;
if (fd < 0 || !callback) {
errno = EINVAL;
return -1;
}
- pthread_mutex_lock(&recv_lock);
- rif = find_rif(fd);
- if (!rif)
- r = recv_first(fd, callback, user_data);
- else
- r = recv_cont(rif);
- pthread_mutex_unlock(&recv_lock);
+ r = recv(fd, &hdr, sizeof(hdr), MSG_PEEK);
+ if (r <= 0) {
+ if (r == 0) {
+ bxt_dbg("recv: fd %d closed", fd);
+ } else {
+ if (errno == EAGAIN || errno == EINTR)
+ return 0;
- return r;
-}
+ bxt_err("recv: fd %d errno %d", fd, errno);
+ }
+ return -1;
+ }
+
+ if (hdr.type == MSG_SINGLE)
+ return proto_recv_single(fd, callback, user_data);
+
+ return proto_recv_frag(fd, &hdr, callback, user_data);
+}
int proto_send_block(int fd, enum message_type type, uint8_t *data, int32_t len)
{
int r;
- uint32_t hdr;
+ struct header *hdr;
int sent;
struct pollfd fds[1];
- int s;
+ uint8_t buf[MSG_MTU];
if (fd < 0 || !data || len <= 0) {
errno = EINVAL;
return -1;
}
+ if (len <= (MSG_MTU - sizeof(*hdr)))
+ return proto_send(fd, type, data, len);
+
bxt_dbg("send: fd %d type %d len %d start", fd, type, len);
- hdr = (type << 24) | (len & 0xffffff);
- r = send(fd, &hdr, sizeof(uint32_t), 0);
- if (r == -1) {
- bxt_err("send: fd %d errno %d", fd, errno);
- return -1;
- }
+ hdr = (struct header *)buf;
+ hdr->mtype = type;
+ hdr->seq = __atomic_fetch_add(&__seq, 1, __ATOMIC_RELAXED);
+ hdr->total = len;
+
+ fds[0].fd = fd;
+ fds[0].events = POLLOUT;
+ fds[0].revents = 0;
sent = 0;
while (len > sent) {
- fds[0].fd = fd;
- fds[0].events = POLLOUT;
- fds[0].revents = 0;
-
/* CAN BE BLOCKED ! */
- r = poll(fds, 1, SEND_TIMEOUT_MSEC);
+ r = poll(fds, 1, -1);
if (r == -1) {
+ if (errno == EINTR)
+ continue;
+
bxt_err("send: fd %d poll errno %d", fd, errno);
return -1;
}
- if (r == 0) {
- bxt_err("send: fd %d timeout", fd);
- return -1;
+
+ hdr->len = len - sent;
+ if (hdr->len > (MSG_MTU - sizeof(*hdr))) {
+ hdr->type = sent == 0 ? MSG_FIRST : MSG_MIDDLE;
+ hdr->len = MSG_MTU - sizeof(*hdr);
+ } else {
+ hdr->type = MSG_LAST;
}
- s = len - sent;
- if (s > SEND_TIMEOUT_MSEC)
- s = SEND_TIMEOUT_MSEC;
+ memcpy(hdr->data, &data[sent], hdr->len);
- r = send(fd, &data[sent], s, 0);
+ r = send(fd, hdr, sizeof(*hdr) + hdr->len, 0);
if (r == -1) {
bxt_err("send: fd %d errno %d", fd, errno);
return -1;
}
- sent += r;
+ sent += hdr->len;
}
bxt_dbg("send: fd %d sent %d", fd, sent);
int proto_send(int fd, enum message_type type, uint8_t *data, int32_t len)
{
int r;
- uint32_t hdr;
+ struct header *hdr;
uint8_t *buf;
+ struct pollfd fds[1];
assert(fd >= 0);
assert(data);
assert(len > 0);
- buf = malloc(len + sizeof(uint32_t));
+ bxt_dbg("send: fd %d type %d len %d start", fd, type, len);
+
+ buf = malloc(sizeof(*hdr) + len);
if (!buf) {
bxt_err("send: send buffer alloc error");
return -1;
}
- hdr = (type << 24) | (len & 0xffffff);
+ hdr = (struct header *)buf;
+ hdr->type = MSG_SINGLE;
+ hdr->mtype = type;
+ hdr->seq = __atomic_fetch_add(&__seq, 1, __ATOMIC_RELAXED);
+ hdr->total = len;
+ hdr->len = len;
- memcpy(buf, &hdr, sizeof(uint32_t));
- memcpy(buf + sizeof(uint32_t), data, len);
+ memcpy(hdr->data, data, len);
- r = send(fd, buf, len + sizeof(uint32_t), 0);
+ fds[0].fd = fd;
+ fds[0].events = POLLOUT;
+ fds[0].revents = 0;
+
+ do {
+ /* CAN BE BLOCKED ! */
+ r = poll(fds, 1, -1);
+ if (r == -1) {
+ if (errno == EINTR)
+ continue;
+
+ bxt_err("send: fd %d poll errno %d", fd, errno);
+ free(buf);
+
+ return -1;
+ }
+ } while (r <= 0);
+
+ r = send(fd, buf, sizeof(*hdr) + len, 0);
free(buf);
return -1;
}
- if (r != len + sizeof(uint32_t))
+ if (r != sizeof(*hdr) + len)
bxt_err("send: %d / %d byte", r,
- (int32_t)(len + sizeof(uint32_t)));
+ (int32_t)(sizeof(*hdr) + len));
+
+ bxt_dbg("send: fd %d sent %d", fd, r - sizeof(*hdr));
return 0;
}
int proto_recv(int fd, enum message_type *type, uint8_t **data, int32_t *len)
{
int r;
- uint32_t hdr;
+ struct header hdr;
uint8_t *_data;
- int32_t _len;
- enum message_type _type;
assert(fd >= 0);
assert(type);
assert(data);
assert(len);
- r = recv(fd, &hdr, sizeof(uint32_t), 0);
+ r = recv(fd, &hdr, sizeof(hdr), 0);
if (r <= 0) {
if (r == 0)
bxt_dbg("recv: fd %d closed", fd);
return -1;
}
- _type = hdr >> 24;
- _len = hdr & 0xffffff;
-
- if (_len == 0) {
+ if (r != sizeof(hdr) || hdr.len == 0 || hdr.type != MSG_SINGLE) {
bxt_err("recv: fd %d Invalid message", fd);
return -1;
}
- _data = malloc(_len);
+ _data = malloc(hdr.total);
if (!_data) {
- /* flush ? */
+ flush_data(fd, hdr.total);
return -1;
}
- r = recv(fd, _data, _len, 0);
+ r = recv(fd, _data, hdr.total, 0);
if (r <= 0) {
if (r == 0)
bxt_dbg("recv: fd %d closed", fd);
return -1;
}
- if (r != _len) {
+ if (r != hdr.total) {
bxt_err("recv: fd %d expect size %d > received %d",
- fd, _len, r);
+ fd, hdr.total, r);
free(_data);
return -1;
}
- *type = _type;
+ *type = hdr.mtype;
*data = _data;
- *len = _len;
+ *len = hdr.total;
return 0;
}