common: add message header to enhance exception handling 60/44860/1
authorSuchang Woo <suchang.woo@samsung.com>
Mon, 27 Jul 2015 11:43:25 +0000 (20:43 +0900)
committerSuchang Woo <suchang.woo@samsung.com>
Tue, 28 Jul 2015 12:49:27 +0000 (21:49 +0900)
In the previous implementation, fragmented message had no header. If
a message was sent by thread during sending fragmented messages, a
receiver cannot distinguish a single message from fragmented messages.
Adding header to every message, a receiver can distinguish
messages.

Change-Id: I14d0f8ff6b0c0f9d3cf55ac9046da1d1ad8b654c
Signed-off-by: Suchang Woo <suchang.woo@samsung.com>
common/proto.c
common/proto.h
lib/buxton2.c

index 1f7a09e..ae9a06a 100644 (file)
 #include "log.h"
 #include "proto.h"
 
-#define SEND_TIMEOUT_MSEC 1000
-#define SEND_PACKET_MAX 8192
-
 struct recv_info {
        int fd;
+       uint16_t seq;
 
        recv_callback callback;
        void *user_data;
 
        enum message_type type;
-       uint8_t *data;
        int32_t len;
 
        int32_t recved;
+       uint8_t *data;
+       uint8_t _data[0];
 };
 
 static GList *recv_list;
 static pthread_mutex_t recv_lock = PTHREAD_MUTEX_INITIALIZER;
+static uint16_t __seq;
 
-static struct recv_info *find_rif(int fd)
+static struct recv_info *find_rif(int fd, uint16_t seq)
 {
        GList *l;
+       struct recv_info *rif;
 
        for (l = recv_list; l; l = g_list_next(l)) {
-               if (((struct recv_info *)l->data)->fd == fd)
-                       return l->data;
+               rif = l->data;
+               if (rif->fd == fd && rif->seq == seq)
+                       return rif;
        }
 
        return NULL;
 }
 
-static int recv_first(int fd, recv_callback callback, void *user_data)
+static void remove_rif(struct recv_info *rif)
+{
+       GList *l;
+
+       if (!rif)
+               return;
+
+       l = g_list_find(recv_list, rif);
+       if (l)
+               recv_list = g_list_delete_link(recv_list, l);
+
+       bxt_dbg("rif %p seq %u type %d len %d/%d freed",
+                       rif, rif->seq, rif->type, rif->recved, rif->len);
+       free(rif);
+}
+
+static struct recv_info *add_rif(int fd, struct header *hdr,
+               recv_callback callback, void *user_data)
 {
-       int r;
        struct recv_info *rif;
-       uint32_t hdr;
+       struct recv_info *f;
 
-       rif = calloc(1, sizeof(*rif));
+       assert(callback);
+       assert(hdr);
+
+       /* 8 is pad for alignment */
+       rif = malloc(sizeof(*rif) + hdr->total + 8);
        if (!rif)
-               return -1;
+               return NULL;
+
+       memset(rif, 0, sizeof(*rif));
 
        rif->fd = fd;
+       rif->seq = hdr->seq;
+
        rif->callback = callback;
        rif->user_data = user_data;
 
-       r = recv(fd, &hdr, sizeof(uint32_t), 0);
-       if (r <= 0) {
-               free(rif);
-               if (r == 0) {
-                       bxt_dbg("recv: fd %d closed", fd);
-               } else {
-                       if (errno == EAGAIN)
-                               return 0;
+       rif->type = hdr->mtype;
+       rif->len = hdr->total;
 
-                       bxt_err("recv: fd %d errno %d", fd, errno);
-               }
+       /* 8 byte alignment */
+       rif->data = (uint8_t *)(((uintptr_t)rif->_data + 0x7) & ~0x7);
 
-               return -1;
+       f = find_rif(rif->fd, rif->seq);
+       if (f) {
+               bxt_err("rif %p seq %u already exists in list", f, f->seq);
+               remove_rif(f);
        }
 
-       rif->type = hdr >> 24;
-       rif->len = hdr & 0xffffff;
-       if (rif->len == 0) {
-               free(rif);
-               bxt_err("recv: fd %d invalid header %x", fd, hdr);
-               return -1;
-       }
+       recv_list = g_list_append(recv_list, rif);
 
-       rif->data = malloc(rif->len);
-       if (!rif->data) {
-               free(rif);
-               return -1;
-       }
+       bxt_dbg("rif %p seq %u type %d len %d added",
+                       rif, rif->seq, rif->type, rif->len);
 
-       recv_list = g_list_append(recv_list, rif);
+       return rif;
+}
 
-       bxt_dbg("rif %p type %d len %d added", rif, rif->type, rif->len);
+static void flush_data(int fd, uint32_t len)
+{
+       int r;
+       uint32_t s;
+       char buf[4096];
 
-       return 0;
+       while (len > 0) {
+               s = len > sizeof(buf) ? sizeof(buf) : len;
+
+               r = recv(fd, buf, s, 0);
+               if (r == -1)
+                       break;
 
+               len -= r;
+       }
 }
 
-static void remove_rif(struct recv_info *rif)
+static int proto_recv_single(int fd, recv_callback callback, void *user_data)
 {
-       if (!rif)
-               return;
+       int r;
+       enum message_type type;
+       uint8_t *data;
+       int32_t len;
 
-       recv_list = g_list_remove(recv_list, rif);
-       free(rif->data);
-       free(rif);
+       assert(fd >= 0);
+       assert(callback);
+
+       r = proto_recv(fd, &type, &data, &len);
+       if (r == -1)
+               return -1;
+
+       callback(user_data, type, data, len);
+       free(data);
+
+       return 0;
 }
 
-static int recv_cont(struct recv_info *rif)
+static int recv_data(struct recv_info *rif, uint32_t len)
 {
        int r;
+       struct header *hdr;
+       uint8_t buf[MSG_MTU];
 
        assert(rif);
 
-       r = recv(rif->fd, &rif->data[rif->recved], rif->len - rif->recved, 0);
+       r = recv(rif->fd, buf, sizeof(*hdr) + len, 0);
        if (r <= 0) {
                if (r == 0) {
                        bxt_dbg("recv: fd %d closed", rif->fd);
@@ -138,104 +176,172 @@ static int recv_cont(struct recv_info *rif)
                        bxt_err("recv: fd %d errno %d", rif->fd, errno);
                }
 
-               remove_rif(rif);
                return -1;
        }
-       rif->recved += r;
 
-       if (rif->recved > rif->len) {
-               bxt_err("recv: fd %d expected %d > received %d", rif->fd,
-                               rif->len, rif->recved);
-               remove_rif(rif);
+       if (r < sizeof(*hdr) + len) {
+               bxt_err("recv: fd %d expect %d > received %d",
+                               rif->fd, sizeof(*hdr) + len, r);
                return -1;
        }
 
-       if (rif->recved == rif->len) {
-               bxt_dbg("rif %p received %d / %d", rif, rif->recved, rif->len);
+       hdr = (struct header *)buf;
+       memcpy(&rif->data[rif->recved], hdr->data, len);
+       rif->recved += len;
 
-               assert(rif->callback);
+       return 0;
+}
+
+static int proto_recv_frag(int fd, struct header *hdr,
+               recv_callback callback, void *user_data)
+{
+       int r;
+       struct recv_info *rif;
+
+       assert(fd >= 0);
+       assert(hdr);
+       assert(callback);
+
+       pthread_mutex_lock(&recv_lock);
+       switch (hdr->type) {
+       case MSG_FIRST:
+               rif = add_rif(fd, hdr, callback, user_data);
+               if (!rif) {
+                       flush_data(fd, sizeof(*hdr) + hdr->len);
+                       pthread_mutex_unlock(&recv_lock);
+                       return -1;
+               }
+               r = recv_data(rif, hdr->len);
+               break;
+       case MSG_MIDDLE:
+       case MSG_LAST:
+               rif = find_rif(fd, hdr->seq);
+               if (!rif) {
+                       bxt_err("recv: fd %d seq %u not exist", fd, hdr->seq);
+                       flush_data(fd, sizeof(*hdr) + hdr->len);
+                       pthread_mutex_unlock(&recv_lock);
+                       return -1;
+               }
+               r = recv_data(rif, hdr->len);
+               if (hdr->type == MSG_LAST && r == 0 && rif->recved < rif->len) {
+                       bxt_err("recv: fd %d packet lost %d/%d",
+                                       fd, rif->recved, rif->len);
+                       r = -1;
+               }
+               break;
+       default:
+               bxt_err("recv: fd %d unknown type %x", fd, hdr->type);
+               rif = NULL;
+               r = -1;
+               break;
+       }
+
+       if (r == -1) {
+               remove_rif(rif);
+               pthread_mutex_unlock(&recv_lock);
+               return -1;
+       }
+
+       if (hdr->type == MSG_LAST) {
                recv_list = g_list_remove(recv_list, rif);
                pthread_mutex_unlock(&recv_lock);
+
+               assert(rif->callback);
                rif->callback(rif->user_data, rif->type, rif->data, rif->len);
-               pthread_mutex_lock(&recv_lock);
                remove_rif(rif);
+       } else {
+               pthread_mutex_unlock(&recv_lock);
        }
 
        return 0;
 }
 
-int proto_recv_frag(int fd, recv_callback callback, void *user_data)
+int proto_recv_async(int fd, recv_callback callback, void *user_data)
 {
        int r;
-       struct recv_info *rif;
+       struct header hdr;
 
        if (fd < 0 || !callback) {
                errno = EINVAL;
                return -1;
        }
 
-       pthread_mutex_lock(&recv_lock);
-       rif = find_rif(fd);
-       if (!rif)
-               r = recv_first(fd, callback, user_data);
-       else
-               r = recv_cont(rif);
-       pthread_mutex_unlock(&recv_lock);
+       r = recv(fd, &hdr, sizeof(hdr), MSG_PEEK);
+       if (r <= 0) {
+               if (r == 0) {
+                       bxt_dbg("recv: fd %d closed", fd);
+               } else {
+                       if (errno == EAGAIN || errno == EINTR)
+                               return 0;
 
-       return r;
-}
+                       bxt_err("recv: fd %d errno %d", fd, errno);
+               }
 
+               return -1;
+       }
+
+       if (hdr.type == MSG_SINGLE)
+               return proto_recv_single(fd, callback, user_data);
+
+       return proto_recv_frag(fd, &hdr, callback, user_data);
+}
 
 int proto_send_block(int fd, enum message_type type, uint8_t *data, int32_t len)
 {
        int r;
-       uint32_t hdr;
+       struct header *hdr;
        int sent;
        struct pollfd fds[1];
-       int s;
+       uint8_t buf[MSG_MTU];
 
        if (fd < 0 || !data || len <= 0) {
                errno = EINVAL;
                return -1;
        }
 
+       if (len <= (MSG_MTU - sizeof(*hdr)))
+               return proto_send(fd, type, data, len);
+
        bxt_dbg("send: fd %d type %d len %d start", fd, type, len);
-       hdr = (type << 24) | (len & 0xffffff);
 
-       r = send(fd, &hdr, sizeof(uint32_t), 0);
-       if (r == -1) {
-               bxt_err("send: fd %d errno %d", fd, errno);
-               return -1;
-       }
+       hdr = (struct header *)buf;
+       hdr->mtype = type;
+       hdr->seq = __atomic_fetch_add(&__seq, 1, __ATOMIC_RELAXED);
+       hdr->total = len;
+
+       fds[0].fd = fd;
+       fds[0].events = POLLOUT;
+       fds[0].revents = 0;
 
        sent = 0;
        while (len > sent) {
-               fds[0].fd = fd;
-               fds[0].events = POLLOUT;
-               fds[0].revents = 0;
-
                /* CAN BE BLOCKED ! */
-               r = poll(fds, 1, SEND_TIMEOUT_MSEC);
+               r = poll(fds, 1, -1);
                if (r == -1) {
+                       if (errno == EINTR)
+                               continue;
+
                        bxt_err("send: fd %d poll errno %d", fd, errno);
                        return -1;
                }
-               if (r == 0) {
-                       bxt_err("send: fd %d timeout", fd);
-                       return -1;
+
+               hdr->len = len - sent;
+               if (hdr->len > (MSG_MTU - sizeof(*hdr))) {
+                       hdr->type = sent == 0 ? MSG_FIRST : MSG_MIDDLE;
+                       hdr->len = MSG_MTU - sizeof(*hdr);
+               } else {
+                       hdr->type = MSG_LAST;
                }
 
-               s = len - sent;
-               if (s > SEND_TIMEOUT_MSEC)
-                       s = SEND_TIMEOUT_MSEC;
+               memcpy(hdr->data, &data[sent], hdr->len);
 
-               r = send(fd, &data[sent], s, 0);
+               r = send(fd, hdr, sizeof(*hdr) + hdr->len, 0);
                if (r == -1) {
                        bxt_err("send: fd %d errno %d", fd, errno);
                        return -1;
                }
 
-               sent += r;
+               sent += hdr->len;
        }
        bxt_dbg("send: fd %d sent %d", fd, sent);
 
@@ -245,25 +351,50 @@ int proto_send_block(int fd, enum message_type type, uint8_t *data, int32_t len)
 int proto_send(int fd, enum message_type type, uint8_t *data, int32_t len)
 {
        int r;
-       uint32_t hdr;
+       struct header *hdr;
        uint8_t *buf;
+       struct pollfd fds[1];
 
        assert(fd >= 0);
        assert(data);
        assert(len > 0);
 
-       buf = malloc(len + sizeof(uint32_t));
+       bxt_dbg("send: fd %d type %d len %d start", fd, type, len);
+
+       buf = malloc(sizeof(*hdr) + len);
        if (!buf) {
                bxt_err("send: send buffer alloc error");
                return -1;
        }
 
-       hdr = (type << 24) | (len & 0xffffff);
+       hdr = (struct header *)buf;
+       hdr->type = MSG_SINGLE;
+       hdr->mtype = type;
+       hdr->seq = __atomic_fetch_add(&__seq, 1, __ATOMIC_RELAXED);
+       hdr->total = len;
+       hdr->len = len;
 
-       memcpy(buf, &hdr, sizeof(uint32_t));
-       memcpy(buf + sizeof(uint32_t), data, len);
+       memcpy(hdr->data, data, len);
 
-       r = send(fd, buf, len + sizeof(uint32_t), 0);
+       fds[0].fd = fd;
+       fds[0].events = POLLOUT;
+       fds[0].revents = 0;
+
+       do {
+               /* CAN BE BLOCKED ! */
+               r = poll(fds, 1, -1);
+               if (r == -1) {
+                       if (errno == EINTR)
+                               continue;
+
+                       bxt_err("send: fd %d poll errno %d", fd, errno);
+                       free(buf);
+
+                       return -1;
+               }
+       } while (r <= 0);
+
+       r = send(fd, buf, sizeof(*hdr) + len, 0);
 
        free(buf);
 
@@ -272,9 +403,11 @@ int proto_send(int fd, enum message_type type, uint8_t *data, int32_t len)
                return -1;
        }
 
-       if (r != len + sizeof(uint32_t))
+       if (r != sizeof(*hdr) + len)
                bxt_err("send: %d / %d byte", r,
-                               (int32_t)(len + sizeof(uint32_t)));
+                               (int32_t)(sizeof(*hdr) + len));
+
+       bxt_dbg("send: fd %d sent %d", fd, r - sizeof(*hdr));
 
        return 0;
 }
@@ -282,17 +415,15 @@ int proto_send(int fd, enum message_type type, uint8_t *data, int32_t len)
 int proto_recv(int fd, enum message_type *type, uint8_t **data, int32_t *len)
 {
        int r;
-       uint32_t hdr;
+       struct header hdr;
        uint8_t *_data;
-       int32_t _len;
-       enum message_type _type;
 
        assert(fd >= 0);
        assert(type);
        assert(data);
        assert(len);
 
-       r = recv(fd, &hdr, sizeof(uint32_t), 0);
+       r = recv(fd, &hdr, sizeof(hdr), 0);
        if (r <= 0) {
                if (r == 0)
                        bxt_dbg("recv: fd %d closed", fd);
@@ -302,21 +433,18 @@ int proto_recv(int fd, enum message_type *type, uint8_t **data, int32_t *len)
                return -1;
        }
 
-       _type = hdr >> 24;
-       _len = hdr & 0xffffff;
-
-       if (_len == 0) {
+       if (r != sizeof(hdr) || hdr.len == 0 || hdr.type != MSG_SINGLE) {
                bxt_err("recv: fd %d Invalid message", fd);
                return -1;
        }
 
-       _data = malloc(_len);
+       _data = malloc(hdr.total);
        if (!_data) {
-               /* flush ? */
+               flush_data(fd, hdr.total);
                return -1;
        }
 
-       r = recv(fd, _data, _len, 0);
+       r = recv(fd, _data, hdr.total, 0);
        if (r <= 0) {
                if (r == 0)
                        bxt_dbg("recv: fd %d closed", fd);
@@ -328,16 +456,16 @@ int proto_recv(int fd, enum message_type *type, uint8_t **data, int32_t *len)
                return -1;
        }
 
-       if (r != _len) {
+       if (r != hdr.total) {
                bxt_err("recv: fd %d expect size %d > received %d",
-                               fd, _len, r);
+                               fd, hdr.total, r);
                free(_data);
                return -1;
        }
 
-       *type = _type;
+       *type = hdr.mtype;
        *data = _data;
-       *len = _len;
+       *len = hdr.total;
 
        return 0;
 }
index 39e923a..4074e2b 100644 (file)
 
 #include "common.h"
 
+#define MSG_FIRST  0x4
+#define MSG_MIDDLE 0x2
+#define MSG_LAST   0x1
+#define MSG_SINGLE (MSG_FIRST | MSG_MIDDLE | MSG_LAST)
+
+#define MSG_MTU    4096
+
+/*
+ * Message header (12 bytes) :
+ *
+ *  +-----------+-----------+-----------+-----------+
+ *  |  type (1) | mtype (1) |  Sequence number (2)  |
+ *  +-----------+-----------+-----------+-----------+
+ *  |               Total length (4)                |
+ *  +-----------+-----------+-----------+-----------+
+ *  |            Current data length (4)            |
+ *  +-----------+-----------+-----------+-----------+
+ *  |                  Data ... (n)                 |
+ *  +-----------+-----------+-----------+-----------+
+ *
+ */
+
+struct header {
+       uint8_t type; /* one of MSG_XXX */
+       uint8_t mtype; /* enum message_type */
+       uint16_t seq; /* sequence number */
+       uint32_t total; /* total length of message */
+       uint32_t len; /* length of current data */
+       uint8_t data[0];
+} __attribute__((packed));
+
 int proto_send(int fd, enum message_type type, uint8_t *data, int32_t len);
 int proto_recv(int fd, enum message_type *type, uint8_t **data, int32_t *len);
 
 typedef void (*recv_callback)(void *user_data,
                enum message_type type, uint8_t *data, int32_t len);
-int proto_recv_frag(int fd, recv_callback callback, void *user_data);
+int proto_recv_async(int fd, recv_callback callback, void *user_data);
 int proto_send_block(int fd, enum message_type type, uint8_t *data,
                int32_t len);
 
index 173e486..a597ddf 100644 (file)
@@ -380,7 +380,7 @@ static struct bxt_req *create_req(const struct buxton_layer *layer,
        req->callback = callback;
        req->list_cb = list_cb;
        req->data = data;
-       req->msgid = ++client_msgid;
+       req->msgid = __atomic_fetch_add(&client_msgid, 1, __ATOMIC_RELAXED);
 
        return req;
 }
@@ -670,7 +670,7 @@ static int proc_msg(struct buxton_client *client)
 {
        int r;
 
-       r = proto_recv_frag(client->fd, proc_msg_cb, client);
+       r = proto_recv_async(client->fd, proc_msg_cb, client);
        if (r == -1) {
                bxt_err("recv msg: fd %d errno %d", client->fd, errno);
                return -1;