common: initial support for internal transport.
authorIsmo Puustinen <ismo.puustinen@intel.com>
Thu, 12 Jul 2012 10:54:27 +0000 (13:54 +0300)
committerKrisztian Litkey <krisztian.litkey@intel.com>
Fri, 26 Oct 2012 16:03:49 +0000 (19:03 +0300)
src/Makefile.am
src/common/internal-transport.c [new file with mode: 0644]
src/common/tests/Makefile.am
src/common/tests/internal-transport-test.c [new file with mode: 0644]

index aefc8d1..507b043 100644 (file)
@@ -59,6 +59,7 @@ libmurphy_common_la_REGULAR_SOURCES =         \
                common/msg.c                    \
                common/transport.c              \
                common/stream-transport.c       \
+               common/internal-transport.c     \
                common/dgram-transport.c
 
 libmurphy_common_la_SOURCES =                          \
diff --git a/src/common/internal-transport.c b/src/common/internal-transport.c
new file mode 100644 (file)
index 0000000..b9e5dde
--- /dev/null
@@ -0,0 +1,593 @@
+/*
+ * Copyright (c) 2012, Intel Corporation
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *  * Redistributions of source code must retain the above copyright notice,
+ *    this list of conditions and the following disclaimer.
+ *  * Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ *  * Neither the name of Intel Corporation nor the names of its contributors
+ *    may be used to endorse or promote products derived from this software
+ *    without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <unistd.h>
+#include <string.h>
+#include <errno.h>
+#include <sys/types.h>
+
+#include <murphy/common.h>
+
+#define INTERNAL "internal"
+
+/* storage for the global data, TODO: refactor away */
+static mrp_htbl_t *servers = NULL;
+static mrp_htbl_t *connections = NULL;
+static mrp_list_hook_t msg_queue;
+static mrp_deferred_t *d;
+static uint32_t cid;
+
+typedef struct internal_s internal_t;
+
+struct internal_s {
+    MRP_TRANSPORT_PUBLIC_FIELDS; /* common transport fields */
+    char name[MRP_SOCKADDR_SIZE]; /* bound connection name */
+    mrp_sockaddr_t address; /* internal connection name*/
+    bool active;
+    bool bound;
+    bool listening;
+
+    internal_t *endpoint; /* that we are connected to */
+};
+
+typedef struct {
+    void *data;
+    size_t size;
+    internal_t *u;
+    mrp_sockaddr_t *addr;
+    socklen_t addrlen;
+    bool free_data;
+    int offset;
+
+    mrp_list_hook_t hook;
+} internal_message_t;
+
+
+static void process_queue(mrp_mainloop_t *ml, mrp_deferred_t *d,
+  void *user_data)
+{
+    internal_message_t *msg;
+    internal_t *endpoint;
+    mrp_list_hook_t *p, *n;
+
+    MRP_UNUSED(ml);
+    MRP_UNUSED(user_data);
+
+    mrp_log_info("process_queue");
+
+    mrp_disable_deferred(d);
+
+    mrp_list_foreach(&msg_queue, p, n) {
+
+        msg = mrp_list_entry(p, typeof(*msg), hook);
+
+        mrp_log_info("processing msg %p", msg);
+
+        if (!msg) {
+            mrp_log_error("no message!");
+            goto end;
+        }
+
+        if (!msg->u->connected) {
+            if (!msg->addr) {
+                mrp_log_error("connected transport without address!");
+                goto end;
+            }
+
+            /* Find the recipient. Look first from the server table.*/
+            endpoint = mrp_htbl_lookup(servers, msg->addr->data);
+
+            if (!endpoint) {
+
+                /* Look next from the general connections table. */
+                endpoint = mrp_htbl_lookup(connections, msg->addr->data);
+                mrp_log_info("sending message to connection '%s'",
+                    msg->addr->data);
+            }
+            else {
+                mrp_log_info("sending message to server '%s'",
+                    msg->addr->data);
+            }
+        }
+        else {
+            endpoint = msg->u->endpoint;
+        }
+
+        if (!endpoint || !endpoint->recv_data) {
+            mrp_log_error("no endpoint matching the address");
+            goto end;
+        }
+
+        /* skip the length word when sending */
+        endpoint->recv_data(
+                (mrp_transport_t *) endpoint, msg->data + msg->offset,
+                msg->size, &msg->u->address, MRP_SOCKADDR_SIZE);
+
+end:
+        if (msg) {
+
+            if (msg->free_data)
+                mrp_free(msg->data);
+
+            mrp_list_delete(&msg->hook);
+            mrp_free(msg);
+        }
+    }
+}
+
+
+static int internal_initialize_table(internal_t *u)
+{
+    mrp_htbl_config_t servers_conf;
+    mrp_htbl_config_t connections_conf;
+
+    MRP_UNUSED(u);
+
+    if (servers && connections && d)
+        return 0; /* already initialized */
+
+    servers_conf.comp = mrp_string_comp;
+    servers_conf.hash = mrp_string_hash;
+    servers_conf.free = NULL;
+    servers_conf.nbucket = 0;
+    servers_conf.nentry = 10;
+
+    servers = mrp_htbl_create(&servers_conf);
+
+    if (!servers)
+        goto error;
+
+    connections_conf.comp = mrp_string_comp;
+    connections_conf.hash = mrp_string_hash;
+    connections_conf.free = NULL;
+    connections_conf.nbucket = 0;
+    connections_conf.nentry = 10;
+
+    connections = mrp_htbl_create(&connections_conf);
+
+    if (!connections)
+        goto error;
+
+    mrp_list_init(&msg_queue);
+
+    cid = 0;
+
+    d = mrp_add_deferred(u->ml, process_queue, NULL);
+
+    if (!d)
+        goto error;
+
+    mrp_disable_deferred(d);
+
+    return 0;
+
+error:
+
+    if (servers)
+        mrp_htbl_destroy(servers, FALSE);
+
+    if (connections)
+        mrp_htbl_destroy(connections, FALSE);
+
+    return -1;
+}
+
+
+static socklen_t internal_resolve(const char *str, mrp_sockaddr_t *addr,
+                              socklen_t size, const char **typep)
+{
+    int len;
+
+    MRP_UNUSED(size);
+
+    mrp_log_info("> internal_resolve");
+
+    if (!str)
+        return 0;
+
+    len = strlen(str);
+
+    if (len <= 9 || len >= MRP_SOCKADDR_SIZE)
+        return 0;
+
+    if (strncmp("internal:", str, 9))
+        return 0;
+
+    if (typep)
+        *typep = INTERNAL;
+
+    memcpy(addr->data, str+9, len-9+1);
+
+    mrp_log_info("resolved %s from %s", addr->data, str);
+
+    return len-9;
+}
+
+
+static int internal_open(mrp_transport_t *mu)
+{
+    internal_t *u = (internal_t *)mu;
+
+    mrp_log_info("> internal_open");
+
+    if (internal_initialize_table(u) < 0)
+        return FALSE;
+
+    memset(u->name, 0, MRP_SOCKADDR_SIZE);
+    memset(u->address.data, 0, MRP_SOCKADDR_SIZE);
+
+    u->active = FALSE;
+
+    snprintf(u->address.data, MRP_SOCKADDR_SIZE, INTERNAL"_%d", cid++);
+
+    mrp_htbl_insert(connections, u->address.data, mu);
+
+    return TRUE;
+}
+
+
+static int internal_bind(mrp_transport_t *mu, mrp_sockaddr_t *addr,
+                     socklen_t addrlen)
+{
+    internal_t *u = (internal_t *)mu;
+
+    mrp_log_info("> internal_bind");
+
+    if (internal_initialize_table(u) < 0)
+        return FALSE;
+
+    memcpy(u->name, addr->data, addrlen+1);
+
+    mrp_htbl_insert(servers, u->name, u);
+
+    u->active = TRUE;
+    u->bound = TRUE;
+
+    return TRUE;
+}
+
+
+static int internal_listen(mrp_transport_t *mu, int backlog)
+{
+    internal_t *u = (internal_t *)mu;
+
+    MRP_UNUSED(backlog);
+
+    if (!u->bound)
+        return FALSE;
+
+    mrp_log_info("> internal_listen");
+
+    u->listening = TRUE;
+
+    return TRUE;
+}
+
+
+static int internal_accept(mrp_transport_t *mt, mrp_transport_t *mlt)
+{
+    internal_t *t = (internal_t *) mt;
+    internal_t *lt = (internal_t *) mlt;
+    internal_t *client = lt->endpoint;
+
+    mrp_log_info("> internal_accept");
+
+    t->endpoint = client;
+    client->endpoint = t;
+
+    lt->endpoint = NULL; /* connection process is now over */
+
+    return TRUE;
+}
+
+
+static void remove_messages(internal_t *u)
+{
+    internal_message_t *msg;
+    mrp_list_hook_t *p, *n;
+
+    mrp_list_foreach(&msg_queue, p, n) {
+        msg = mrp_list_entry(p, typeof(*msg), hook);
+
+        if (strcmp(msg->addr->data, u->name) == 0
+            || strcmp(msg->addr->data, u->address.data) == 0) {
+
+            if (msg->free_data)
+                mrp_free(msg->data);
+
+            mrp_list_delete(&msg->hook);
+            mrp_free(msg);
+        }
+    }
+}
+
+
+static void internal_close(mrp_transport_t *mu)
+{
+    internal_t *u = (internal_t *)mu;
+
+    mrp_log_info("> internal_close");
+
+    /* Is this client or server? If server, go remove the connection from
+     * servers table. */
+
+    if (u->bound) {
+        /* server listening socket */
+        mrp_htbl_remove(servers, u->name, FALSE);
+        u->bound = FALSE;
+    }
+
+    mrp_htbl_remove(connections, u->address.data, FALSE);
+
+    u->active = FALSE;
+
+    remove_messages(u);
+}
+
+
+static int internal_connect(mrp_transport_t *mu, mrp_sockaddr_t *addr,
+                        socklen_t addrlen)
+{
+    internal_t *u = (internal_t *)mu;
+    internal_t *host;
+    mrp_transport_t *mt;
+
+    MRP_UNUSED(addrlen);
+
+    mrp_log_info("> internal_connect");
+
+    /* client connecting */
+
+    if (!servers) {
+        mrp_log_error("no servers available for connecting");
+        return FALSE;
+    }
+
+    host = mrp_htbl_lookup(servers, addr->data);
+
+    if (!host) {
+        mrp_log_error("server '%s' wasn't found", addr->data);
+        return FALSE;
+    }
+
+    mt = (mrp_transport_t *) host;
+
+    host->endpoint = u; /* temporary connection data */
+
+    host->evt.connection(mt, mt->user_data);
+
+    return TRUE;
+}
+
+
+static int internal_disconnect(mrp_transport_t *mu)
+{
+    internal_t *u = (internal_t *)mu;
+
+    mrp_log_info("> internal_disconnect");
+
+    if (u->connected) {
+        internal_t *endpoint = u->endpoint;
+
+        if (endpoint) {
+            endpoint->endpoint = NULL;
+            mrp_transport_disconnect((mrp_transport_t *) endpoint);
+        }
+        u->endpoint = NULL;
+    }
+
+    return TRUE;
+}
+
+
+static int internal_sendto(mrp_transport_t *mu, mrp_msg_t *data,
+                       mrp_sockaddr_t *addr, socklen_t addrlen)
+{
+    internal_t *u = (internal_t *)mu;
+    void *buf;
+    size_t size;
+    internal_message_t *msg;
+
+
+    mrp_log_info("> internal_sendto");
+
+    size = mrp_msg_default_encode(data, &buf);
+
+    if (size == 0 || buf == NULL) {
+        return FALSE;
+    }
+
+    msg = mrp_allocz(sizeof(internal_message_t));
+
+    msg->addr = addr;
+    msg->addrlen = addrlen;
+    msg->data = buf;
+    msg->free_data = FALSE;
+    msg->offset = 0;
+    msg->size = size;
+    msg->u = u;
+
+    mrp_list_init(&msg->hook);
+    mrp_list_append(&msg_queue, &msg->hook);
+
+    mrp_enable_deferred(d);
+
+    return TRUE;
+}
+
+
+static int internal_send(mrp_transport_t *mu, mrp_msg_t *msg)
+{
+    mrp_log_info("> internal_send");
+
+    if (!mu->connected) {
+        return FALSE;
+    }
+
+    return internal_sendto(mu, msg, NULL, 0);
+}
+
+
+static int internal_sendrawto(mrp_transport_t *mu, void *data, size_t size,
+                          mrp_sockaddr_t *addr, socklen_t addrlen)
+{
+    internal_t *u = (internal_t *)mu;
+    internal_message_t *msg;
+
+    mrp_log_info("> internal_sendrawto");
+
+    msg = mrp_allocz(sizeof(internal_message_t));
+
+    msg->addr = addr;
+    msg->addrlen = addrlen;
+    msg->data = data;
+    msg->free_data = FALSE;
+    msg->offset = 0;
+    msg->size = size;
+    msg->u = u;
+
+    mrp_list_init(&msg->hook);
+    mrp_list_append(&msg_queue, &msg->hook);
+
+    mrp_enable_deferred(d);
+
+    return TRUE;
+}
+
+
+static int internal_sendraw(mrp_transport_t *mu, void *data, size_t size)
+{
+    mrp_log_info("> internal_sendraw");
+
+    if (!mu->connected) {
+        return FALSE;
+    }
+
+    return internal_sendrawto(mu, data, size, NULL, 0);
+}
+
+
+static size_t encode_custom_data(void *data, void **newdata, uint16_t tag)
+{
+    mrp_data_descr_t *type = mrp_msg_find_type(tag);
+    uint32_t *lenp;
+    uint16_t *tagp;
+    size_t reserve, size;
+    int len;
+    void *buf;
+
+    if (type == NULL) {
+        mrp_log_error("type not found!");
+        return 0;
+    }
+
+    reserve = sizeof(*lenp) + sizeof(*tagp);
+    size = mrp_data_encode(&buf, data, type, reserve);
+
+    if (size == 0) {
+        mrp_log_error("data encoding failed");
+        return 0;
+    }
+
+    /* some format conversion */
+
+    lenp = buf;
+    len = size - sizeof(*lenp);
+    tagp = buf + sizeof(*lenp);
+
+    /* *lenp = htobe32(len); */ /* Not used */
+    *tagp = htobe16(tag);
+
+    *newdata = buf;
+
+    return len;
+}
+
+
+static int internal_senddatato(mrp_transport_t *mu, void *data, uint16_t tag,
+                           mrp_sockaddr_t *addr, socklen_t addrlen)
+{
+    internal_t *u = (internal_t *)mu;
+    mrp_data_descr_t *type = mrp_msg_find_type(tag);
+    void *newdata = NULL;
+    size_t size;
+    internal_message_t *msg;
+
+    mrp_log_info("> internal_senddatato");
+
+    if (type == NULL)
+        return FALSE;
+
+    size = encode_custom_data(data, &newdata, tag);
+
+    if (!newdata) {
+        mrp_log_error("custom data encoding failed");
+        return FALSE;
+    }
+
+    msg = mrp_allocz(sizeof(internal_message_t));
+
+    msg->addr = addr;
+    msg->addrlen = addrlen;
+    msg->data = newdata;
+    msg->free_data = TRUE;
+    msg->offset = 4;
+    msg->size = size;
+    msg->u = u;
+
+    mrp_list_init(&msg->hook);
+    mrp_list_append(&msg_queue, &msg->hook);
+
+    mrp_enable_deferred(d);
+
+    return TRUE;
+}
+
+
+static int internal_senddata(mrp_transport_t *mu, void *data, uint16_t tag)
+{
+    mrp_log_info("> internal_senddata");
+
+    if (!mu->connected) {
+        return FALSE;
+    }
+
+    return internal_senddatato(mu, data, tag, NULL, 0);
+}
+
+
+
+
+MRP_REGISTER_TRANSPORT(internal, INTERNAL, internal_t, internal_resolve,
+                       internal_open, NULL, internal_close,
+                       internal_bind, internal_listen, internal_accept,
+                       internal_connect, internal_disconnect,
+                       internal_send, internal_sendto,
+                       internal_sendraw, internal_sendrawto,
+                       internal_senddata, internal_senddatato);
index 6c44010..1d62719 100644 (file)
@@ -1,6 +1,6 @@
 AM_CFLAGS = $(WARNING_CFLAGS) -I$(top_builddir)
 
-noinst_PROGRAMS  = mm-test hash-test msg-test transport-test
+noinst_PROGRAMS  = mm-test hash-test msg-test transport-test internal-transport-test
 if DBUS_ENABLED
 noinst_PROGRAMS += mainloop-test dbus-test
 endif
@@ -34,6 +34,11 @@ transport_test_SOURCES = transport-test.c
 transport_test_CFLAGS  = $(AM_CFLAGS)
 transport_test_LDADD   = ../../libmurphy-common.la
 
+# internal transport test
+internal_transport_test_SOURCES = internal-transport-test.c
+internal_transport_test_CFLAGS  = $(AM_CFLAGS)
+internal_transport_test_LDADD   = ../../libmurphy-common.la
+
 if DBUS_ENABLED
 transport_test_LDADD  += ../../libmurphy-dbus.la
 
diff --git a/src/common/tests/internal-transport-test.c b/src/common/tests/internal-transport-test.c
new file mode 100644 (file)
index 0000000..debb56d
--- /dev/null
@@ -0,0 +1,785 @@
+/*
+ * Copyright (c) 2012, Intel Corporation
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *  * Redistributions of source code must retain the above copyright notice,
+ *    this list of conditions and the following disclaimer.
+ *  * Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ *  * Neither the name of Intel Corporation nor the names of its contributors
+ *    may be used to endorse or promote products derived from this software
+ *    without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <unistd.h>
+#include <string.h>
+#include <errno.h>
+#include <netdb.h>
+#include <fcntl.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+
+#define _GNU_SOURCE
+#include <getopt.h>
+
+#include <murphy/common.h>
+
+
+/*
+ * tags for generic message fields
+ */
+
+#define TAG_SEQ   ((uint16_t)0x1)
+#define TAG_MSG   ((uint16_t)0x2)
+#define TAG_U8    ((uint16_t)0x3)
+#define TAG_S8    ((uint16_t)0x4)
+#define TAG_U16   ((uint16_t)0x5)
+#define TAG_S16   ((uint16_t)0x6)
+#define TAG_DBL   ((uint16_t)0x7)
+#define TAG_BLN   ((uint16_t)0x8)
+#define TAG_ASTR  ((uint16_t)0x9)
+#define TAG_AU32  ((uint16_t)0xa)
+#define TAG_RPL   ((uint16_t)0xb)
+#define TAG_END   MRP_MSG_FIELD_END
+
+#define U32_GUARD (uint32_t)-1
+
+/*
+ * our test custom data type
+ */
+
+#define TAG_CUSTOM 0x1
+
+typedef struct {
+    uint32_t   seq;
+    char      *msg;
+    uint8_t     u8;
+    int8_t      s8;
+    uint16_t   u16;
+    int16_t    s16;
+    double     dbl;
+    bool       bln;
+    char     **astr;
+    uint32_t   nstr;
+    uint32_t   fsck;
+    uint32_t  *au32;
+    char      *rpl;
+} custom_t;
+
+
+MRP_DATA_DESCRIPTOR(custom_descr, TAG_CUSTOM, custom_t,
+                    MRP_DATA_MEMBER(custom_t,  seq, MRP_MSG_FIELD_UINT32),
+                    MRP_DATA_MEMBER(custom_t,  msg, MRP_MSG_FIELD_STRING),
+                    MRP_DATA_MEMBER(custom_t,   u8, MRP_MSG_FIELD_UINT8 ),
+                    MRP_DATA_MEMBER(custom_t,   s8, MRP_MSG_FIELD_SINT8 ),
+                    MRP_DATA_MEMBER(custom_t,  u16, MRP_MSG_FIELD_UINT16),
+                    MRP_DATA_MEMBER(custom_t,  s16, MRP_MSG_FIELD_SINT16),
+                    MRP_DATA_MEMBER(custom_t,  dbl, MRP_MSG_FIELD_DOUBLE),
+                    MRP_DATA_MEMBER(custom_t,  bln, MRP_MSG_FIELD_BOOL  ),
+                    MRP_DATA_MEMBER(custom_t,  rpl, MRP_MSG_FIELD_STRING),
+                    MRP_DATA_MEMBER(custom_t, nstr, MRP_MSG_FIELD_UINT32),
+                    MRP_DATA_MEMBER(custom_t, fsck, MRP_MSG_FIELD_UINT32),
+                    MRP_DATA_ARRAY_COUNT(custom_t, astr, nstr,
+                                         MRP_MSG_FIELD_STRING),
+                    MRP_DATA_ARRAY_GUARD(custom_t, au32, u32, U32_GUARD,
+                                         MRP_MSG_FIELD_UINT32));
+
+MRP_DATA_DESCRIPTOR(buggy_descr, TAG_CUSTOM, custom_t,
+                    MRP_DATA_MEMBER(custom_t,  seq, MRP_MSG_FIELD_UINT32),
+                    MRP_DATA_MEMBER(custom_t,  msg, MRP_MSG_FIELD_STRING),
+                    MRP_DATA_MEMBER(custom_t,   u8, MRP_MSG_FIELD_UINT8 ),
+                    MRP_DATA_MEMBER(custom_t,   s8, MRP_MSG_FIELD_SINT8 ),
+                    MRP_DATA_MEMBER(custom_t,  u16, MRP_MSG_FIELD_UINT16),
+                    MRP_DATA_MEMBER(custom_t,  s16, MRP_MSG_FIELD_SINT16),
+                    MRP_DATA_MEMBER(custom_t,  dbl, MRP_MSG_FIELD_DOUBLE),
+                    MRP_DATA_MEMBER(custom_t,  bln, MRP_MSG_FIELD_BOOL  ),
+                    MRP_DATA_MEMBER(custom_t,  rpl, MRP_MSG_FIELD_STRING),
+                    MRP_DATA_MEMBER(custom_t, nstr, MRP_MSG_FIELD_UINT32),
+                    MRP_DATA_MEMBER(custom_t, fsck, MRP_MSG_FIELD_UINT32),
+                    MRP_DATA_ARRAY_COUNT(custom_t, astr, fsck,
+                                         MRP_MSG_FIELD_STRING),
+                    MRP_DATA_ARRAY_GUARD(custom_t, au32, u32, U32_GUARD,
+                                         MRP_MSG_FIELD_UINT32));
+
+mrp_data_descr_t *data_descr;
+
+typedef struct {
+    mrp_mainloop_t  *ml;
+    mrp_transport_t *lt, *st;
+    char            *addrstr;
+    mrp_sockaddr_t   addr;
+    socklen_t        alen;
+    const char      *atype;
+    int              server;
+    int              sock;
+    mrp_io_watch_t  *iow;
+    mrp_timer_t     *timer;
+    int              custom;
+    int              buggy;
+    int              connect;
+    int              stream;
+    int              log_mask;
+    const char      *log_target;
+    uint32_t         seqno;
+    mrp_list_hook_t  clients;
+} context_t;
+
+typedef struct {
+    int              id;
+    mrp_transport_t *t;
+    context_t       *c;
+    mrp_list_hook_t  hook;
+} client_t;
+
+
+void recv_msg(mrp_transport_t *t, mrp_msg_t *msg, void *user_data);
+void recvfrom_msg(mrp_transport_t *t, mrp_msg_t *msg, mrp_sockaddr_t *addr,
+                  socklen_t addrlen, void *user_data);
+
+void recv_custom(mrp_transport_t *t, void *data, uint16_t tag, void *user_data);
+void recvfrom_custom(mrp_transport_t *t, void *data, uint16_t tag,
+                     mrp_sockaddr_t *addr, socklen_t addrlen, void *user_data);
+
+
+
+void dump_msg(mrp_msg_t *msg, FILE *fp)
+{
+    mrp_msg_dump(msg, fp);
+}
+
+
+void srv_recvfrom_msg(mrp_transport_t *t, mrp_msg_t *msg, mrp_sockaddr_t *addr,
+                  socklen_t addrlen, void *user_data)
+{
+    context_t       *c = (context_t *)user_data;
+    mrp_msg_field_t *f;
+    uint32_t         seq;
+    char             buf[256];
+    int              status;
+
+    mrp_log_info("received a message");
+    dump_msg(msg, stdout);
+
+    seq = 0;
+    if ((f = mrp_msg_find(msg, TAG_SEQ)) != NULL) {
+        if (f->type == MRP_MSG_FIELD_UINT32)
+            seq = f->u32;
+    }
+
+    snprintf(buf, sizeof(buf), "reply to message #%u", seq);
+
+    if (!mrp_msg_append(msg, TAG_RPL, MRP_MSG_FIELD_STRING, buf,
+                        TAG_END)) {
+        mrp_log_info("failed to append to received message");
+        exit(1);
+    }
+
+    if (c->connect)
+        status = mrp_transport_send(t, msg);
+    else
+        status = mrp_transport_sendto(t, msg, addr, addrlen);
+
+    if (status)
+        mrp_log_info("reply successfully sent");
+    else
+        mrp_log_error("failed to send reply");
+
+    /* message unreffed by transport layer */
+}
+
+
+void recvfrom_msg(mrp_transport_t *t, mrp_msg_t *msg, mrp_sockaddr_t *addr,
+                  socklen_t addrlen, void *user_data)
+{
+    MRP_UNUSED(t);
+    MRP_UNUSED(addr);
+    MRP_UNUSED(addrlen);
+    MRP_UNUSED(user_data);
+
+    mrp_log_info("client received a message");
+    dump_msg(msg, stdout);
+}
+
+
+void srv_recv_msg(mrp_transport_t *t, mrp_msg_t *msg, void *user_data)
+{
+    return srv_recvfrom_msg(t, msg, NULL, 0, user_data);
+}
+
+
+void recv_msg(mrp_transport_t *t, mrp_msg_t *msg, void *user_data)
+{
+    return recvfrom_msg(t, msg, NULL, 0, user_data);
+}
+
+
+void dump_custom(custom_t *msg, FILE *fp)
+{
+    uint32_t i;
+
+    mrp_data_dump(msg, data_descr, fp);
+    fprintf(fp, "{\n");
+    fprintf(fp, "    seq = %u\n"  , msg->seq);
+    fprintf(fp, "    msg = '%s'\n", msg->msg);
+    fprintf(fp, "     u8 = %u\n"  , msg->u8);
+    fprintf(fp, "     s8 = %d\n"  , msg->s8);
+    fprintf(fp, "    u16 = %u\n"  , msg->u16);
+    fprintf(fp, "    s16 = %d\n"  , msg->s16);
+    fprintf(fp, "    dbl = %f\n"  , msg->dbl);
+    fprintf(fp, "    bln = %s\n"  , msg->bln ? "true" : "false");
+    fprintf(fp, "   astr = (%u)\n", msg->nstr);
+    for (i = 0; i < msg->nstr; i++)
+        fprintf(fp, "           %s\n", msg->astr[i]);
+    fprintf(fp, "   au32 =\n");
+    for (i = 0; msg->au32[i] != U32_GUARD; i++)
+        fprintf(fp, "           %u\n", msg->au32[i]);
+    fprintf(fp, "    rpl = '%s'\n", msg->rpl);
+    fprintf(fp, "}\n");
+}
+
+
+void free_custom(custom_t *msg)
+{
+    mrp_data_free(msg, data_descr->tag);
+}
+
+
+
+void srv_recvfrom_custom(mrp_transport_t *t, void *data, uint16_t tag,
+                     mrp_sockaddr_t *addr, socklen_t addrlen, void *user_data)
+{
+    context_t *c   = (context_t *)user_data;
+    custom_t  *msg = (custom_t *)data;
+    custom_t   rpl;
+    char       buf[256];
+    uint32_t   au32[] = { 9, 8, 7, 6, 5, -1 };
+    int        status;
+
+    mrp_log_info("server received custom message of type 0x%x", tag);
+    dump_custom(data, stdout);
+
+    if (tag != data_descr->tag) {
+        mrp_log_error("Tag 0x%x != our custom type (0x%x).",
+                      tag, data_descr->tag);
+        exit(1);
+    }
+
+    rpl = *msg;
+    snprintf(buf, sizeof(buf), "reply to message #%u", msg->seq);
+    rpl.rpl  = buf;
+    rpl.au32 = au32;
+
+    if (c->connect)
+        status = mrp_transport_senddata(t, &rpl, data_descr->tag);
+    else
+        status = mrp_transport_senddatato(t, &rpl, data_descr->tag,
+                                          addr, addrlen);
+    if (status)
+        mrp_log_info("reply successfully sent");
+    else
+        mrp_log_error("failed to send reply");
+
+    free_custom(msg);
+}
+
+void recvfrom_custom(mrp_transport_t *t, void *data, uint16_t tag,
+                     mrp_sockaddr_t *addr, socklen_t addrlen, void *user_data)
+{
+    custom_t  *msg = (custom_t *)data;
+
+    MRP_UNUSED(t);
+    MRP_UNUSED(data);
+    MRP_UNUSED(addr);
+    MRP_UNUSED(addrlen);
+    MRP_UNUSED(user_data);
+
+    mrp_log_info("received custom message of type 0x%x", tag);
+    dump_custom(data, stdout);
+
+    if (tag != data_descr->tag) {
+        mrp_log_error("Tag 0x%x != our custom type (0x%x).",
+                      tag, data_descr->tag);
+        exit(1);
+    }
+
+    free_custom(msg);
+}
+
+
+void recv_custom(mrp_transport_t *t, void *data, uint16_t tag, void *user_data)
+{
+    recvfrom_custom(t, data, tag, NULL, 0, user_data);
+}
+
+void srv_recv_custom(mrp_transport_t *t, void *data, uint16_t tag, void *user_data)
+{
+    srv_recvfrom_custom(t, data, tag, NULL, 0, user_data);
+}
+
+void closed_evt(mrp_transport_t *t, int error, void *user_data)
+{
+    context_t *c = (context_t *)user_data;
+
+    MRP_UNUSED(t);
+    MRP_UNUSED(c);
+
+    if (error) {
+        mrp_log_error("Connection closed with error %d (%s).", error,
+                      strerror(error));
+        exit(1);
+    }
+    else {
+        mrp_log_info("Peer has closed the connection.");
+        exit(0);
+    }
+}
+
+
+void connection_evt(mrp_transport_t *lt, void *user_data)
+{
+    context_t *c = (context_t *)user_data;
+    int        flags;
+
+    mrp_log_info("connection event!");
+
+    flags = MRP_TRANSPORT_REUSEADDR | MRP_TRANSPORT_NONBLOCK;
+    c->st = mrp_transport_accept(lt, c, flags);
+
+    if (c->st == NULL) {
+        mrp_log_error("Failed to accept new connection.");
+        exit(1);
+    }
+}
+
+
+void type_init(context_t *c)
+{
+    if (c->buggy && c->server) {
+        data_descr = &buggy_descr;
+        mrp_log_info("Deliberately using buggy data descriptor...");
+    }
+    else
+        data_descr = &custom_descr;
+
+    if (!mrp_msg_register_type(data_descr)) {
+        mrp_log_error("Failed to register custom data type.");
+        exit(1);
+    }
+}
+
+
+void server_init(context_t *c)
+{
+    static mrp_transport_evt_t evt = {
+        { .recvmsg     = NULL },
+        { .recvmsgfrom = NULL },
+        .closed        = closed_evt,
+        .connection    = connection_evt
+    };
+
+    int flags;
+
+    if (c->custom) {
+        evt.recvdata     = srv_recv_custom;
+        evt.recvdatafrom = srv_recvfrom_custom;
+    }
+    else {
+        evt.recvmsg     = srv_recv_msg;
+        evt.recvmsgfrom = srv_recvfrom_msg;
+    }
+
+
+    flags = MRP_TRANSPORT_REUSEADDR |
+        (c->custom ? MRP_TRANSPORT_MODE_CUSTOM : 0);
+    c->lt = mrp_transport_create(c->ml, c->atype, &evt, c, flags);
+
+    if (c->lt == NULL) {
+        mrp_log_error("Failed to create listening server transport.");
+        exit(1);
+    }
+
+    if (!mrp_transport_bind(c->lt, &c->addr, c->alen)) {
+        mrp_log_error("Failed to bind transport to address %s.", c->addrstr);
+        exit(1);
+    }
+
+    if (c->stream) {
+        if (!mrp_transport_listen(c->lt, 0)) {
+            mrp_log_error("Failed to listen on server transport.");
+            exit(1);
+        }
+    }
+}
+
+
+void send_msg(client_t *client)
+{
+    mrp_msg_t *msg;
+    uint32_t   seq;
+    char       buf[256];
+    char      *astr[] = { "this", "is", "an", "array", "of", "strings" };
+    uint32_t   au32[] = { 1, 2, 3,
+                          1 << 16, 2 << 16, 3 << 16,
+                          1 << 24, 2 << 24, 3 << 24 };
+    uint32_t   nstr = MRP_ARRAY_SIZE(astr);
+    uint32_t   nu32 = MRP_ARRAY_SIZE(au32);
+    int        status;
+    context_t *c = client->c;
+
+    seq = c->seqno++;
+    snprintf(buf, sizeof(buf), "this is message #%u", (unsigned int)seq);
+
+    msg = mrp_msg_create(TAG_SEQ , MRP_MSG_FIELD_UINT32, seq,
+                         TAG_MSG , MRP_MSG_FIELD_STRING, buf,
+                         TAG_U8  , MRP_MSG_FIELD_UINT8 ,   seq & 0xf,
+                         TAG_S8  , MRP_MSG_FIELD_SINT8 , -(seq & 0xf),
+                         TAG_U16 , MRP_MSG_FIELD_UINT16,   seq,
+                         TAG_S16 , MRP_MSG_FIELD_SINT16, - seq,
+                         TAG_DBL , MRP_MSG_FIELD_DOUBLE, seq / 3.0,
+                         TAG_BLN , MRP_MSG_FIELD_BOOL  , seq & 0x1,
+                         TAG_ASTR, MRP_MSG_FIELD_ARRAY_OF(STRING), nstr, astr,
+                         TAG_AU32, MRP_MSG_FIELD_ARRAY_OF(UINT32), nu32, au32,
+                         TAG_END);
+
+    if (msg == NULL) {
+        mrp_log_error("Failed to create new message.");
+        exit(1);
+    }
+
+    if (c->connect)
+        status = mrp_transport_send(client->t, msg);
+    else
+        status = mrp_transport_sendto(client->t, msg, &c->addr, c->alen);
+
+    if (!status) {
+        mrp_log_error("Failed to send message #%d.", seq);
+        exit(1);
+    }
+    else
+        mrp_log_info("Message #%d succesfully sent.", seq);
+
+    mrp_msg_unref(msg);
+}
+
+
+void send_custom(client_t *client)
+{
+    custom_t   msg;
+    char       buf[256];
+    char      *astr[] = { "this", "is", "a", "test", "string", "array" };
+    uint32_t   au32[] = { 1, 2, 3, 4, 5, 6, 7, -1 };
+    int        status;
+    context_t *c = client->c;
+    uint32_t   seq = c->seqno++;
+
+    msg.seq = seq;
+    snprintf(buf, sizeof(buf), "this is message #%u", (unsigned int)seq);
+    msg.msg  = buf;
+    msg.u8   =   seq & 0xf;
+    msg.s8   = -(seq & 0xf);
+    msg.u16  =   seq;
+    msg.s16  = - seq;
+    msg.dbl  =   seq / 3.0;
+    msg.bln  =   seq & 0x1;
+    msg.astr = astr;
+    msg.nstr = MRP_ARRAY_SIZE(astr);
+    msg.fsck = 1000;
+    msg.au32 = au32;
+    msg.rpl  = "";
+
+    if (c->connect)
+        status = mrp_transport_senddata(client->t, &msg, data_descr->tag);
+    else
+        status = mrp_transport_senddatato(client->t, &msg, data_descr->tag,
+                                          &c->addr, c->alen);
+
+    if (!status) {
+        mrp_log_error("Failed to send message #%d.", msg.seq);
+        exit(1);
+    }
+    else
+        mrp_log_info("Message #%d succesfully sent.", msg.seq);
+}
+
+
+
+void send_cb(mrp_mainloop_t *ml, mrp_timer_t *t, void *user_data)
+{
+    client_t *client = (client_t *)user_data;
+    context_t *c = client->c;
+
+    MRP_UNUSED(ml);
+    MRP_UNUSED(t);
+
+    if (c->custom)
+        send_custom(client);
+    else
+        send_msg(client);
+}
+
+
+void client_init(context_t *c)
+{
+    static mrp_transport_evt_t evt = {
+        { .recvmsg     = NULL },
+        { .recvmsgfrom = NULL },
+        .closed        = closed_evt,
+        .connection    = NULL
+    };
+
+    int flags;
+    client_t *client;
+
+    if (c->custom) {
+        evt.recvdata     = recv_custom;
+        evt.recvdatafrom = recvfrom_custom;
+    }
+    else {
+        evt.recvmsg     = recv_msg;
+        evt.recvmsgfrom = recvfrom_msg;
+    }
+
+    client = mrp_allocz(sizeof(client_t));
+    mrp_list_init(&client->hook);
+    client->c = c;
+
+    mrp_list_append(&c->clients, &client->hook);
+
+    flags = c->custom ? MRP_TRANSPORT_MODE_CUSTOM : 0;
+    client->t  = mrp_transport_create(c->ml, c->atype, &evt, client, flags);
+
+    if (client->t == NULL) {
+        mrp_log_error("Failed to create new transport.");
+        exit(1);
+    }
+
+    if (!strcmp(c->atype, "unxd")) {
+        char           addrstr[] = "unxd:@stream-test-client";
+        mrp_sockaddr_t addr;
+        socklen_t      alen;
+
+        alen = mrp_transport_resolve(NULL, addrstr, &addr, sizeof(addr), NULL);
+        if (alen <= 0) {
+            mrp_log_error("Failed to resolve transport address '%s'.", addrstr);
+            exit(1);
+        }
+
+        if (!mrp_transport_bind(client->t, &addr, alen)) {
+            mrp_log_error("Failed to bind to transport address '%s'.", addrstr);
+            exit(1);
+        }
+    }
+
+    if (c->connect) {
+        if (!mrp_transport_connect(client->t, &c->addr, c->alen)) {
+            mrp_log_error("Failed to connect to %s.", c->addrstr);
+            exit(1);
+        }
+    }
+
+
+    c->timer = mrp_add_timer(c->ml, 1000, send_cb, client);
+
+    if (c->timer == NULL) {
+        mrp_log_error("Failed to create send timer.");
+        exit(1);
+    }
+}
+
+
+static void print_usage(const char *argv0, int exit_code, const char *fmt, ...)
+{
+    va_list ap;
+
+    if (fmt && *fmt) {
+        va_start(ap, fmt);
+        vprintf(fmt, ap);
+        va_end(ap);
+    }
+
+    printf("usage: %s [options] [transport-address]\n\n"
+           "The possible options are:\n"
+           "  -s, --server                   run as test server (default)\n"
+           "  -C, --connect                  connect transport\n"
+           "      For connection-oriented transports, this is automatic.\n"
+           "  -a, --address                  address to use\n"
+           "  -c, --custom                   use custom messages\n"
+           "  -m, --message                  use generic messages (default)\n"
+           "  -b, --buggy                    use buggy data descriptors\n"
+           "  -t, --log-target=TARGET        log target to use\n"
+           "      TARGET is one of stderr,stdout,syslog, or a logfile path\n"
+           "  -l, --log-level=LEVELS         logging level to use\n"
+           "      LEVELS is a comma separated list of info, error and warning\n"
+           "  -v, --verbose                  increase logging verbosity\n"
+           "  -d, --debug                    enable debug messages\n"
+           "  -h, --help                     show help on usage\n",
+           argv0);
+
+    if (exit_code < 0)
+        return;
+    else
+        exit(exit_code);
+}
+
+
+static void config_set_defaults(context_t *ctx)
+{
+    mrp_clear(ctx);
+    ctx->addrstr    = "tcp4:127.0.0.1:3000";
+    ctx->server     = FALSE;
+    ctx->custom     = FALSE;
+    ctx->log_mask   = MRP_LOG_UPTO(MRP_LOG_DEBUG);
+    ctx->log_target = MRP_LOG_TO_STDERR;
+}
+
+
+int parse_cmdline(context_t *ctx, int argc, char **argv)
+{
+#   define OPTIONS "scmbCa:l:t:vdh"
+    struct option options[] = {
+        { "server"    , no_argument      , NULL, 's' },
+        { "address"   , required_argument, NULL, 'a' },
+        { "custom"    , no_argument      , NULL, 'c' },
+        { "connect"   , no_argument      , NULL, 'C' },
+        { "message"   , no_argument      , NULL, 'm' },
+        { "buggy"     , no_argument      , NULL, 'b' },
+        { "log-level" , required_argument, NULL, 'l' },
+        { "log-target", required_argument, NULL, 't' },
+        { "verbose"   , optional_argument, NULL, 'v' },
+        { "debug"     , no_argument      , NULL, 'd' },
+        { "help"      , no_argument      , NULL, 'h' },
+        { NULL, 0, NULL, 0 }
+    };
+
+    int  opt, debug;
+
+    debug = FALSE;
+    config_set_defaults(ctx);
+
+    while ((opt = getopt_long(argc, argv, OPTIONS, options, NULL)) != -1) {
+        switch (opt) {
+        case 's':
+            ctx->server = TRUE;
+            break;
+
+        case 'c':
+            ctx->custom = TRUE;
+            break;
+
+        case 'm':
+            ctx->custom = FALSE;
+            break;
+
+        case 'b':
+            ctx->buggy = TRUE;
+            break;
+
+        case 'C':
+            ctx->connect = TRUE;
+            break;
+
+        case 'a':
+            ctx->addrstr = optarg;
+            break;
+
+        case 'v':
+            ctx->log_mask <<= 1;
+            ctx->log_mask  |= 1;
+            break;
+
+        case 'l':
+            ctx->log_mask = mrp_log_parse_levels(optarg);
+            if (ctx->log_mask < 0)
+                print_usage(argv[0], EINVAL, "invalid log level '%s'", optarg);
+            break;
+
+        case 't':
+            ctx->log_target = mrp_log_parse_target(optarg);
+            if (!ctx->log_target)
+                print_usage(argv[0], EINVAL, "invalid log target '%s'", optarg);
+            break;
+
+        case 'd':
+            debug = TRUE;
+            break;
+
+        case 'h':
+            print_usage(argv[0], -1, "");
+            exit(0);
+            break;
+
+        default:
+            print_usage(argv[0], EINVAL, "invalid option '%c'", opt);
+        }
+    }
+
+    if (debug)
+        ctx->log_mask |= MRP_LOG_MASK_DEBUG;
+
+    return TRUE;
+}
+
+
+int main(int argc, char *argv[])
+{
+    context_t c;
+
+    mrp_clear(&c);
+
+    if (!parse_cmdline(&c, argc, argv))
+        exit(1);
+
+    mrp_log_set_mask(c.log_mask);
+    mrp_log_set_target(c.log_target);
+
+    mrp_log_info("Using address '%s'...", c.addrstr);
+
+    mrp_list_init(&c.clients);
+
+    if (c.custom)
+        mrp_log_info("Using custom messages...");
+    else
+        mrp_log_info("Using generic messages...");
+
+    if (!strncmp(c.addrstr, "tcp", 3) ||
+        !strncmp(c.addrstr, "unxs", 4)) {
+        c.stream  = TRUE;
+        c.connect = TRUE;
+    }
+
+    c.alen = mrp_transport_resolve(NULL, c.addrstr,
+                                   &c.addr, sizeof(c.addr), &c.atype);
+    if (c.alen <= 0) {
+        mrp_log_error("Failed to resolve transport address '%s'.", c.addrstr);
+        exit(1);
+    }
+
+    c.ml = mrp_mainloop_create();
+
+    type_init(&c);
+
+    server_init(&c);
+
+    client_init(&c);
+    client_init(&c);
+
+    mrp_mainloop_run(c.ml);
+
+    return 0;
+}