--- /dev/null
+/*
+ * Copyright (c) 2012, Intel Corporation
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * * Neither the name of Intel Corporation nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <unistd.h>
+#include <string.h>
+#include <errno.h>
+#include <sys/types.h>
+
+#include <murphy/common.h>
+
+#define INTERNAL "internal"
+
+/* storage for the global data, TODO: refactor away */
+static mrp_htbl_t *servers = NULL;
+static mrp_htbl_t *connections = NULL;
+static mrp_list_hook_t msg_queue;
+static mrp_deferred_t *d;
+static uint32_t cid;
+
+typedef struct internal_s internal_t;
+
+struct internal_s {
+ MRP_TRANSPORT_PUBLIC_FIELDS; /* common transport fields */
+ char name[MRP_SOCKADDR_SIZE]; /* bound connection name */
+ mrp_sockaddr_t address; /* internal connection name*/
+ bool active;
+ bool bound;
+ bool listening;
+
+ internal_t *endpoint; /* that we are connected to */
+};
+
+typedef struct {
+ void *data;
+ size_t size;
+ internal_t *u;
+ mrp_sockaddr_t *addr;
+ socklen_t addrlen;
+ bool free_data;
+ int offset;
+
+ mrp_list_hook_t hook;
+} internal_message_t;
+
+
+static void process_queue(mrp_mainloop_t *ml, mrp_deferred_t *d,
+ void *user_data)
+{
+ internal_message_t *msg;
+ internal_t *endpoint;
+ mrp_list_hook_t *p, *n;
+
+ MRP_UNUSED(ml);
+ MRP_UNUSED(user_data);
+
+ mrp_log_info("process_queue");
+
+ mrp_disable_deferred(d);
+
+ mrp_list_foreach(&msg_queue, p, n) {
+
+ msg = mrp_list_entry(p, typeof(*msg), hook);
+
+ mrp_log_info("processing msg %p", msg);
+
+ if (!msg) {
+ mrp_log_error("no message!");
+ goto end;
+ }
+
+ if (!msg->u->connected) {
+ if (!msg->addr) {
+ mrp_log_error("connected transport without address!");
+ goto end;
+ }
+
+ /* Find the recipient. Look first from the server table.*/
+ endpoint = mrp_htbl_lookup(servers, msg->addr->data);
+
+ if (!endpoint) {
+
+ /* Look next from the general connections table. */
+ endpoint = mrp_htbl_lookup(connections, msg->addr->data);
+ mrp_log_info("sending message to connection '%s'",
+ msg->addr->data);
+ }
+ else {
+ mrp_log_info("sending message to server '%s'",
+ msg->addr->data);
+ }
+ }
+ else {
+ endpoint = msg->u->endpoint;
+ }
+
+ if (!endpoint || !endpoint->recv_data) {
+ mrp_log_error("no endpoint matching the address");
+ goto end;
+ }
+
+ /* skip the length word when sending */
+ endpoint->recv_data(
+ (mrp_transport_t *) endpoint, msg->data + msg->offset,
+ msg->size, &msg->u->address, MRP_SOCKADDR_SIZE);
+
+end:
+ if (msg) {
+
+ if (msg->free_data)
+ mrp_free(msg->data);
+
+ mrp_list_delete(&msg->hook);
+ mrp_free(msg);
+ }
+ }
+}
+
+
+static int internal_initialize_table(internal_t *u)
+{
+ mrp_htbl_config_t servers_conf;
+ mrp_htbl_config_t connections_conf;
+
+ MRP_UNUSED(u);
+
+ if (servers && connections && d)
+ return 0; /* already initialized */
+
+ servers_conf.comp = mrp_string_comp;
+ servers_conf.hash = mrp_string_hash;
+ servers_conf.free = NULL;
+ servers_conf.nbucket = 0;
+ servers_conf.nentry = 10;
+
+ servers = mrp_htbl_create(&servers_conf);
+
+ if (!servers)
+ goto error;
+
+ connections_conf.comp = mrp_string_comp;
+ connections_conf.hash = mrp_string_hash;
+ connections_conf.free = NULL;
+ connections_conf.nbucket = 0;
+ connections_conf.nentry = 10;
+
+ connections = mrp_htbl_create(&connections_conf);
+
+ if (!connections)
+ goto error;
+
+ mrp_list_init(&msg_queue);
+
+ cid = 0;
+
+ d = mrp_add_deferred(u->ml, process_queue, NULL);
+
+ if (!d)
+ goto error;
+
+ mrp_disable_deferred(d);
+
+ return 0;
+
+error:
+
+ if (servers)
+ mrp_htbl_destroy(servers, FALSE);
+
+ if (connections)
+ mrp_htbl_destroy(connections, FALSE);
+
+ return -1;
+}
+
+
+static socklen_t internal_resolve(const char *str, mrp_sockaddr_t *addr,
+ socklen_t size, const char **typep)
+{
+ int len;
+
+ MRP_UNUSED(size);
+
+ mrp_log_info("> internal_resolve");
+
+ if (!str)
+ return 0;
+
+ len = strlen(str);
+
+ if (len <= 9 || len >= MRP_SOCKADDR_SIZE)
+ return 0;
+
+ if (strncmp("internal:", str, 9))
+ return 0;
+
+ if (typep)
+ *typep = INTERNAL;
+
+ memcpy(addr->data, str+9, len-9+1);
+
+ mrp_log_info("resolved %s from %s", addr->data, str);
+
+ return len-9;
+}
+
+
+static int internal_open(mrp_transport_t *mu)
+{
+ internal_t *u = (internal_t *)mu;
+
+ mrp_log_info("> internal_open");
+
+ if (internal_initialize_table(u) < 0)
+ return FALSE;
+
+ memset(u->name, 0, MRP_SOCKADDR_SIZE);
+ memset(u->address.data, 0, MRP_SOCKADDR_SIZE);
+
+ u->active = FALSE;
+
+ snprintf(u->address.data, MRP_SOCKADDR_SIZE, INTERNAL"_%d", cid++);
+
+ mrp_htbl_insert(connections, u->address.data, mu);
+
+ return TRUE;
+}
+
+
+static int internal_bind(mrp_transport_t *mu, mrp_sockaddr_t *addr,
+ socklen_t addrlen)
+{
+ internal_t *u = (internal_t *)mu;
+
+ mrp_log_info("> internal_bind");
+
+ if (internal_initialize_table(u) < 0)
+ return FALSE;
+
+ memcpy(u->name, addr->data, addrlen+1);
+
+ mrp_htbl_insert(servers, u->name, u);
+
+ u->active = TRUE;
+ u->bound = TRUE;
+
+ return TRUE;
+}
+
+
+static int internal_listen(mrp_transport_t *mu, int backlog)
+{
+ internal_t *u = (internal_t *)mu;
+
+ MRP_UNUSED(backlog);
+
+ if (!u->bound)
+ return FALSE;
+
+ mrp_log_info("> internal_listen");
+
+ u->listening = TRUE;
+
+ return TRUE;
+}
+
+
+static int internal_accept(mrp_transport_t *mt, mrp_transport_t *mlt)
+{
+ internal_t *t = (internal_t *) mt;
+ internal_t *lt = (internal_t *) mlt;
+ internal_t *client = lt->endpoint;
+
+ mrp_log_info("> internal_accept");
+
+ t->endpoint = client;
+ client->endpoint = t;
+
+ lt->endpoint = NULL; /* connection process is now over */
+
+ return TRUE;
+}
+
+
+static void remove_messages(internal_t *u)
+{
+ internal_message_t *msg;
+ mrp_list_hook_t *p, *n;
+
+ mrp_list_foreach(&msg_queue, p, n) {
+ msg = mrp_list_entry(p, typeof(*msg), hook);
+
+ if (strcmp(msg->addr->data, u->name) == 0
+ || strcmp(msg->addr->data, u->address.data) == 0) {
+
+ if (msg->free_data)
+ mrp_free(msg->data);
+
+ mrp_list_delete(&msg->hook);
+ mrp_free(msg);
+ }
+ }
+}
+
+
+static void internal_close(mrp_transport_t *mu)
+{
+ internal_t *u = (internal_t *)mu;
+
+ mrp_log_info("> internal_close");
+
+ /* Is this client or server? If server, go remove the connection from
+ * servers table. */
+
+ if (u->bound) {
+ /* server listening socket */
+ mrp_htbl_remove(servers, u->name, FALSE);
+ u->bound = FALSE;
+ }
+
+ mrp_htbl_remove(connections, u->address.data, FALSE);
+
+ u->active = FALSE;
+
+ remove_messages(u);
+}
+
+
+static int internal_connect(mrp_transport_t *mu, mrp_sockaddr_t *addr,
+ socklen_t addrlen)
+{
+ internal_t *u = (internal_t *)mu;
+ internal_t *host;
+ mrp_transport_t *mt;
+
+ MRP_UNUSED(addrlen);
+
+ mrp_log_info("> internal_connect");
+
+ /* client connecting */
+
+ if (!servers) {
+ mrp_log_error("no servers available for connecting");
+ return FALSE;
+ }
+
+ host = mrp_htbl_lookup(servers, addr->data);
+
+ if (!host) {
+ mrp_log_error("server '%s' wasn't found", addr->data);
+ return FALSE;
+ }
+
+ mt = (mrp_transport_t *) host;
+
+ host->endpoint = u; /* temporary connection data */
+
+ host->evt.connection(mt, mt->user_data);
+
+ return TRUE;
+}
+
+
+static int internal_disconnect(mrp_transport_t *mu)
+{
+ internal_t *u = (internal_t *)mu;
+
+ mrp_log_info("> internal_disconnect");
+
+ if (u->connected) {
+ internal_t *endpoint = u->endpoint;
+
+ if (endpoint) {
+ endpoint->endpoint = NULL;
+ mrp_transport_disconnect((mrp_transport_t *) endpoint);
+ }
+ u->endpoint = NULL;
+ }
+
+ return TRUE;
+}
+
+
+static int internal_sendto(mrp_transport_t *mu, mrp_msg_t *data,
+ mrp_sockaddr_t *addr, socklen_t addrlen)
+{
+ internal_t *u = (internal_t *)mu;
+ void *buf;
+ size_t size;
+ internal_message_t *msg;
+
+
+ mrp_log_info("> internal_sendto");
+
+ size = mrp_msg_default_encode(data, &buf);
+
+ if (size == 0 || buf == NULL) {
+ return FALSE;
+ }
+
+ msg = mrp_allocz(sizeof(internal_message_t));
+
+ msg->addr = addr;
+ msg->addrlen = addrlen;
+ msg->data = buf;
+ msg->free_data = FALSE;
+ msg->offset = 0;
+ msg->size = size;
+ msg->u = u;
+
+ mrp_list_init(&msg->hook);
+ mrp_list_append(&msg_queue, &msg->hook);
+
+ mrp_enable_deferred(d);
+
+ return TRUE;
+}
+
+
+static int internal_send(mrp_transport_t *mu, mrp_msg_t *msg)
+{
+ mrp_log_info("> internal_send");
+
+ if (!mu->connected) {
+ return FALSE;
+ }
+
+ return internal_sendto(mu, msg, NULL, 0);
+}
+
+
+static int internal_sendrawto(mrp_transport_t *mu, void *data, size_t size,
+ mrp_sockaddr_t *addr, socklen_t addrlen)
+{
+ internal_t *u = (internal_t *)mu;
+ internal_message_t *msg;
+
+ mrp_log_info("> internal_sendrawto");
+
+ msg = mrp_allocz(sizeof(internal_message_t));
+
+ msg->addr = addr;
+ msg->addrlen = addrlen;
+ msg->data = data;
+ msg->free_data = FALSE;
+ msg->offset = 0;
+ msg->size = size;
+ msg->u = u;
+
+ mrp_list_init(&msg->hook);
+ mrp_list_append(&msg_queue, &msg->hook);
+
+ mrp_enable_deferred(d);
+
+ return TRUE;
+}
+
+
+static int internal_sendraw(mrp_transport_t *mu, void *data, size_t size)
+{
+ mrp_log_info("> internal_sendraw");
+
+ if (!mu->connected) {
+ return FALSE;
+ }
+
+ return internal_sendrawto(mu, data, size, NULL, 0);
+}
+
+
+static size_t encode_custom_data(void *data, void **newdata, uint16_t tag)
+{
+ mrp_data_descr_t *type = mrp_msg_find_type(tag);
+ uint32_t *lenp;
+ uint16_t *tagp;
+ size_t reserve, size;
+ int len;
+ void *buf;
+
+ if (type == NULL) {
+ mrp_log_error("type not found!");
+ return 0;
+ }
+
+ reserve = sizeof(*lenp) + sizeof(*tagp);
+ size = mrp_data_encode(&buf, data, type, reserve);
+
+ if (size == 0) {
+ mrp_log_error("data encoding failed");
+ return 0;
+ }
+
+ /* some format conversion */
+
+ lenp = buf;
+ len = size - sizeof(*lenp);
+ tagp = buf + sizeof(*lenp);
+
+ /* *lenp = htobe32(len); */ /* Not used */
+ *tagp = htobe16(tag);
+
+ *newdata = buf;
+
+ return len;
+}
+
+
+static int internal_senddatato(mrp_transport_t *mu, void *data, uint16_t tag,
+ mrp_sockaddr_t *addr, socklen_t addrlen)
+{
+ internal_t *u = (internal_t *)mu;
+ mrp_data_descr_t *type = mrp_msg_find_type(tag);
+ void *newdata = NULL;
+ size_t size;
+ internal_message_t *msg;
+
+ mrp_log_info("> internal_senddatato");
+
+ if (type == NULL)
+ return FALSE;
+
+ size = encode_custom_data(data, &newdata, tag);
+
+ if (!newdata) {
+ mrp_log_error("custom data encoding failed");
+ return FALSE;
+ }
+
+ msg = mrp_allocz(sizeof(internal_message_t));
+
+ msg->addr = addr;
+ msg->addrlen = addrlen;
+ msg->data = newdata;
+ msg->free_data = TRUE;
+ msg->offset = 4;
+ msg->size = size;
+ msg->u = u;
+
+ mrp_list_init(&msg->hook);
+ mrp_list_append(&msg_queue, &msg->hook);
+
+ mrp_enable_deferred(d);
+
+ return TRUE;
+}
+
+
+static int internal_senddata(mrp_transport_t *mu, void *data, uint16_t tag)
+{
+ mrp_log_info("> internal_senddata");
+
+ if (!mu->connected) {
+ return FALSE;
+ }
+
+ return internal_senddatato(mu, data, tag, NULL, 0);
+}
+
+
+
+
+MRP_REGISTER_TRANSPORT(internal, INTERNAL, internal_t, internal_resolve,
+ internal_open, NULL, internal_close,
+ internal_bind, internal_listen, internal_accept,
+ internal_connect, internal_disconnect,
+ internal_send, internal_sendto,
+ internal_sendraw, internal_sendrawto,
+ internal_senddata, internal_senddatato);
--- /dev/null
+/*
+ * Copyright (c) 2012, Intel Corporation
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * * Neither the name of Intel Corporation nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <unistd.h>
+#include <string.h>
+#include <errno.h>
+#include <netdb.h>
+#include <fcntl.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+
+#define _GNU_SOURCE
+#include <getopt.h>
+
+#include <murphy/common.h>
+
+
+/*
+ * tags for generic message fields
+ */
+
+#define TAG_SEQ ((uint16_t)0x1)
+#define TAG_MSG ((uint16_t)0x2)
+#define TAG_U8 ((uint16_t)0x3)
+#define TAG_S8 ((uint16_t)0x4)
+#define TAG_U16 ((uint16_t)0x5)
+#define TAG_S16 ((uint16_t)0x6)
+#define TAG_DBL ((uint16_t)0x7)
+#define TAG_BLN ((uint16_t)0x8)
+#define TAG_ASTR ((uint16_t)0x9)
+#define TAG_AU32 ((uint16_t)0xa)
+#define TAG_RPL ((uint16_t)0xb)
+#define TAG_END MRP_MSG_FIELD_END
+
+#define U32_GUARD (uint32_t)-1
+
+/*
+ * our test custom data type
+ */
+
+#define TAG_CUSTOM 0x1
+
+typedef struct {
+ uint32_t seq;
+ char *msg;
+ uint8_t u8;
+ int8_t s8;
+ uint16_t u16;
+ int16_t s16;
+ double dbl;
+ bool bln;
+ char **astr;
+ uint32_t nstr;
+ uint32_t fsck;
+ uint32_t *au32;
+ char *rpl;
+} custom_t;
+
+
+MRP_DATA_DESCRIPTOR(custom_descr, TAG_CUSTOM, custom_t,
+ MRP_DATA_MEMBER(custom_t, seq, MRP_MSG_FIELD_UINT32),
+ MRP_DATA_MEMBER(custom_t, msg, MRP_MSG_FIELD_STRING),
+ MRP_DATA_MEMBER(custom_t, u8, MRP_MSG_FIELD_UINT8 ),
+ MRP_DATA_MEMBER(custom_t, s8, MRP_MSG_FIELD_SINT8 ),
+ MRP_DATA_MEMBER(custom_t, u16, MRP_MSG_FIELD_UINT16),
+ MRP_DATA_MEMBER(custom_t, s16, MRP_MSG_FIELD_SINT16),
+ MRP_DATA_MEMBER(custom_t, dbl, MRP_MSG_FIELD_DOUBLE),
+ MRP_DATA_MEMBER(custom_t, bln, MRP_MSG_FIELD_BOOL ),
+ MRP_DATA_MEMBER(custom_t, rpl, MRP_MSG_FIELD_STRING),
+ MRP_DATA_MEMBER(custom_t, nstr, MRP_MSG_FIELD_UINT32),
+ MRP_DATA_MEMBER(custom_t, fsck, MRP_MSG_FIELD_UINT32),
+ MRP_DATA_ARRAY_COUNT(custom_t, astr, nstr,
+ MRP_MSG_FIELD_STRING),
+ MRP_DATA_ARRAY_GUARD(custom_t, au32, u32, U32_GUARD,
+ MRP_MSG_FIELD_UINT32));
+
+MRP_DATA_DESCRIPTOR(buggy_descr, TAG_CUSTOM, custom_t,
+ MRP_DATA_MEMBER(custom_t, seq, MRP_MSG_FIELD_UINT32),
+ MRP_DATA_MEMBER(custom_t, msg, MRP_MSG_FIELD_STRING),
+ MRP_DATA_MEMBER(custom_t, u8, MRP_MSG_FIELD_UINT8 ),
+ MRP_DATA_MEMBER(custom_t, s8, MRP_MSG_FIELD_SINT8 ),
+ MRP_DATA_MEMBER(custom_t, u16, MRP_MSG_FIELD_UINT16),
+ MRP_DATA_MEMBER(custom_t, s16, MRP_MSG_FIELD_SINT16),
+ MRP_DATA_MEMBER(custom_t, dbl, MRP_MSG_FIELD_DOUBLE),
+ MRP_DATA_MEMBER(custom_t, bln, MRP_MSG_FIELD_BOOL ),
+ MRP_DATA_MEMBER(custom_t, rpl, MRP_MSG_FIELD_STRING),
+ MRP_DATA_MEMBER(custom_t, nstr, MRP_MSG_FIELD_UINT32),
+ MRP_DATA_MEMBER(custom_t, fsck, MRP_MSG_FIELD_UINT32),
+ MRP_DATA_ARRAY_COUNT(custom_t, astr, fsck,
+ MRP_MSG_FIELD_STRING),
+ MRP_DATA_ARRAY_GUARD(custom_t, au32, u32, U32_GUARD,
+ MRP_MSG_FIELD_UINT32));
+
+mrp_data_descr_t *data_descr;
+
+typedef struct {
+ mrp_mainloop_t *ml;
+ mrp_transport_t *lt, *st;
+ char *addrstr;
+ mrp_sockaddr_t addr;
+ socklen_t alen;
+ const char *atype;
+ int server;
+ int sock;
+ mrp_io_watch_t *iow;
+ mrp_timer_t *timer;
+ int custom;
+ int buggy;
+ int connect;
+ int stream;
+ int log_mask;
+ const char *log_target;
+ uint32_t seqno;
+ mrp_list_hook_t clients;
+} context_t;
+
+typedef struct {
+ int id;
+ mrp_transport_t *t;
+ context_t *c;
+ mrp_list_hook_t hook;
+} client_t;
+
+
+void recv_msg(mrp_transport_t *t, mrp_msg_t *msg, void *user_data);
+void recvfrom_msg(mrp_transport_t *t, mrp_msg_t *msg, mrp_sockaddr_t *addr,
+ socklen_t addrlen, void *user_data);
+
+void recv_custom(mrp_transport_t *t, void *data, uint16_t tag, void *user_data);
+void recvfrom_custom(mrp_transport_t *t, void *data, uint16_t tag,
+ mrp_sockaddr_t *addr, socklen_t addrlen, void *user_data);
+
+
+
+void dump_msg(mrp_msg_t *msg, FILE *fp)
+{
+ mrp_msg_dump(msg, fp);
+}
+
+
+void srv_recvfrom_msg(mrp_transport_t *t, mrp_msg_t *msg, mrp_sockaddr_t *addr,
+ socklen_t addrlen, void *user_data)
+{
+ context_t *c = (context_t *)user_data;
+ mrp_msg_field_t *f;
+ uint32_t seq;
+ char buf[256];
+ int status;
+
+ mrp_log_info("received a message");
+ dump_msg(msg, stdout);
+
+ seq = 0;
+ if ((f = mrp_msg_find(msg, TAG_SEQ)) != NULL) {
+ if (f->type == MRP_MSG_FIELD_UINT32)
+ seq = f->u32;
+ }
+
+ snprintf(buf, sizeof(buf), "reply to message #%u", seq);
+
+ if (!mrp_msg_append(msg, TAG_RPL, MRP_MSG_FIELD_STRING, buf,
+ TAG_END)) {
+ mrp_log_info("failed to append to received message");
+ exit(1);
+ }
+
+ if (c->connect)
+ status = mrp_transport_send(t, msg);
+ else
+ status = mrp_transport_sendto(t, msg, addr, addrlen);
+
+ if (status)
+ mrp_log_info("reply successfully sent");
+ else
+ mrp_log_error("failed to send reply");
+
+ /* message unreffed by transport layer */
+}
+
+
+void recvfrom_msg(mrp_transport_t *t, mrp_msg_t *msg, mrp_sockaddr_t *addr,
+ socklen_t addrlen, void *user_data)
+{
+ MRP_UNUSED(t);
+ MRP_UNUSED(addr);
+ MRP_UNUSED(addrlen);
+ MRP_UNUSED(user_data);
+
+ mrp_log_info("client received a message");
+ dump_msg(msg, stdout);
+}
+
+
+void srv_recv_msg(mrp_transport_t *t, mrp_msg_t *msg, void *user_data)
+{
+ return srv_recvfrom_msg(t, msg, NULL, 0, user_data);
+}
+
+
+void recv_msg(mrp_transport_t *t, mrp_msg_t *msg, void *user_data)
+{
+ return recvfrom_msg(t, msg, NULL, 0, user_data);
+}
+
+
+void dump_custom(custom_t *msg, FILE *fp)
+{
+ uint32_t i;
+
+ mrp_data_dump(msg, data_descr, fp);
+ fprintf(fp, "{\n");
+ fprintf(fp, " seq = %u\n" , msg->seq);
+ fprintf(fp, " msg = '%s'\n", msg->msg);
+ fprintf(fp, " u8 = %u\n" , msg->u8);
+ fprintf(fp, " s8 = %d\n" , msg->s8);
+ fprintf(fp, " u16 = %u\n" , msg->u16);
+ fprintf(fp, " s16 = %d\n" , msg->s16);
+ fprintf(fp, " dbl = %f\n" , msg->dbl);
+ fprintf(fp, " bln = %s\n" , msg->bln ? "true" : "false");
+ fprintf(fp, " astr = (%u)\n", msg->nstr);
+ for (i = 0; i < msg->nstr; i++)
+ fprintf(fp, " %s\n", msg->astr[i]);
+ fprintf(fp, " au32 =\n");
+ for (i = 0; msg->au32[i] != U32_GUARD; i++)
+ fprintf(fp, " %u\n", msg->au32[i]);
+ fprintf(fp, " rpl = '%s'\n", msg->rpl);
+ fprintf(fp, "}\n");
+}
+
+
+void free_custom(custom_t *msg)
+{
+ mrp_data_free(msg, data_descr->tag);
+}
+
+
+
+void srv_recvfrom_custom(mrp_transport_t *t, void *data, uint16_t tag,
+ mrp_sockaddr_t *addr, socklen_t addrlen, void *user_data)
+{
+ context_t *c = (context_t *)user_data;
+ custom_t *msg = (custom_t *)data;
+ custom_t rpl;
+ char buf[256];
+ uint32_t au32[] = { 9, 8, 7, 6, 5, -1 };
+ int status;
+
+ mrp_log_info("server received custom message of type 0x%x", tag);
+ dump_custom(data, stdout);
+
+ if (tag != data_descr->tag) {
+ mrp_log_error("Tag 0x%x != our custom type (0x%x).",
+ tag, data_descr->tag);
+ exit(1);
+ }
+
+ rpl = *msg;
+ snprintf(buf, sizeof(buf), "reply to message #%u", msg->seq);
+ rpl.rpl = buf;
+ rpl.au32 = au32;
+
+ if (c->connect)
+ status = mrp_transport_senddata(t, &rpl, data_descr->tag);
+ else
+ status = mrp_transport_senddatato(t, &rpl, data_descr->tag,
+ addr, addrlen);
+ if (status)
+ mrp_log_info("reply successfully sent");
+ else
+ mrp_log_error("failed to send reply");
+
+ free_custom(msg);
+}
+
+void recvfrom_custom(mrp_transport_t *t, void *data, uint16_t tag,
+ mrp_sockaddr_t *addr, socklen_t addrlen, void *user_data)
+{
+ custom_t *msg = (custom_t *)data;
+
+ MRP_UNUSED(t);
+ MRP_UNUSED(data);
+ MRP_UNUSED(addr);
+ MRP_UNUSED(addrlen);
+ MRP_UNUSED(user_data);
+
+ mrp_log_info("received custom message of type 0x%x", tag);
+ dump_custom(data, stdout);
+
+ if (tag != data_descr->tag) {
+ mrp_log_error("Tag 0x%x != our custom type (0x%x).",
+ tag, data_descr->tag);
+ exit(1);
+ }
+
+ free_custom(msg);
+}
+
+
+void recv_custom(mrp_transport_t *t, void *data, uint16_t tag, void *user_data)
+{
+ recvfrom_custom(t, data, tag, NULL, 0, user_data);
+}
+
+void srv_recv_custom(mrp_transport_t *t, void *data, uint16_t tag, void *user_data)
+{
+ srv_recvfrom_custom(t, data, tag, NULL, 0, user_data);
+}
+
+void closed_evt(mrp_transport_t *t, int error, void *user_data)
+{
+ context_t *c = (context_t *)user_data;
+
+ MRP_UNUSED(t);
+ MRP_UNUSED(c);
+
+ if (error) {
+ mrp_log_error("Connection closed with error %d (%s).", error,
+ strerror(error));
+ exit(1);
+ }
+ else {
+ mrp_log_info("Peer has closed the connection.");
+ exit(0);
+ }
+}
+
+
+void connection_evt(mrp_transport_t *lt, void *user_data)
+{
+ context_t *c = (context_t *)user_data;
+ int flags;
+
+ mrp_log_info("connection event!");
+
+ flags = MRP_TRANSPORT_REUSEADDR | MRP_TRANSPORT_NONBLOCK;
+ c->st = mrp_transport_accept(lt, c, flags);
+
+ if (c->st == NULL) {
+ mrp_log_error("Failed to accept new connection.");
+ exit(1);
+ }
+}
+
+
+void type_init(context_t *c)
+{
+ if (c->buggy && c->server) {
+ data_descr = &buggy_descr;
+ mrp_log_info("Deliberately using buggy data descriptor...");
+ }
+ else
+ data_descr = &custom_descr;
+
+ if (!mrp_msg_register_type(data_descr)) {
+ mrp_log_error("Failed to register custom data type.");
+ exit(1);
+ }
+}
+
+
+void server_init(context_t *c)
+{
+ static mrp_transport_evt_t evt = {
+ { .recvmsg = NULL },
+ { .recvmsgfrom = NULL },
+ .closed = closed_evt,
+ .connection = connection_evt
+ };
+
+ int flags;
+
+ if (c->custom) {
+ evt.recvdata = srv_recv_custom;
+ evt.recvdatafrom = srv_recvfrom_custom;
+ }
+ else {
+ evt.recvmsg = srv_recv_msg;
+ evt.recvmsgfrom = srv_recvfrom_msg;
+ }
+
+
+ flags = MRP_TRANSPORT_REUSEADDR |
+ (c->custom ? MRP_TRANSPORT_MODE_CUSTOM : 0);
+ c->lt = mrp_transport_create(c->ml, c->atype, &evt, c, flags);
+
+ if (c->lt == NULL) {
+ mrp_log_error("Failed to create listening server transport.");
+ exit(1);
+ }
+
+ if (!mrp_transport_bind(c->lt, &c->addr, c->alen)) {
+ mrp_log_error("Failed to bind transport to address %s.", c->addrstr);
+ exit(1);
+ }
+
+ if (c->stream) {
+ if (!mrp_transport_listen(c->lt, 0)) {
+ mrp_log_error("Failed to listen on server transport.");
+ exit(1);
+ }
+ }
+}
+
+
+void send_msg(client_t *client)
+{
+ mrp_msg_t *msg;
+ uint32_t seq;
+ char buf[256];
+ char *astr[] = { "this", "is", "an", "array", "of", "strings" };
+ uint32_t au32[] = { 1, 2, 3,
+ 1 << 16, 2 << 16, 3 << 16,
+ 1 << 24, 2 << 24, 3 << 24 };
+ uint32_t nstr = MRP_ARRAY_SIZE(astr);
+ uint32_t nu32 = MRP_ARRAY_SIZE(au32);
+ int status;
+ context_t *c = client->c;
+
+ seq = c->seqno++;
+ snprintf(buf, sizeof(buf), "this is message #%u", (unsigned int)seq);
+
+ msg = mrp_msg_create(TAG_SEQ , MRP_MSG_FIELD_UINT32, seq,
+ TAG_MSG , MRP_MSG_FIELD_STRING, buf,
+ TAG_U8 , MRP_MSG_FIELD_UINT8 , seq & 0xf,
+ TAG_S8 , MRP_MSG_FIELD_SINT8 , -(seq & 0xf),
+ TAG_U16 , MRP_MSG_FIELD_UINT16, seq,
+ TAG_S16 , MRP_MSG_FIELD_SINT16, - seq,
+ TAG_DBL , MRP_MSG_FIELD_DOUBLE, seq / 3.0,
+ TAG_BLN , MRP_MSG_FIELD_BOOL , seq & 0x1,
+ TAG_ASTR, MRP_MSG_FIELD_ARRAY_OF(STRING), nstr, astr,
+ TAG_AU32, MRP_MSG_FIELD_ARRAY_OF(UINT32), nu32, au32,
+ TAG_END);
+
+ if (msg == NULL) {
+ mrp_log_error("Failed to create new message.");
+ exit(1);
+ }
+
+ if (c->connect)
+ status = mrp_transport_send(client->t, msg);
+ else
+ status = mrp_transport_sendto(client->t, msg, &c->addr, c->alen);
+
+ if (!status) {
+ mrp_log_error("Failed to send message #%d.", seq);
+ exit(1);
+ }
+ else
+ mrp_log_info("Message #%d succesfully sent.", seq);
+
+ mrp_msg_unref(msg);
+}
+
+
+void send_custom(client_t *client)
+{
+ custom_t msg;
+ char buf[256];
+ char *astr[] = { "this", "is", "a", "test", "string", "array" };
+ uint32_t au32[] = { 1, 2, 3, 4, 5, 6, 7, -1 };
+ int status;
+ context_t *c = client->c;
+ uint32_t seq = c->seqno++;
+
+ msg.seq = seq;
+ snprintf(buf, sizeof(buf), "this is message #%u", (unsigned int)seq);
+ msg.msg = buf;
+ msg.u8 = seq & 0xf;
+ msg.s8 = -(seq & 0xf);
+ msg.u16 = seq;
+ msg.s16 = - seq;
+ msg.dbl = seq / 3.0;
+ msg.bln = seq & 0x1;
+ msg.astr = astr;
+ msg.nstr = MRP_ARRAY_SIZE(astr);
+ msg.fsck = 1000;
+ msg.au32 = au32;
+ msg.rpl = "";
+
+ if (c->connect)
+ status = mrp_transport_senddata(client->t, &msg, data_descr->tag);
+ else
+ status = mrp_transport_senddatato(client->t, &msg, data_descr->tag,
+ &c->addr, c->alen);
+
+ if (!status) {
+ mrp_log_error("Failed to send message #%d.", msg.seq);
+ exit(1);
+ }
+ else
+ mrp_log_info("Message #%d succesfully sent.", msg.seq);
+}
+
+
+
+void send_cb(mrp_mainloop_t *ml, mrp_timer_t *t, void *user_data)
+{
+ client_t *client = (client_t *)user_data;
+ context_t *c = client->c;
+
+ MRP_UNUSED(ml);
+ MRP_UNUSED(t);
+
+ if (c->custom)
+ send_custom(client);
+ else
+ send_msg(client);
+}
+
+
+void client_init(context_t *c)
+{
+ static mrp_transport_evt_t evt = {
+ { .recvmsg = NULL },
+ { .recvmsgfrom = NULL },
+ .closed = closed_evt,
+ .connection = NULL
+ };
+
+ int flags;
+ client_t *client;
+
+ if (c->custom) {
+ evt.recvdata = recv_custom;
+ evt.recvdatafrom = recvfrom_custom;
+ }
+ else {
+ evt.recvmsg = recv_msg;
+ evt.recvmsgfrom = recvfrom_msg;
+ }
+
+ client = mrp_allocz(sizeof(client_t));
+ mrp_list_init(&client->hook);
+ client->c = c;
+
+ mrp_list_append(&c->clients, &client->hook);
+
+ flags = c->custom ? MRP_TRANSPORT_MODE_CUSTOM : 0;
+ client->t = mrp_transport_create(c->ml, c->atype, &evt, client, flags);
+
+ if (client->t == NULL) {
+ mrp_log_error("Failed to create new transport.");
+ exit(1);
+ }
+
+ if (!strcmp(c->atype, "unxd")) {
+ char addrstr[] = "unxd:@stream-test-client";
+ mrp_sockaddr_t addr;
+ socklen_t alen;
+
+ alen = mrp_transport_resolve(NULL, addrstr, &addr, sizeof(addr), NULL);
+ if (alen <= 0) {
+ mrp_log_error("Failed to resolve transport address '%s'.", addrstr);
+ exit(1);
+ }
+
+ if (!mrp_transport_bind(client->t, &addr, alen)) {
+ mrp_log_error("Failed to bind to transport address '%s'.", addrstr);
+ exit(1);
+ }
+ }
+
+ if (c->connect) {
+ if (!mrp_transport_connect(client->t, &c->addr, c->alen)) {
+ mrp_log_error("Failed to connect to %s.", c->addrstr);
+ exit(1);
+ }
+ }
+
+
+ c->timer = mrp_add_timer(c->ml, 1000, send_cb, client);
+
+ if (c->timer == NULL) {
+ mrp_log_error("Failed to create send timer.");
+ exit(1);
+ }
+}
+
+
+static void print_usage(const char *argv0, int exit_code, const char *fmt, ...)
+{
+ va_list ap;
+
+ if (fmt && *fmt) {
+ va_start(ap, fmt);
+ vprintf(fmt, ap);
+ va_end(ap);
+ }
+
+ printf("usage: %s [options] [transport-address]\n\n"
+ "The possible options are:\n"
+ " -s, --server run as test server (default)\n"
+ " -C, --connect connect transport\n"
+ " For connection-oriented transports, this is automatic.\n"
+ " -a, --address address to use\n"
+ " -c, --custom use custom messages\n"
+ " -m, --message use generic messages (default)\n"
+ " -b, --buggy use buggy data descriptors\n"
+ " -t, --log-target=TARGET log target to use\n"
+ " TARGET is one of stderr,stdout,syslog, or a logfile path\n"
+ " -l, --log-level=LEVELS logging level to use\n"
+ " LEVELS is a comma separated list of info, error and warning\n"
+ " -v, --verbose increase logging verbosity\n"
+ " -d, --debug enable debug messages\n"
+ " -h, --help show help on usage\n",
+ argv0);
+
+ if (exit_code < 0)
+ return;
+ else
+ exit(exit_code);
+}
+
+
+static void config_set_defaults(context_t *ctx)
+{
+ mrp_clear(ctx);
+ ctx->addrstr = "tcp4:127.0.0.1:3000";
+ ctx->server = FALSE;
+ ctx->custom = FALSE;
+ ctx->log_mask = MRP_LOG_UPTO(MRP_LOG_DEBUG);
+ ctx->log_target = MRP_LOG_TO_STDERR;
+}
+
+
+int parse_cmdline(context_t *ctx, int argc, char **argv)
+{
+# define OPTIONS "scmbCa:l:t:vdh"
+ struct option options[] = {
+ { "server" , no_argument , NULL, 's' },
+ { "address" , required_argument, NULL, 'a' },
+ { "custom" , no_argument , NULL, 'c' },
+ { "connect" , no_argument , NULL, 'C' },
+ { "message" , no_argument , NULL, 'm' },
+ { "buggy" , no_argument , NULL, 'b' },
+ { "log-level" , required_argument, NULL, 'l' },
+ { "log-target", required_argument, NULL, 't' },
+ { "verbose" , optional_argument, NULL, 'v' },
+ { "debug" , no_argument , NULL, 'd' },
+ { "help" , no_argument , NULL, 'h' },
+ { NULL, 0, NULL, 0 }
+ };
+
+ int opt, debug;
+
+ debug = FALSE;
+ config_set_defaults(ctx);
+
+ while ((opt = getopt_long(argc, argv, OPTIONS, options, NULL)) != -1) {
+ switch (opt) {
+ case 's':
+ ctx->server = TRUE;
+ break;
+
+ case 'c':
+ ctx->custom = TRUE;
+ break;
+
+ case 'm':
+ ctx->custom = FALSE;
+ break;
+
+ case 'b':
+ ctx->buggy = TRUE;
+ break;
+
+ case 'C':
+ ctx->connect = TRUE;
+ break;
+
+ case 'a':
+ ctx->addrstr = optarg;
+ break;
+
+ case 'v':
+ ctx->log_mask <<= 1;
+ ctx->log_mask |= 1;
+ break;
+
+ case 'l':
+ ctx->log_mask = mrp_log_parse_levels(optarg);
+ if (ctx->log_mask < 0)
+ print_usage(argv[0], EINVAL, "invalid log level '%s'", optarg);
+ break;
+
+ case 't':
+ ctx->log_target = mrp_log_parse_target(optarg);
+ if (!ctx->log_target)
+ print_usage(argv[0], EINVAL, "invalid log target '%s'", optarg);
+ break;
+
+ case 'd':
+ debug = TRUE;
+ break;
+
+ case 'h':
+ print_usage(argv[0], -1, "");
+ exit(0);
+ break;
+
+ default:
+ print_usage(argv[0], EINVAL, "invalid option '%c'", opt);
+ }
+ }
+
+ if (debug)
+ ctx->log_mask |= MRP_LOG_MASK_DEBUG;
+
+ return TRUE;
+}
+
+
+int main(int argc, char *argv[])
+{
+ context_t c;
+
+ mrp_clear(&c);
+
+ if (!parse_cmdline(&c, argc, argv))
+ exit(1);
+
+ mrp_log_set_mask(c.log_mask);
+ mrp_log_set_target(c.log_target);
+
+ mrp_log_info("Using address '%s'...", c.addrstr);
+
+ mrp_list_init(&c.clients);
+
+ if (c.custom)
+ mrp_log_info("Using custom messages...");
+ else
+ mrp_log_info("Using generic messages...");
+
+ if (!strncmp(c.addrstr, "tcp", 3) ||
+ !strncmp(c.addrstr, "unxs", 4)) {
+ c.stream = TRUE;
+ c.connect = TRUE;
+ }
+
+ c.alen = mrp_transport_resolve(NULL, c.addrstr,
+ &c.addr, sizeof(c.addr), &c.atype);
+ if (c.alen <= 0) {
+ mrp_log_error("Failed to resolve transport address '%s'.", c.addrstr);
+ exit(1);
+ }
+
+ c.ml = mrp_mainloop_create();
+
+ type_init(&c);
+
+ server_init(&c);
+
+ client_init(&c);
+ client_init(&c);
+
+ mrp_mainloop_run(c.ml);
+
+ return 0;
+}