common: added a UDP transport to check if the abstraction is generic enough.
authorKrisztian Litkey <kli@iki.fi>
Wed, 2 May 2012 22:38:11 +0000 (01:38 +0300)
committerKrisztian Litkey <kli@iki.fi>
Thu, 3 May 2012 00:05:12 +0000 (03:05 +0300)
src/Makefile.am
src/common/tests/Makefile.am
src/common/tests/udp-test.c [new file with mode: 0644]
src/common/udp-transport.c [new file with mode: 0644]

index 022ee9f..c0d3331 100644 (file)
@@ -50,7 +50,8 @@ libmurphy_common_la_SOURCES =         \
                common/file-utils.c     \
                common/msg.c            \
                common/transport.c      \
-               common/tcp-transport.c
+               common/tcp-transport.c  \
+               common/udp-transport.c
 
 libmurphy_common_la_CFLAGS  =          \
                $(AM_CFLAGS)
index 969c063..78819b1 100644 (file)
@@ -1,6 +1,6 @@
 AM_CFLAGS = $(WARNING_CFLAGS) -I$(top_builddir)
 
-noinst_PROGRAMS = mm-test hash-test mainloop-test msg-test tcp-test
+noinst_PROGRAMS = mm-test hash-test mainloop-test msg-test tcp-test udp-test
 
 # memory management test
 mm_test_SOURCES = mm-test.c
@@ -26,3 +26,8 @@ msg_test_LDADD   = ../../libmurphy-common.la
 tcp_test_SOURCES = tcp-test.c
 tcp_test_CFLAGS  = $(AM_CFLAGS)
 tcp_test_LDADD   = ../../libmurphy-common.la
+
+# UDP transport test
+udp_test_SOURCES = udp-test.c
+udp_test_CFLAGS  = $(AM_CFLAGS)
+udp_test_LDADD   = ../../libmurphy-common.la
diff --git a/src/common/tests/udp-test.c b/src/common/tests/udp-test.c
new file mode 100644 (file)
index 0000000..eb40f94
--- /dev/null
@@ -0,0 +1,223 @@
+#include <unistd.h>
+#include <string.h>
+#include <errno.h>
+#include <netdb.h>
+#include <fcntl.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+
+#include <murphy/common.h>
+
+
+typedef struct {
+    mrp_mainloop_t  *ml;
+    mrp_transport_t *t;
+    char            *addr;
+    int              server;
+    int              sock;
+    mrp_io_watch_t  *iow;
+    mrp_timer_t     *timer;
+} context_t;
+
+
+void recv_evt(mrp_transport_t *t, mrp_msg_t *msg, void *user_data)
+{
+    context_t *c = (context_t *)user_data;
+
+    MRP_UNUSED(t);    
+    MRP_UNUSED(c);
+    
+    mrp_log_info("Received a message.");
+    mrp_msg_dump(msg, stdout);
+    
+    if (c->server) {
+       if (mrp_transport_send(t, msg))
+           mrp_log_info("Reply successfully sent.");
+       else
+           mrp_log_error("Failed to send reply.");
+    }
+
+#if 0 /* done by the tranport layer... */
+    mrp_msg_destroy(msg);
+#endif
+}
+
+
+void recvfrom_evt(mrp_transport_t *t, mrp_msg_t *msg, void *addr,
+                 socklen_t addrlen, void *user_data)
+{
+    context_t *c = (context_t *)user_data;
+
+    MRP_UNUSED(t);    
+    MRP_UNUSED(c);
+    
+    mrp_log_info("Received a message.");
+    mrp_msg_dump(msg, stdout);
+    
+    if (c->server) {
+       mrp_msg_append(msg, "type", "reply", strlen("reply")+1);
+       if (mrp_transport_sendto(t, msg, addr, addrlen))
+           mrp_log_info("Reply successfully sent.");
+       else
+           mrp_log_error("Failed to send reply.");
+    }
+
+#if 0 /* done by the tranport layer... */
+    mrp_msg_destroy(msg);
+#endif
+}
+
+
+void closed_evt(mrp_transport_t *t, int error, void *user_data)
+{
+    context_t *c = (context_t *)user_data;
+
+    MRP_UNUSED(t);
+    MRP_UNUSED(c);
+
+    if (error) {
+       mrp_log_error("Connection closed with error %d (%s).", error,
+                     strerror(error));
+       exit(1);
+    }
+    else {
+       mrp_log_info("Peer has closed the connection.");
+       exit(0);
+    }
+}
+
+
+void server_init(context_t *c)
+{
+    static mrp_transport_evt_t evt = {
+       .closed   = closed_evt,
+       .recv     = NULL,
+       .recvfrom = recvfrom_evt,
+    };
+    struct sockaddr addr;
+    socklen_t       addrlen;
+
+    c->t = mrp_transport_create(c->ml, "udp", &evt, c);
+    
+    if (c->t == NULL) {
+       mrp_log_error("Failed to create new transport.");
+       exit(1);
+    }
+
+    addrlen = mrp_transport_resolve(c->t, c->addr, &addr, sizeof(addr));
+    
+    if (!addrlen) {
+       mrp_log_error("Failed to resolve address '%s'.", c->addr);
+       exit(1);
+    }
+
+    if (!mrp_transport_bind(c->t, &addr, addrlen)) {
+       mrp_log_error("Failed to bind to %s.", c->addr);
+       exit(1);
+    }
+
+    mrp_log_info("Waiting for messages on %s...", c->addr);
+}
+
+
+void send_cb(mrp_mainloop_t *ml, mrp_timer_t *t, void *user_data)
+{
+    static int seqno = 1;
+
+    context_t *c = (context_t *)user_data;
+    mrp_msg_t *msg;
+    char       seq[32];
+    int        len;
+
+    MRP_UNUSED(ml);
+    MRP_UNUSED(t);
+
+    if ((msg = mrp_msg_create(NULL)) == NULL) {
+       mrp_log_error("Failed to create new message.");
+       exit(1);
+    }
+
+    len = snprintf(seq, sizeof(seq), "%d", seqno);
+    
+    if (!mrp_msg_append(msg, "seq", seq, len + 1) ||
+       !mrp_msg_append(msg, "foo", "bar", sizeof("bar")) ||
+       !mrp_msg_append(msg, "bar", "foo", sizeof("foo")) ||
+       !mrp_msg_append(msg, "foobar", "barfoo", sizeof("barfoo")) ||
+       !mrp_msg_append(msg, "barfoo", "foobar", sizeof("foobar"))) {
+       mrp_log_error("Failed to construct message #%d.", seqno);
+       exit(1);
+    }
+
+    if (!mrp_transport_send(c->t, msg)) {
+       mrp_log_error("Failed to send message #%d.", seqno);
+       exit(1);
+    }
+    else {
+       mrp_log_info("Message #%d succesfully sent.", seqno++);
+    }
+}
+
+
+void client_init(context_t *c)
+{
+    static mrp_transport_evt_t evt = {
+       .closed   = closed_evt,
+       .recv     = recv_evt,
+       .recvfrom = NULL,
+    };
+
+    c->t = mrp_transport_create(c->ml, "udp", &evt, c);
+    
+    if (c->t == NULL) {
+       mrp_log_error("Failed to create new transport.");
+       exit(1);
+    }
+
+    if (!mrp_transport_connect(c->t, c->addr)) {
+       mrp_log_error("Failed to connect to %s.", c->addr);
+       exit(1);
+    }
+
+    c->timer = mrp_add_timer(c->ml, 1000, send_cb, c);
+
+    if (c->timer == NULL) {
+       mrp_log_error("Failed to create send timer.");
+       exit(1);
+    }
+}
+
+
+int main(int argc, char *argv[])
+{
+    context_t c;
+
+    mrp_clear(&c);
+    mrp_log_set_mask(MRP_LOG_UPTO(MRP_LOG_DEBUG));
+    mrp_log_set_target(MRP_LOG_TO_STDOUT);
+
+    if (argc == 3 && (!strcmp(argv[1], "-s") || !strcmp(argv[1], "--server"))) {
+       c.server = TRUE;
+       c.addr  = argv[2];
+       mrp_log_info("Running as server, using address '%s'...", c.addr);
+    }
+    else if (argc == 2) {
+       c.addr = argv[1];
+       mrp_log_info("Running as client, using address '%s'...", c.addr);
+    }
+    else {
+       mrp_log_error("invalid command line arguments");
+       mrp_log_error("usage: %s [-s] address:port", argv[0]);
+       exit(1);
+    }
+
+    c.ml = mrp_mainloop_create();
+
+    if (c.server)
+       server_init(&c);
+    else
+       client_init(&c);
+    
+    mrp_mainloop_run(c.ml);
+
+    return 0;
+}
diff --git a/src/common/udp-transport.c b/src/common/udp-transport.c
new file mode 100644 (file)
index 0000000..d2dd7ca
--- /dev/null
@@ -0,0 +1,381 @@
+#include <unistd.h>
+#include <string.h>
+#include <errno.h>
+#include <netdb.h>
+#include <fcntl.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/uio.h>
+
+#include <murphy/common/macros.h>
+#include <murphy/common/mm.h>
+#include <murphy/common/log.h>
+#include <murphy/common/msg.h>
+#include <murphy/common/transport.h>
+
+#define DEFAULT_SIZE 1024                   /* default input buffer size */
+
+typedef struct {
+    MRP_TRANSPORT_PUBLIC_FIELDS;         /* common transport fields */
+    int             sock;                /* UDP socket */
+    int             family;              /* socket family */
+    mrp_io_watch_t *iow;                 /* socket I/O watch */
+    void           *ibuf;                /* input buffer */
+    size_t          isize;               /* input buffer size */
+    size_t          idata;               /* amount of input data */
+} udp_t;
+
+
+static void udp_recv_cb(mrp_mainloop_t *ml, mrp_io_watch_t *w, int fd,
+                       mrp_io_event_t events, void *user_data);
+static int udp_disconnect(mrp_transport_t *mu);
+static int open_socket(udp_t *u, int family);
+
+
+static socklen_t udp_resolve(char *str, void *addr, socklen_t size)
+{
+    struct addrinfo *ai, hints;
+    char             node[512], *port;
+    
+    mrp_clear(&hints);    
+    hints.ai_family = AF_UNSPEC;
+    ai              = NULL;
+
+    if      (!strncmp(str, "udp:" , 4)) str += 4;
+    else if (!strncmp(str, "udp4:", 5)) str += 5, hints.ai_family = AF_INET;
+    else if (!strncmp(str, "udp6:", 5)) str += 5, hints.ai_family = AF_INET6;
+    
+    strncpy(node, str, sizeof(node) - 1);
+    node[sizeof(node) - 1] = '\0';
+    if ((port = strrchr(node, ':')) == NULL)
+       return FALSE;
+    *port++ = '\0';
+
+    if (getaddrinfo(node, port, &hints, &ai) == 0) {
+       if (size >= ai->ai_addrlen) {
+           memcpy(addr, ai->ai_addr, ai->ai_addrlen);
+           size = ai->ai_addrlen;
+       }
+       else
+           size = 0;
+       freeaddrinfo(ai);
+
+       return size;
+    }
+    else
+       return 0;
+}
+
+
+static int udp_open(mrp_transport_t *mu)
+{
+    udp_t *u = (udp_t *)mu;
+    
+    u->sock   = -1;
+    u->family = -1;
+
+    return TRUE;
+}
+
+
+static int udp_bind(mrp_transport_t *mu, void *addr, socklen_t addrlen)
+{
+    udp_t *u = (udp_t *)mu;
+    
+    if (u->sock != -1 || !u->connected) {
+       if (open_socket(u, ((struct sockaddr *)addr)->sa_family))
+           if (bind(u->sock, (struct sockaddr *)addr, addrlen) == 0)
+               return TRUE;
+    }
+
+    return FALSE;
+}
+
+
+static void udp_close(mrp_transport_t *mu)
+{
+    udp_t *u = (udp_t *)mu;
+
+    mrp_del_io_watch(u->iow);
+    u->iow = NULL;
+
+    mrp_free(u->ibuf);
+    u->ibuf  = NULL;
+    u->isize = 0;
+    u->idata = 0;
+    
+    if (u->sock >= 0){
+       close(u->sock);
+       u->sock = -1;
+    }
+}
+
+
+static void udp_recv_cb(mrp_mainloop_t *ml, mrp_io_watch_t *w, int fd,
+                        mrp_io_event_t events, void *user_data)
+{
+    udp_t           *u  = (udp_t *)user_data;
+    mrp_transport_t *mu = (mrp_transport_t *)u;
+    struct sockaddr  addr;
+    socklen_t        addrlen;
+    uint32_t         size;
+    ssize_t          n;
+    void            *data;
+    int              old, error;
+    mrp_msg_t       *msg;
+
+    MRP_UNUSED(ml);
+    MRP_UNUSED(w);
+
+    if (events & MRP_IO_EVENT_IN) {
+       if (u->idata == u->isize) {
+           if (u->isize != 0) {
+               old      = u->isize;
+               u->isize *= 2;
+           }
+           else {
+               old      = 0;
+               u->isize = DEFAULT_SIZE;
+           }
+           if (!mrp_reallocz(u->ibuf, old, u->isize)) {
+               error = ENOMEM;
+           fatal_error:
+           closed:
+               udp_disconnect(mu);
+               
+               if (u->evt.closed != NULL)
+                   MRP_TRANSPORT_BUSY(mu, {
+                           mu->evt.closed(mu, error, mu->user_data);
+                       });
+               
+               u->check_destroy(mu);
+               return;
+           }
+       }
+
+       if (recv(fd, &size, sizeof(size), MSG_PEEK) != sizeof(size)) {
+           error = EIO;
+           goto fatal_error;
+       }
+       
+       size = ntohl(size);
+
+       if (u->isize < size + sizeof(size)) {
+           old      = u->isize;
+           u->isize = size + sizeof(size);
+           
+           if (!mrp_reallocz(u->ibuf, old, u->isize)) {
+               error = ENOMEM;
+               goto fatal_error;
+           }
+       }
+
+       addrlen = sizeof(addr);
+       n = recvfrom(fd, u->ibuf, size + sizeof(size), 0, &addr, &addrlen);
+       
+       if (n != (ssize_t)(size + sizeof(size))) {
+           error = n < 0 ? EIO : EPROTO;
+           goto fatal_error;
+       }
+       
+       data = u->ibuf + sizeof(size);
+       msg  = mrp_msg_default_decode(data, size);
+       
+       if (msg != NULL) {
+           if (mu->connected) {
+               MRP_TRANSPORT_BUSY(mu, {
+                       mu->evt.recv(mu, msg, mu->user_data);
+                   });
+           }
+           else {
+               MRP_TRANSPORT_BUSY(mu, {
+                       mu->evt.recvfrom(mu, msg, &addr, addrlen,
+                                        mu->user_data);
+                   });
+           }
+
+           mrp_msg_unref(msg);
+
+           if (u->check_destroy(mu))
+               return;
+       }
+       else {
+           error = EPROTO;
+           goto fatal_error;
+       }
+    }
+
+    if (events & MRP_IO_EVENT_HUP) {
+       error = 0;
+       goto closed;
+    }
+}
+
+
+static int open_socket(udp_t *u, int family)
+{
+    mrp_io_event_t events;
+
+    u->sock = socket(family, SOCK_DGRAM, 0);
+    
+    if (u->sock != -1) {
+       events = MRP_IO_EVENT_IN | MRP_IO_EVENT_HUP;
+       u->iow = mrp_add_io_watch(u->ml, u->sock, events, udp_recv_cb, u);
+    
+       if (u->iow != NULL)
+           return TRUE;
+       else {
+           close(u->sock);
+           u->sock = -1;
+       }
+    }
+
+    return FALSE;
+}
+
+
+static int udp_connect(mrp_transport_t *mu, void *addrstr)
+{
+    udp_t           *u = (udp_t *)mu;
+    struct sockaddr  addr;
+    int              addrlen;
+    int              reuse;
+    long             nonblk;
+
+    addrlen = mrp_transport_resolve(mu, addrstr, &addr, sizeof(addr));
+    
+    if (addrlen > 0) {
+       if (MRP_UNLIKELY(u->family != -1 && u->family != addr.sa_family))
+           return FALSE;
+
+       if (MRP_UNLIKELY(u->sock == -1)) {
+           if (!open_socket(u, addr.sa_family))
+               return FALSE;
+       }
+
+       if (connect(u->sock, &addr, addrlen) == 0) {
+           reuse = 1;
+           setsockopt(u->sock, SOL_SOCKET, SO_REUSEADDR,
+                      &reuse, sizeof(reuse));
+           nonblk = 1;
+           fcntl(u->sock, F_SETFL, O_NONBLOCK, nonblk);
+
+           u->connected = TRUE;
+
+           return TRUE;
+       }
+    }
+    
+    return FALSE;
+}
+
+
+static int udp_disconnect(mrp_transport_t *mu)
+{
+    udp_t *u = (udp_t *)mu;
+
+    if (u->connected) {
+       mrp_del_io_watch(u->iow);
+       u->iow = NULL;
+
+       shutdown(u->sock, SHUT_RDWR);
+       u->connected = FALSE;
+
+       return TRUE;
+    }
+    else
+       return FALSE;
+}
+
+
+static int udp_send(mrp_transport_t *mu, mrp_msg_t *msg)
+{
+    udp_t        *u = (udp_t *)mu;
+    struct iovec  iov[2];
+    void         *buf;
+    ssize_t       size, n;
+    uint32_t      len;
+
+    if (u->connected) {
+       size = mrp_msg_default_encode(msg, &buf);
+    
+       if (size >= 0) {
+           len = htonl(size);
+           iov[0].iov_base = &len;
+           iov[0].iov_len  = sizeof(len);
+           iov[1].iov_base = buf;
+           iov[1].iov_len  = size;
+       
+           n = writev(u->sock, iov, 2);
+           mrp_free(buf);
+
+           if (n == (ssize_t)(size + sizeof(len)))
+               return TRUE;
+           else {
+               if (n == -1 && errno == EAGAIN) {
+                   mrp_log_error("%s(): XXX TODO: this sucks, need to add "
+                                 "output queuing for udp-transport.",
+                                 __FUNCTION__);
+               }
+           }
+       }
+    }
+
+    return FALSE;
+}
+
+
+static int udp_sendto(mrp_transport_t *mu, mrp_msg_t *msg, void *addr,
+                     socklen_t addrlen)
+{
+    udp_t           *u = (udp_t *)mu;
+    struct iovec     iov[2];
+    void            *buf;
+    ssize_t          size, n;
+    uint32_t         len;
+    struct msghdr    hdr;
+
+    if (MRP_UNLIKELY(u->sock == -1)) {
+       if (!open_socket(u, ((struct sockaddr *)addr)->sa_family))
+           return FALSE;
+    }
+       
+    size = mrp_msg_default_encode(msg, &buf);
+    
+    if (size >= 0) {
+       len = htonl(size);
+       iov[0].iov_base = &len;
+       iov[0].iov_len  = sizeof(len);
+       iov[1].iov_base = buf;
+       iov[1].iov_len  = size;
+       
+       hdr.msg_name    = addr;
+       hdr.msg_namelen = addrlen;
+       hdr.msg_iov     = iov;
+       hdr.msg_iovlen  = MRP_ARRAY_SIZE(iov);
+       
+       hdr.msg_control    = NULL;
+       hdr.msg_controllen = 0;
+       hdr.msg_flags      = 0;
+           
+       n = sendmsg(u->sock, &hdr, 0);
+       mrp_free(buf);
+       
+       if (n == (ssize_t)(size + sizeof(len)))
+           return TRUE;
+       else {
+           if (n == -1 && errno == EAGAIN) {
+               mrp_log_error("%s(): XXX TODO: udp-transport send failed",
+                             __FUNCTION__);
+           }
+       }
+    }
+    
+    return FALSE;
+}
+
+
+MRP_REGISTER_TRANSPORT("udp", udp_t, udp_resolve,
+                      udp_open, udp_bind, NULL, udp_close,
+                      udp_connect, udp_disconnect,
+                      udp_send, udp_sendto);