# set up defaults
PATTERN="^mrp_" # export everything prefixed with mrp_
-IGNORE="MRP_PRINTF_LIKE" # ignore these symbols/macros
+IGNORE="MRP_PRINTF_LIKE,MRP_NULLTERM" # ignore these symbols/macros
IT="," # ignore-list is comma-separated
SOURCES="" # no default input, must be specified
OUTPUT="" # no default output, must be specified
typedef struct {
MRP_TRANSPORT_PUBLIC_FIELDS; /* common transport fields */
int sock; /* UDP socket */
- int flags; /* socket flags */
int family; /* socket family */
mrp_io_watch_t *iow; /* socket I/O watch */
void *ibuf; /* input buffer */
}
-static int dgrm_open(mrp_transport_t *mu, int flags)
+static int dgrm_open(mrp_transport_t *mu)
{
dgrm_t *u = (dgrm_t *)mu;
u->sock = -1;
u->family = -1;
- u->flags = flags;
return TRUE;
}
-static int dgrm_create(mrp_transport_t *mu, void *conn, int flags)
+static int dgrm_create(mrp_transport_t *mu, void *conn)
{
dgrm_t *u = (dgrm_t *)mu;
int on;
mrp_io_event_t events;
- u->sock = *(int *)conn;
- u->flags = flags;
+ u->sock = *(int *)conn;
if (u->sock >= 0) {
- if (u->flags & MRP_TRANSPORT_REUSEADDR) {
+ if (mu->flags & MRP_TRANSPORT_REUSEADDR) {
on = 1;
setsockopt(u->sock, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
}
- if (u->flags & MRP_TRANSPORT_NONBLOCK) {
+ if (mu->flags & MRP_TRANSPORT_NONBLOCK) {
on = 1;
fcntl(u->sock, F_SETFL, O_NONBLOCK, on);
}
ssize_t n;
void *data;
int old, error;
- mrp_msg_t *msg;
MRP_UNUSED(ml);
MRP_UNUSED(w);
goto fatal_error;
}
- data = u->ibuf + sizeof(size);
- msg = mrp_msg_default_decode(data, size);
-
- if (msg != NULL) {
- if (mu->connected) {
- MRP_TRANSPORT_BUSY(mu, {
- mu->evt.recv(mu, msg, mu->user_data);
- });
- }
- else {
- MRP_TRANSPORT_BUSY(mu, {
- mu->evt.recvfrom(mu, msg, &addr, addrlen,
- mu->user_data);
- });
- }
+ data = u->ibuf + sizeof(size);
+ error = mu->recv_data(mu, data, size, &addr, addrlen);
- mrp_msg_unref(msg);
-
- if (u->check_destroy(mu))
- return;
- }
- else {
- error = EPROTO;
+ if (error)
goto fatal_error;
- }
+
+ if (u->check_destroy(mu))
+ return;
}
if (events & MRP_IO_EVENT_HUP) {
}
+static int dgrm_sendraw(mrp_transport_t *mu, void *data, size_t size)
+{
+ dgrm_t *u = (dgrm_t *)mu;
+ ssize_t n;
+
+ if (u->connected) {
+ n = write(u->sock, data, size);
+
+ if (n == (ssize_t)size)
+ return TRUE;
+ else {
+ if (n == -1 && errno == EAGAIN) {
+ mrp_log_error("%s(): XXX TODO: this sucks, need to add "
+ "output queuing for dgrm-transport.",
+ __FUNCTION__);
+ }
+ }
+ }
+
+ return FALSE;
+}
+
+
+static int dgrm_sendrawto(mrp_transport_t *mu, void *data, size_t size,
+ mrp_sockaddr_t *addr, socklen_t addrlen)
+{
+ dgrm_t *u = (dgrm_t *)mu;
+ ssize_t n;
+
+ if (MRP_UNLIKELY(u->sock == -1)) {
+ if (!open_socket(u, ((struct sockaddr *)addr)->sa_family))
+ return FALSE;
+ }
+
+ n = sendto(u->sock, data, size, 0, &addr->any, addrlen);
+
+ if (n == (ssize_t)size)
+ return TRUE;
+ else {
+ if (n == -1 && errno == EAGAIN) {
+ mrp_log_error("%s(): XXX TODO: dgrm-transport send failed",
+ __FUNCTION__);
+ }
+ }
+
+ return FALSE;
+}
+
+
+static int senddatato(mrp_transport_t *mu, void *data, uint16_t tag,
+ mrp_sockaddr_t *addr, socklen_t addrlen)
+{
+ dgrm_t *u = (dgrm_t *)mu;
+ mrp_data_descr_t *type;
+ ssize_t n;
+ void *buf;
+ size_t size, reserve, len;
+ uint32_t *lenp;
+ uint16_t *tagp;
+
+ if (MRP_UNLIKELY(u->sock == -1)) {
+ if (!open_socket(u, ((struct sockaddr *)addr)->sa_family))
+ return FALSE;
+ }
+
+ type = mrp_msg_find_type(tag);
+
+ if (type != NULL) {
+ reserve = sizeof(*lenp) + sizeof(*tagp);
+ size = mrp_data_encode(&buf, data, type, reserve);
+
+ if (size > 0) {
+ lenp = buf;
+ len = size - sizeof(*lenp);
+ tagp = buf + sizeof(*lenp);
+ *lenp = htobe32(len);
+ *tagp = htobe16(tag);
+
+ if (u->connected)
+ n = send(u->sock, buf, len + sizeof(*lenp), 0);
+ else
+ n = sendto(u->sock, buf, len + sizeof(*lenp), 0, &addr->any, addrlen);
+
+ mrp_free(buf);
+
+ if (n == (ssize_t)(len + sizeof(*lenp)))
+ return TRUE;
+ else {
+ if (n == -1 && errno == EAGAIN) {
+ mrp_log_error("%s(): XXX TODO: dgrm-transport send"
+ " needs queuing", __FUNCTION__);
+ }
+ }
+ }
+ }
+
+ return FALSE;
+}
+
+
+static int dgrm_senddata(mrp_transport_t *mu, void *data, uint16_t tag)
+{
+ if (mu->connected)
+ return senddatato(mu, data, tag, NULL, 0);
+ else
+ return FALSE;
+}
+
+
+static int dgrm_senddatato(mrp_transport_t *mu, void *data, uint16_t tag,
+ mrp_sockaddr_t *addr, socklen_t addrlen)
+{
+ return senddatato(mu, data, tag, addr, addrlen);
+}
+
+
MRP_REGISTER_TRANSPORT(udp4, "udp4", dgrm_t, dgrm_resolve,
dgrm_open, dgrm_create, dgrm_close,
dgrm_bind, dgrm_listen, NULL,
dgrm_connect, dgrm_disconnect,
- dgrm_send, dgrm_sendto);
+ dgrm_send, dgrm_sendto,
+ dgrm_sendraw, dgrm_sendrawto,
+ dgrm_senddata, dgrm_senddatato);
MRP_REGISTER_TRANSPORT(udp6, "udp6", dgrm_t, dgrm_resolve,
dgrm_open, dgrm_create, dgrm_close,
dgrm_bind, dgrm_listen, NULL,
dgrm_connect, dgrm_disconnect,
- dgrm_send, dgrm_sendto);
+ dgrm_send, dgrm_sendto,
+ dgrm_sendraw, dgrm_sendrawto,
+ dgrm_senddata, dgrm_senddatato);
MRP_REGISTER_TRANSPORT(unxdgrm, "unxdgrm", dgrm_t, dgrm_resolve,
dgrm_open, dgrm_create, dgrm_close,
dgrm_bind, dgrm_listen, NULL,
dgrm_connect, dgrm_disconnect,
- dgrm_send, dgrm_sendto);
+ dgrm_send, dgrm_sendto,
+ dgrm_sendraw, dgrm_sendrawto,
+ dgrm_senddata, dgrm_senddatato);
#include <murphy/common/list.h>
#include <murphy/common/msg.h>
+/*
+ * types below NDIRECT_TYPE are directly indexed
+ */
+#define NDIRECT_TYPE 256
+static mrp_data_descr_t **direct_types;
+static mrp_data_descr_t **other_types;
+static int nother_type;
+
+
+static inline void destroy_field(mrp_msg_field_t *f)
+{
+ uint32_t i;
+
+ if (f != NULL) {
+ mrp_list_delete(&f->hook);
+
+ switch (f->type) {
+ case MRP_MSG_FIELD_STRING:
+ mrp_free(f->str);
+ break;
+
+ case MRP_MSG_FIELD_BLOB:
+ mrp_free(f->blb);
+ break;
+
+ default:
+ if (f->type & MRP_MSG_FIELD_ARRAY) {
+ if ((f->type & ~MRP_MSG_FIELD_ARRAY) == MRP_MSG_FIELD_STRING) {
+ for (i = 0; i < f->size[0]; i++) {
+ mrp_free(f->astr[i]);
+ }
+ }
+
+ mrp_free(f->aany);
+ }
+ break;
+ }
+
+ mrp_free(f);
+ }
+}
+
static inline mrp_msg_field_t *create_field(uint16_t tag, va_list *ap)
{
mrp_msg_field_t *f;
- uint16_t type;
+ uint16_t type, base;
uint32_t size;
void *blb;
type = va_arg(*ap, uint32_t);
#define CREATE(_f, _tag, _type, _fldtype, _fld, _last, _errlbl) do { \
+ \
(_f) = mrp_allocz(MRP_OFFSET(typeof(*_f), _last) + \
sizeof(_f->_last)); \
\
} \
} while (0)
+#define CREATE_ARRAY(_f, _tag, _type, _fld, _fldtype, _errlbl) do { \
+ uint16_t _base; \
+ uint32_t _i; \
+ \
+ (_f) = mrp_allocz(MRP_OFFSET(typeof(*_f), _fld) + \
+ sizeof(_f->size[0])); \
+ \
+ if ((_f) != NULL) { \
+ (_f)->tag = _tag; \
+ (_f)->type = _type | MRP_MSG_FIELD_ARRAY; \
+ _base = _type & ~MRP_MSG_FIELD_ARRAY; \
+ \
+ _f->size[0] = va_arg(ap, uint32_t); \
+ _f->_fld = mrp_allocz_array(typeof(*_f->_fld), \
+ _f->size[0]); \
+ \
+ if (_f->_fld == NULL) \
+ goto _errlbl; \
+ else \
+ memcpy(_f->_fld, va_arg(ap, typeof(_f->_fld)), \
+ _f->size[0] * sizeof(_f->_fld[0])); \
+ \
+ if (_base == MRP_MSG_FIELD_STRING) { \
+ for (_i = 0; _i < _f->size[0]; _i++) { \
+ _f->astr[_i] = mrp_strdup(_f->astr[_i]); \
+ if (_f->astr[_i] == NULL) \
+ goto _errlbl; \
+ } \
+ } \
+ } \
+ else \
+ goto _errlbl; \
+ } while (0)
+
+ f = NULL;
+
switch (type) {
case MRP_MSG_FIELD_STRING:
- CREATE(f, tag, type, char *, str, str, nomem);
+ CREATE(f, tag, type, char *, str, str, fail);
f->str = mrp_strdup(f->str);
if (f->str == NULL)
- goto nomem;
+ goto fail;
break;
case MRP_MSG_FIELD_BOOL:
- CREATE(f, tag, type, int, bln, bln, nomem);
+ CREATE(f, tag, type, int, bln, bln, fail);
break;
case MRP_MSG_FIELD_UINT8:
- CREATE(f, tag, type, unsigned int, u8, u8, nomem);
+ CREATE(f, tag, type, unsigned int, u8, u8, fail);
break;
case MRP_MSG_FIELD_SINT8:
- CREATE(f, tag, type, signed int, s8, s8, nomem);
+ CREATE(f, tag, type, signed int, s8, s8, fail);
break;
case MRP_MSG_FIELD_UINT16:
- CREATE(f, tag, type, unsigned int, u16, u16, nomem);
+ CREATE(f, tag, type, unsigned int, u16, u16, fail);
break;
case MRP_MSG_FIELD_SINT16:
- CREATE(f, tag, type, signed int, s16, s16, nomem);
+ CREATE(f, tag, type, signed int, s16, s16, fail);
break;
case MRP_MSG_FIELD_UINT32:
- CREATE(f, tag, type, unsigned int, u32, u32, nomem);
+ CREATE(f, tag, type, unsigned int, u32, u32, fail);
break;
case MRP_MSG_FIELD_SINT32:
- CREATE(f, tag, type, signed int, s32, s32, nomem);
+ CREATE(f, tag, type, signed int, s32, s32, fail);
break;
case MRP_MSG_FIELD_UINT64:
- CREATE(f, tag, type, uint64_t, u64, u64, nomem);
+ CREATE(f, tag, type, uint64_t, u64, u64, fail);
break;
case MRP_MSG_FIELD_SINT64:
- CREATE(f, tag, type, int64_t, s64, s64, nomem);
+ CREATE(f, tag, type, int64_t, s64, s64, fail);
break;
case MRP_MSG_FIELD_DOUBLE:
- CREATE(f, tag, type, double, dbl, dbl, nomem);
+ CREATE(f, tag, type, double, dbl, dbl, fail);
break;
case MRP_MSG_FIELD_BLOB:
size = va_arg(ap, uint32_t);
- CREATE(f, tag, type, void *, blb, size[0], nomem);
+ CREATE(f, tag, type, void *, blb, size[0], fail);
blb = f->blb;
f->size[0] = size;
f->size[0] = size;
}
else
- goto nomem;
+ goto fail;
break;
default:
- if (f->type & MRP_MSG_FIELD_ARRAY) {
- errno = EOPNOTSUPP;
- mrp_log_error("XXX TODO: MRP_MSG_FIELD_ARRAY not implemented");
- }
- else
+ if (!(type & MRP_MSG_FIELD_ARRAY)) {
errno = EINVAL;
+ goto fail;
+ }
- mrp_free(f);
- f = NULL;
+ base = type & ~MRP_MSG_FIELD_ARRAY;
+
+ switch (base) {
+ case MRP_MSG_FIELD_STRING:
+ CREATE_ARRAY(f, tag, base, astr, char *, fail);
+ break;
+ case MRP_MSG_FIELD_BOOL:
+ CREATE_ARRAY(f, tag, base, abln, int, fail);
+ break;
+ case MRP_MSG_FIELD_UINT8:
+ CREATE_ARRAY(f, tag, base, au8, unsigned int, fail);
+ break;
+ case MRP_MSG_FIELD_SINT8:
+ CREATE_ARRAY(f, tag, base, as8, int, fail);
+ break;
+ case MRP_MSG_FIELD_UINT16:
+ CREATE_ARRAY(f, tag, base, au16, unsigned int, fail);
+ break;
+ case MRP_MSG_FIELD_SINT16:
+ CREATE_ARRAY(f, tag, base, as16, int, fail);
+ break;
+ case MRP_MSG_FIELD_UINT32:
+ CREATE_ARRAY(f, tag, base, au32, unsigned int, fail);
+ break;
+ case MRP_MSG_FIELD_SINT32:
+ CREATE_ARRAY(f, tag, base, as32, int, fail);
+ break;
+ case MRP_MSG_FIELD_UINT64:
+ CREATE_ARRAY(f, tag, base, au64, unsigned long long, fail);
+ break;
+ case MRP_MSG_FIELD_SINT64:
+ CREATE_ARRAY(f, tag, base, as64, long long, fail);
+ break;
+ case MRP_MSG_FIELD_DOUBLE:
+ CREATE_ARRAY(f, tag, base, adbl, double, fail);
+ break;
+ default:
+ errno = EINVAL;
+ goto fail;
+ }
+ break;
}
-#undef CREATE
}
return f;
- nomem:
- errno = ENOMEM;
+ fail:
+ destroy_field(f);
return NULL;
+
+#undef CREATE
+#undef CREATE_ARRAY
}
if (msg != NULL) {
mrp_list_foreach(&msg->fields, p, n) {
f = mrp_list_entry(p, typeof(*f), hook);
- mrp_list_delete(&f->hook);
-
- switch (f->type) {
- case MRP_MSG_FIELD_STRING:
- mrp_free(f->str);
- break;
- case MRP_MSG_FIELD_BLOB:
- mrp_free(f->blb);
- break;
- }
-
- mrp_free(f);
+ destroy_field(f);
}
}
}
}
+static const char *field_type_name(uint16_t type)
+{
+#define BASIC(t, n) [MRP_MSG_FIELD_##t] = n
+#define ARRAY(t, n) [MRP_MSG_FIELD_##t] = "array of "n"s"
+ static const char *basic[] = {
+ BASIC(STRING, "string" ),
+ BASIC(BOOL , "boolean"),
+ BASIC(UINT8 , "uint8" ),
+ BASIC(SINT8 , "sint8" ),
+ BASIC(UINT16, "uint16" ),
+ BASIC(SINT16, "sint16" ),
+ BASIC(UINT32, "uint32" ),
+ BASIC(SINT32, "sint32" ),
+ BASIC(UINT64, "uint64" ),
+ BASIC(SINT64, "sint64" ),
+ BASIC(DOUBLE, "double" ),
+ BASIC(BLOB , "blob" )
+ };
+
+ static const char *array[] = {
+ ARRAY(STRING, "string" ),
+ ARRAY(BOOL , "boolean"),
+ ARRAY(UINT8 , "uint8" ),
+ ARRAY(SINT8 , "sint8" ),
+ ARRAY(UINT16, "uint16" ),
+ ARRAY(SINT16, "sint16" ),
+ ARRAY(UINT32, "uint32" ),
+ ARRAY(SINT32, "sint32" ),
+ ARRAY(UINT64, "uint64" ),
+ ARRAY(SINT64, "sint64" ),
+ ARRAY(DOUBLE, "double" ),
+ ARRAY(BLOB , "blob" )
+ };
+#undef BASIC
+#undef ARRAY
+
+ uint16_t base;
+
+ if (MRP_MSG_FIELD_INVALID < type && type <= MRP_MSG_FIELD_MAX)
+ return basic[type];
+ else {
+ if (type & MRP_MSG_FIELD_ARRAY) {
+ base = type & ~MRP_MSG_FIELD_ARRAY;
+
+ if (MRP_MSG_FIELD_INVALID < base && base <= MRP_MSG_FIELD_MAX)
+ return array[base];
+ }
+ }
+
+ return "unknown type";
+}
+
+
int mrp_msg_dump(mrp_msg_t *msg, FILE *fp)
{
mrp_msg_field_t *f;
mrp_list_hook_t *p, *n;
int l;
+ uint32_t i;
+ uint16_t base;
+ const char *tname;
l = fprintf(fp, "{\n");
mrp_list_foreach(&msg->fields, p, n) {
l += fprintf(fp, " 0x%x ", f->tag);
-#define DUMP(_fmt, _type, _val) \
- l += fprintf(fp, "= <%s> "_fmt"\n", _type, _val)
+#define DUMP(_indent, _fmt, _typename, _val) \
+ l += fprintf(fp, "%*.*s= <%s> "_fmt"\n", _indent, _indent, "", \
+ _typename, _val)
+ tname = field_type_name(f->type);
switch (f->type) {
case MRP_MSG_FIELD_STRING:
- DUMP("'%s'", "string", f->str);
+ DUMP(0, "'%s'", tname, f->str);
break;
case MRP_MSG_FIELD_BOOL:
- DUMP("%s", "boolean", f->bln ? "true" : "false");
+ DUMP(0, "%s", tname, f->bln ? "true" : "false");
break;
case MRP_MSG_FIELD_UINT8:
- DUMP("%u", "uint8", f->u8);
+ DUMP(0, "%u", tname, f->u8);
break;
case MRP_MSG_FIELD_SINT8:
- DUMP("%d", "sint8", f->s8);
+ DUMP(0, "%d", tname, f->s8);
break;
case MRP_MSG_FIELD_UINT16:
- DUMP("%u", "uint16", f->u16);
+ DUMP(0, "%u", tname, f->u16);
break;
case MRP_MSG_FIELD_SINT16:
- DUMP("%d", "sint16", f->s16);
+ DUMP(0, "%d", tname, f->s16);
break;
case MRP_MSG_FIELD_UINT32:
- DUMP("%u", "uint32", f->u32);
+ DUMP(0, "%u", tname, f->u32);
break;
case MRP_MSG_FIELD_SINT32:
- DUMP("%d", "sint32", f->s32);
+ DUMP(0, "%d", tname, f->s32);
break;
case MRP_MSG_FIELD_UINT64:
- DUMP("%Lu", "uint64", (long long unsigned)f->u64);
+ DUMP(0, "%Lu", tname, (long long unsigned)f->u64);
break;
case MRP_MSG_FIELD_SINT64:
- DUMP("%Ld", "sint64", (long long signed)f->s64);
+ DUMP(0, "%Ld", tname, (long long signed)f->s64);
break;
case MRP_MSG_FIELD_DOUBLE:
- DUMP("%f", "double", f->dbl);
+ DUMP(0, "%f", tname, f->dbl);
break;
case MRP_MSG_FIELD_BLOB: {
char *p;
uint32_t i;
- fprintf(fp, "= <%s> <%u bytes, ", "blob", f->size[0]);
+ fprintf(fp, "= <%s> <%u bytes, ", tname, f->size[0]);
for (i = 0, p = f->blb; i < f->size[0]; i++, p++) {
if (isprint(*p) && *p != '\n' && *p != '\t' && *p != '\r')
break;
default:
- fprintf(fp, "= <%s> {%u items, XXX TODO}\n", "array", f->size[0]);
+ if (f->type & MRP_MSG_FIELD_ARRAY) {
+ base = f->type & ~MRP_MSG_FIELD_ARRAY;
+ tname = field_type_name(base);
+
+ fprintf(fp, "\n");
+ for (i = 0; i < f->size[0]; i++) {
+ switch (base) {
+ case MRP_MSG_FIELD_STRING:
+ DUMP(8, "'%s'", tname, f->astr[i]);
+ break;
+ case MRP_MSG_FIELD_BOOL:
+ DUMP(8, "%s", tname, f->abln[i] ? "true" : "false");
+ break;
+ case MRP_MSG_FIELD_UINT8:
+ DUMP(8, "%u", tname, f->au8[i]);
+ break;
+ case MRP_MSG_FIELD_SINT8:
+ DUMP(8, "%d", tname, f->as8[i]);
+ break;
+ case MRP_MSG_FIELD_UINT16:
+ DUMP(8, "%u", tname, f->au16[i]);
+ break;
+ case MRP_MSG_FIELD_SINT16:
+ DUMP(8, "%d", tname, f->as16[i]);
+ break;
+ case MRP_MSG_FIELD_UINT32:
+ DUMP(8, "%u", tname, f->au32[i]);
+ break;
+ case MRP_MSG_FIELD_SINT32:
+ DUMP(8, "%d", tname, f->as32[i]);
+ break;
+ case MRP_MSG_FIELD_UINT64:
+ DUMP(8, "%Lu", tname,
+ (unsigned long long)f->au64[i]);
+ break;
+ case MRP_MSG_FIELD_SINT64:
+ DUMP(8, "%Ld", tname,
+ (long long)f->as64[i]);
+ break;
+ case MRP_MSG_FIELD_DOUBLE:
+ DUMP(8, "%f", tname, f->adbl[i]);
+ break;
+ default:
+ fprintf(fp, "%*.*s= <%s>\n", 8, 8, "", tname);
+ break;
+ }
+ }
+ }
+ else
+ fprintf(fp, "= <%s>\n", tname);
}
}
l += fprintf(fp, "}\n");
return l;
+#undef DUMP
}
ssize_t mrp_msg_default_encode(mrp_msg_t *msg, void **bufp)
{
-#define RESERVE(type) ({ \
- void *_ptr; \
- \
- _ptr = mrp_msgbuf_reserve(&mb, sizeof(type), 1); \
- \
- if (_ptr == NULL) { \
- *bufp = NULL; \
- return -1; \
- } \
- \
- _ptr; \
- })
-
-#define RESERVE_SIZE(size) ({ \
- void *_ptr; \
- \
- _ptr = mrp_msgbuf_reserve(&mb, size, 1); \
- \
- if (_ptr == NULL) { \
- *bufp = NULL; \
- return -1; \
- } \
- \
- _ptr; \
- })
-
-
mrp_msg_field_t *f;
mrp_list_hook_t *p, *n;
mrp_msgbuf_t mb;
- uint32_t len;
+ uint32_t len, asize, i;
+ uint16_t type;
size_t size;
size = msg->nfield * (2 * sizeof(uint16_t) + sizeof(uint64_t));
if (mrp_msgbuf_write(&mb, size)) {
+ MRP_MSGBUF_PUSH(&mb, htobe16(MRP_MSG_TAG_DEFAULT), 1, nomem);
MRP_MSGBUF_PUSH(&mb, htobe16(msg->nfield), 1, nomem);
mrp_list_foreach(&msg->fields, p, n) {
default:
if (f->type & MRP_MSG_FIELD_ARRAY) {
- errno = EOPNOTSUPP;
- mrp_log_error("XXX TODO: MRP_MSG_FIELD_ARRAY "
- "not implemented");
+ type = f->type & ~(MRP_MSG_FIELD_ARRAY);
+ asize = f->size[0];
+ MRP_MSGBUF_PUSH(&mb, htobe32(asize), 1, nomem);
+
+ for (i = 0; i < asize; i++) {
+ switch (type) {
+ case MRP_MSG_FIELD_STRING:
+ len = strlen(f->astr[i]) + 1;
+ MRP_MSGBUF_PUSH(&mb, htobe32(len), 1, nomem);
+ MRP_MSGBUF_PUSH_DATA(&mb, f->astr[i], len,
+ 1, nomem);
+ break;
+
+ case MRP_MSG_FIELD_BOOL:
+ MRP_MSGBUF_PUSH(&mb, htobe32(f->abln[i]?TRUE:FALSE),
+ 1, nomem);
+ break;
+
+ case MRP_MSG_FIELD_UINT8:
+ MRP_MSGBUF_PUSH(&mb, f->au8[i], 1, nomem);
+ break;
+
+ case MRP_MSG_FIELD_SINT8:
+ MRP_MSGBUF_PUSH(&mb, f->as8[i], 1, nomem);
+ break;
+
+ case MRP_MSG_FIELD_UINT16:
+ MRP_MSGBUF_PUSH(&mb, htobe16(f->au16[i]), 1, nomem);
+ break;
+
+ case MRP_MSG_FIELD_SINT16:
+ MRP_MSGBUF_PUSH(&mb, htobe16(f->as16[i]), 1, nomem);
+ break;
+
+ case MRP_MSG_FIELD_UINT32:
+ MRP_MSGBUF_PUSH(&mb, htobe32(f->au32[i]), 1, nomem);
+ break;
+
+ case MRP_MSG_FIELD_SINT32:
+ MRP_MSGBUF_PUSH(&mb, htobe32(f->as32[i]), 1, nomem);
+ break;
+
+ case MRP_MSG_FIELD_UINT64:
+ MRP_MSGBUF_PUSH(&mb, htobe64(f->au64[i]), 1, nomem);
+ break;
+
+ case MRP_MSG_FIELD_SINT64:
+ MRP_MSGBUF_PUSH(&mb, htobe64(f->as64[i]), 1, nomem);
+ break;
+
+ case MRP_MSG_FIELD_DOUBLE:
+ MRP_MSGBUF_PUSH(&mb, f->adbl[i], 1, nomem);
+ break;
+
+ default:
+ goto invalid_type;
+ }
+ }
}
- else
+ else {
+ invalid_type:
errno = EINVAL;
-
- mrp_msgbuf_cancel(&mb);
- nomem:
- *bufp = NULL;
- return -1;
+ mrp_msgbuf_cancel(&mb);
+ nomem:
+ *bufp = NULL;
+ return -1;
+ }
}
}
}
mrp_msg_t *mrp_msg_default_decode(void *buf, size_t size)
{
-#define PULL(type) ({ \
- void *_ptr; \
- \
- _ptr = mrp_msgbuf_pull(&mb, sizeof(type), 1); \
- \
- if (_ptr == NULL) \
- return NULL; \
- \
- _ptr; \
- })
-
-#define PULL_SIZE(size) ({ \
- void *_ptr; \
- \
- _ptr = mrp_msgbuf_pull(&mb, size, 1); \
- \
- if (_ptr == NULL) \
- return NULL; \
- \
- _ptr; \
- })
-
mrp_msg_t *msg;
mrp_msgbuf_t mb;
mrp_msg_value_t v;
void *value;
- uint16_t nfield, tag, type;
- uint32_t len;
- int i;
+ uint16_t nfield, tag, type, base;
+ uint32_t len, n, i;
- msg = mrp_msg_create(MRP_MSG_FIELD_INVALID);
+ msg = mrp_msg_create_empty();
if (msg == NULL)
return NULL;
switch (type) {
case MRP_MSG_FIELD_STRING:
- len = be32toh(MRP_MSGBUF_PULL(&mb, typeof(len), 1, nodata));
- value = MRP_MSGBUF_PULL_DATA(&mb, len, 1, nodata);
- if (!mrp_msg_append(msg, tag, type, value, MRP_MSG_FIELD_INVALID))
+ len = be32toh(MRP_MSGBUF_PULL(&mb, typeof(len), 1, nodata));
+ if (len > 0)
+ value = MRP_MSGBUF_PULL_DATA(&mb, len, 1, nodata);
+ else
+ value = "";
+ if (!mrp_msg_append(msg, tag, type, value))
goto fail;
break;
case MRP_MSG_FIELD_BOOL:
v.bln = be32toh(MRP_MSGBUF_PULL(&mb, uint32_t, 1, nodata));
- if (!mrp_msg_append(msg, tag, type, v.bln, MRP_MSG_FIELD_INVALID))
+ if (!mrp_msg_append(msg, tag, type, v.bln))
goto fail;
break;
case MRP_MSG_FIELD_UINT8:
v.u8 = MRP_MSGBUF_PULL(&mb, typeof(v.u8), 1, nodata);
- if (!mrp_msg_append(msg, tag, type, v.u8, MRP_MSG_FIELD_INVALID))
+ if (!mrp_msg_append(msg, tag, type, v.u8))
goto fail;
break;
case MRP_MSG_FIELD_SINT8:
v.s8 = MRP_MSGBUF_PULL(&mb, typeof(v.s8), 1, nodata);
- if (!mrp_msg_append(msg, tag, type, v.s8, MRP_MSG_FIELD_INVALID))
+ if (!mrp_msg_append(msg, tag, type, v.s8))
goto fail;
break;
case MRP_MSG_FIELD_UINT16:
v.u16 = be16toh(MRP_MSGBUF_PULL(&mb, typeof(v.u16), 1, nodata));
- if (!mrp_msg_append(msg, tag, type, v.u16, MRP_MSG_FIELD_INVALID))
+ if (!mrp_msg_append(msg, tag, type, v.u16))
goto fail;
break;
case MRP_MSG_FIELD_SINT16:
v.s16 = be16toh(MRP_MSGBUF_PULL(&mb, typeof(v.s16), 1, nodata));
- if (!mrp_msg_append(msg, tag, type, v.s16, MRP_MSG_FIELD_INVALID))
+ if (!mrp_msg_append(msg, tag, type, v.s16))
goto fail;
break;
case MRP_MSG_FIELD_UINT32:
v.u32 = be32toh(MRP_MSGBUF_PULL(&mb, typeof(v.u32), 1, nodata));
- if (!mrp_msg_append(msg, tag, type, v.u32, MRP_MSG_FIELD_INVALID))
+ if (!mrp_msg_append(msg, tag, type, v.u32))
goto fail;
break;
case MRP_MSG_FIELD_SINT32:
v.s32 = be32toh(MRP_MSGBUF_PULL(&mb, typeof(v.s32), 1, nodata));
- if (!mrp_msg_append(msg, tag, type, v.s32, MRP_MSG_FIELD_INVALID))
+ if (!mrp_msg_append(msg, tag, type, v.s32))
goto fail;
break;
case MRP_MSG_FIELD_UINT64:
v.u64 = be64toh(MRP_MSGBUF_PULL(&mb, typeof(v.u64), 1, nodata));
- if (!mrp_msg_append(msg, tag, type, v.u64, MRP_MSG_FIELD_INVALID))
+ if (!mrp_msg_append(msg, tag, type, v.u64))
goto fail;
break;
case MRP_MSG_FIELD_SINT64:
v.s64 = be64toh(MRP_MSGBUF_PULL(&mb, typeof(v.s64), 1, nodata));
- if (!mrp_msg_append(msg, tag, type, v.s64, MRP_MSG_FIELD_INVALID))
+ if (!mrp_msg_append(msg, tag, type, v.s64))
goto fail;
break;
case MRP_MSG_FIELD_DOUBLE:
v.dbl = MRP_MSGBUF_PULL(&mb, typeof(v.dbl), 1, nodata);
- if (!mrp_msg_append(msg, tag, type, v.dbl, MRP_MSG_FIELD_INVALID))
+ if (!mrp_msg_append(msg, tag, type, v.dbl))
goto fail;
break;
case MRP_MSG_FIELD_BLOB:
len = be32toh(MRP_MSGBUF_PULL(&mb, typeof(len), 1, nodata));
value = MRP_MSGBUF_PULL_DATA(&mb, len, 1, nodata);
- if (!mrp_msg_append(msg, tag, type, len, value,
- MRP_MSG_FIELD_INVALID))
+ if (!mrp_msg_append(msg, tag, type, len, value))
goto fail;
break;
default:
- if (type & MRP_MSG_FIELD_ARRAY) {
- errno = EOPNOTSUPP;
- mrp_log_error("XXX TODO: MRP_MSG_FIELD_ARRAY "
- "not implemented");
+ if (!(type & MRP_MSG_FIELD_ARRAY)) {
+ errno = EINVAL;
+ goto fail;
+ }
+
+ base = type & ~MRP_MSG_FIELD_ARRAY;
+ n = be32toh(MRP_MSGBUF_PULL(&mb, typeof(n), 1, nodata));
+ {
+ char *astr[n];
+ bool abln[n];
+ uint8_t au8 [n];
+ int8_t as8 [n];
+ uint16_t au16[n];
+ int16_t as16[n];
+ uint32_t au32[n];
+ int32_t as32[n];
+ uint64_t au64[n];
+ int64_t as64[n];
+ double adbl[n];
+
+ for (i = 0; i < n; i++) {
+
+ switch (base) {
+ case MRP_MSG_FIELD_STRING:
+ len = be32toh(MRP_MSGBUF_PULL(&mb, typeof(len),
+ 1, nodata));
+ if (len > 0)
+ astr[i] = MRP_MSGBUF_PULL_DATA(&mb, len, 1, nodata);
+ else
+ astr[i] = "";
+ break;
+
+ case MRP_MSG_FIELD_BOOL:
+ abln[i] = be32toh(MRP_MSGBUF_PULL(&mb, uint32_t, 1,
+ nodata));
+ break;
+
+ case MRP_MSG_FIELD_UINT8:
+ au8[i] = MRP_MSGBUF_PULL(&mb, typeof(v.u8), 1, nodata);
+ break;
+
+ case MRP_MSG_FIELD_SINT8:
+ as8[i] = MRP_MSGBUF_PULL(&mb, typeof(v.s8), 1, nodata);
+ break;
+
+ case MRP_MSG_FIELD_UINT16:
+ au16[i] = be16toh(MRP_MSGBUF_PULL(&mb, typeof(v.u16),
+ 1, nodata));
+ break;
+
+ case MRP_MSG_FIELD_SINT16:
+ as16[i] = be16toh(MRP_MSGBUF_PULL(&mb, typeof(v.s16),
+ 1, nodata));
+ break;
+
+ case MRP_MSG_FIELD_UINT32:
+ au32[i] = be32toh(MRP_MSGBUF_PULL(&mb, typeof(v.u32),
+ 1, nodata));
+ break;
+
+ case MRP_MSG_FIELD_SINT32:
+ as32[i] = be32toh(MRP_MSGBUF_PULL(&mb, typeof(v.s32),
+ 1, nodata));
+ break;
+
+ case MRP_MSG_FIELD_UINT64:
+ au64[i] = be64toh(MRP_MSGBUF_PULL(&mb, typeof(v.u64),
+ 1, nodata));
+ break;
+
+ case MRP_MSG_FIELD_SINT64:
+ as64[i] = be64toh(MRP_MSGBUF_PULL(&mb, typeof(v.s64),
+ 1, nodata));
+ break;
+
+ case MRP_MSG_FIELD_DOUBLE:
+ adbl[i] = MRP_MSGBUF_PULL(&mb, typeof(v.dbl),
+ 1, nodata);
+ break;
+
+ default:
+ errno = EINVAL;
+ goto fail;
+ }
+ }
+
+#define HANDLE_TYPE(_type, _var) \
+ case _type: \
+ if (!mrp_msg_append(msg, tag, \
+ MRP_MSG_FIELD_ARRAY |_type, \
+ n, _var)) \
+ goto fail; \
+ break
+
+ switch (base) {
+ HANDLE_TYPE(MRP_MSG_FIELD_STRING, astr);
+ HANDLE_TYPE(MRP_MSG_FIELD_BOOL , abln);
+ HANDLE_TYPE(MRP_MSG_FIELD_UINT8 , au8 );
+ HANDLE_TYPE(MRP_MSG_FIELD_SINT8 , as8 );
+ HANDLE_TYPE(MRP_MSG_FIELD_UINT16, au16);
+ HANDLE_TYPE(MRP_MSG_FIELD_SINT16, as16);
+ HANDLE_TYPE(MRP_MSG_FIELD_UINT32, au32);
+ HANDLE_TYPE(MRP_MSG_FIELD_SINT32, as32);
+ HANDLE_TYPE(MRP_MSG_FIELD_UINT64, au64);
+ HANDLE_TYPE(MRP_MSG_FIELD_SINT64, as64);
+ HANDLE_TYPE(MRP_MSG_FIELD_DOUBLE, adbl);
+ default:
+ errno = EINVAL;
+ goto fail;
+ }
+#undef HANDLE_TYPE
}
- else
- errno = EINVAL;
- goto fail;
}
}
}
+static int guarded_array_size(void *data, mrp_data_member_t *array)
+{
+#define MAX_ITEMS (32 * 1024)
+ uint16_t base;
+ void *value, *guard;
+ size_t size;
+ int cnt;
+
+ if (array->type & MRP_MSG_FIELD_ARRAY) {
+ base = array->type & ~MRP_MSG_FIELD_ARRAY;
+
+ switch (base) {
+ case MRP_MSG_FIELD_STRING: size = sizeof(array->str); break;
+ case MRP_MSG_FIELD_BOOL: size = sizeof(array->bln); break;
+ case MRP_MSG_FIELD_UINT8: size = sizeof(array->u8); break;
+ case MRP_MSG_FIELD_SINT8: size = sizeof(array->s8); break;
+ case MRP_MSG_FIELD_UINT16: size = sizeof(array->u16); break;
+ case MRP_MSG_FIELD_SINT16: size = sizeof(array->s16); break;
+ case MRP_MSG_FIELD_UINT32: size = sizeof(array->u32); break;
+ case MRP_MSG_FIELD_SINT32: size = sizeof(array->s32); break;
+ case MRP_MSG_FIELD_UINT64: size = sizeof(array->u64); break;
+ case MRP_MSG_FIELD_SINT64: size = sizeof(array->s64); break;
+ case MRP_MSG_FIELD_DOUBLE: size = sizeof(array->dbl); break;
+ default: return -1;
+ }
+
+ guard = &array->str;
+ value = *(void **)(data + array->offs);
+ for (cnt = 0; cnt < MAX_ITEMS; cnt++, value += size) {
+ if (!memcmp(value, guard, size))
+ return cnt + 1;
+ }
+ }
+
+ return -1;
+#undef MAX_ITEMS
+}
+
+
+static int counted_array_size(void *data, mrp_data_member_t *cnt)
+{
+ void *val = data + cnt->offs;
+
+ switch (cnt->type) {
+ case MRP_MSG_FIELD_UINT8: return (int)*(uint8_t *)val;
+ case MRP_MSG_FIELD_SINT8: return (int)*( int8_t *)val;
+ case MRP_MSG_FIELD_UINT16: return (int)*(uint16_t *)val;
+ case MRP_MSG_FIELD_SINT16: return (int)*( int16_t *)val;
+ case MRP_MSG_FIELD_UINT32: return (int)*(uint32_t *)val;
+ case MRP_MSG_FIELD_SINT32: return (int)*( int32_t *)val;
+ }
+
+ return -1;
+}
+
+
+static int get_array_size(void *data, mrp_data_descr_t *type, int idx)
+{
+ mrp_data_member_t *arr;
+
+ if (0 < idx && idx < type->nfield) {
+ arr = type->fields + idx;
+
+ if (arr->type & MRP_MSG_FIELD_ARRAY) {
+ if (arr->guard)
+ return guarded_array_size(data, arr);
+ else {
+ if ((int)arr->u32 < type->nfield)
+ return counted_array_size(data, type->fields + arr->u32);
+ }
+ }
+ }
+
+ return -1;
+}
+
+
+static int check_and_init_array_descr(mrp_data_descr_t *type, int idx)
+{
+ mrp_data_member_t *array, *cnt, *m;
+ int i;
+
+ array = type->fields + idx;
+
+ if (!array->guard) {
+ cnt = NULL;
+
+ for (i = 0, m = type->fields; i < type->nfield; i++, m++) {
+ if (m->offs == array->u32) {
+ cnt = m;
+ break;
+ }
+ }
+
+ if (cnt == NULL || cnt >= array)
+ return FALSE;
+
+ if (cnt->type < MRP_MSG_FIELD_UINT8 || cnt->type > MRP_MSG_FIELD_SINT32)
+ return FALSE;
+
+ array->u32 = i;
+
+ return TRUE;
+ }
+ else {
+ return TRUE;
+ }
+}
+
+
+int mrp_msg_register_type(mrp_data_descr_t *type)
+{
+ mrp_data_member_t *f;
+ int idx, i;
+
+ if (direct_types == NULL) {
+ direct_types = mrp_allocz_array(typeof(*direct_types), NDIRECT_TYPE);
+
+ if (direct_types == NULL)
+ return FALSE;
+ }
+
+ if (type->tag == MRP_MSG_TAG_DEFAULT) {
+ errno = EINVAL;
+ return FALSE;
+ }
+
+ mrp_list_init(&type->allocated);
+
+ /* enumerate fields, check arrays, collect extra allocations */
+ for (i = 0, f = type->fields; i < type->nfield; i++, f++) {
+ f->tag = (uint16_t)i + 1;
+
+ if (f->type & MRP_MSG_FIELD_ARRAY) {
+ if (!check_and_init_array_descr(type, i))
+ return FALSE;
+
+ mrp_list_append(&type->allocated, &f->hook);
+ }
+ else {
+ switch (f->type) {
+ case MRP_MSG_FIELD_STRING:
+ case MRP_MSG_FIELD_BLOB:
+ mrp_list_append(&type->allocated, &f->hook);
+ }
+ }
+ }
+
+ if (type->tag <= NDIRECT_TYPE) {
+ idx = type->tag - 1;
+
+ if (direct_types[idx] == NULL)
+ direct_types[idx] = type;
+ else
+ return FALSE;
+ }
+ else {
+ if (mrp_reallocz(other_types, nother_type, nother_type + 1) != NULL)
+ other_types[nother_type++] = type;
+ else
+ return FALSE;
+ }
+
+ return TRUE;
+}
+
+
+mrp_data_descr_t *mrp_msg_find_type(uint16_t tag)
+{
+ int i;
+
+ if (MRP_UNLIKELY(tag == MRP_MSG_TAG_DEFAULT))
+ return NULL;
+
+ if (tag <= NDIRECT_TYPE)
+ return direct_types[tag - 1];
+ else {
+ for (i = 0; i < nother_type; i++) {
+ if (other_types[i] != NULL && other_types[i]->tag == tag)
+ return other_types[i];
+ }
+ }
+
+ return NULL;
+}
+
+
+static __attribute__((destructor)) void cleanup_types(void)
+{
+ mrp_free(direct_types);
+ mrp_free(other_types);
+ nother_type = 0;
+}
+
+
+size_t mrp_data_encode(void **bufp, void *data, mrp_data_descr_t *descr,
+ size_t reserve)
+{
+ mrp_data_member_t *fields, *f;
+ int nfield;
+ uint16_t type;
+ mrp_msgbuf_t mb;
+ mrp_msg_value_t *v;
+ uint32_t len, asize, j;
+ int i, cnt;
+ size_t size;
+
+ fields = descr->fields;
+ nfield = descr->nfield;
+ size = reserve + nfield * (2 * sizeof(uint16_t) + sizeof(uint64_t));
+
+ if (mrp_msgbuf_write(&mb, size)) {
+ if (reserve)
+ mrp_msgbuf_reserve(&mb, reserve, 1);
+
+ for (i = 0, f = fields; i < nfield; i++, f++) {
+ MRP_MSGBUF_PUSH(&mb, htobe16(f->tag) , 1, nomem);
+
+ v = (mrp_msg_value_t *)(data + f->offs);
+
+ switch (f->type) {
+ case MRP_MSG_FIELD_STRING:
+ len = strlen(v->str) + 1;
+ MRP_MSGBUF_PUSH(&mb, htobe32(len), 1, nomem);
+ MRP_MSGBUF_PUSH_DATA(&mb, v->str, len, 1, nomem);
+ break;
+
+ case MRP_MSG_FIELD_BOOL:
+ MRP_MSGBUF_PUSH(&mb, htobe32(v->bln ? TRUE : FALSE), 1, nomem);
+ break;
+
+ case MRP_MSG_FIELD_UINT8:
+ MRP_MSGBUF_PUSH(&mb, v->u8, 1, nomem);
+ break;
+
+ case MRP_MSG_FIELD_SINT8:
+ MRP_MSGBUF_PUSH(&mb, v->s8, 1, nomem);
+ break;
+
+ case MRP_MSG_FIELD_UINT16:
+ MRP_MSGBUF_PUSH(&mb, htobe16(v->u16), 1, nomem);
+ break;
+
+ case MRP_MSG_FIELD_SINT16:
+ MRP_MSGBUF_PUSH(&mb, htobe16(v->s16), 1, nomem);
+ break;
+
+ case MRP_MSG_FIELD_UINT32:
+ MRP_MSGBUF_PUSH(&mb, htobe32(v->u32), 1, nomem);
+ break;
+
+ case MRP_MSG_FIELD_SINT32:
+ MRP_MSGBUF_PUSH(&mb, htobe32(v->s32), 1, nomem);
+ break;
+
+ case MRP_MSG_FIELD_UINT64:
+ MRP_MSGBUF_PUSH(&mb, htobe64(v->u64), 1, nomem);
+ break;
+
+ case MRP_MSG_FIELD_SINT64:
+ MRP_MSGBUF_PUSH(&mb, htobe64(v->s64), 1, nomem);
+ break;
+
+ case MRP_MSG_FIELD_DOUBLE:
+ MRP_MSGBUF_PUSH(&mb, v->dbl, 1, nomem);
+ break;
+
+ case MRP_MSG_FIELD_BLOB:
+ errno = EOPNOTSUPP;
+ mrp_log_error("XXX TODO: MRP_MSG_FIELD_BLOB "
+ "not implemented");
+ break;
+
+ default:
+ if (f->type & MRP_MSG_FIELD_ARRAY) {
+ type = f->type & ~(MRP_MSG_FIELD_ARRAY);
+ cnt = get_array_size(data, descr, i);
+
+ if (cnt < 0)
+ goto invalid_type;
+
+ asize = (uint32_t)cnt;
+ MRP_MSGBUF_PUSH(&mb, htobe32(asize), 1, nomem);
+
+ for (j = 0; j < asize; j++) {
+ switch (type) {
+ case MRP_MSG_FIELD_STRING:
+ len = strlen(v->astr[j]) + 1;
+ MRP_MSGBUF_PUSH(&mb, htobe32(len), 1, nomem);
+ MRP_MSGBUF_PUSH_DATA(&mb, v->astr[j], len,
+ 1, nomem);
+ break;
+
+ case MRP_MSG_FIELD_BOOL:
+ MRP_MSGBUF_PUSH(&mb, htobe32(v->abln[j]?TRUE:FALSE),
+ 1, nomem);
+ break;
+
+ case MRP_MSG_FIELD_UINT8:
+ MRP_MSGBUF_PUSH(&mb, v->au8[j], 1, nomem);
+ break;
+
+ case MRP_MSG_FIELD_SINT8:
+ MRP_MSGBUF_PUSH(&mb, v->as8[j], 1, nomem);
+ break;
+
+ case MRP_MSG_FIELD_UINT16:
+ MRP_MSGBUF_PUSH(&mb, htobe16(v->au16[j]), 1, nomem);
+ break;
+
+ case MRP_MSG_FIELD_SINT16:
+ MRP_MSGBUF_PUSH(&mb, htobe16(v->as16[j]), 1, nomem);
+ break;
+
+ case MRP_MSG_FIELD_UINT32:
+ MRP_MSGBUF_PUSH(&mb, htobe32(v->au32[j]), 1, nomem);
+ break;
+
+ case MRP_MSG_FIELD_SINT32:
+ MRP_MSGBUF_PUSH(&mb, htobe32(v->as32[j]), 1, nomem);
+ break;
+
+ case MRP_MSG_FIELD_UINT64:
+ MRP_MSGBUF_PUSH(&mb, htobe64(v->au64[j]), 1, nomem);
+ break;
+
+ case MRP_MSG_FIELD_SINT64:
+ MRP_MSGBUF_PUSH(&mb, htobe64(v->as64[j]), 1, nomem);
+ break;
+
+ case MRP_MSG_FIELD_DOUBLE:
+ MRP_MSGBUF_PUSH(&mb, v->adbl[j], 1, nomem);
+ break;
+
+ default:
+ goto invalid_type;
+ }
+ }
+ }
+ else {
+ invalid_type:
+ errno = EINVAL;
+ mrp_msgbuf_cancel(&mb);
+ nomem:
+ *bufp = NULL;
+ return 0;
+ }
+ }
+ }
+ }
+
+ *bufp = mb.buf;
+ return (size_t)(mb.p - mb.buf);
+}
+
+
+static mrp_data_member_t *member_type(mrp_data_member_t *fields, int nfield,
+ uint16_t tag)
+{
+ mrp_data_member_t *f;
+ int i;
+
+ for (i = 0, f = fields; i < nfield; i++, f++)
+ if (f->tag == tag)
+ return f;
+
+ return NULL;
+}
+
+
+void *mrp_data_decode(void **bufp, size_t *sizep, mrp_data_descr_t *descr)
+{
+ void *data;
+ mrp_data_member_t *fields, *f;
+ int nfield;
+ mrp_msgbuf_t mb;
+ uint16_t tag, base;
+ mrp_msg_value_t *v;
+ void *value;
+ uint32_t len, n, j, size;
+ int i;
+
+ fields = descr->fields;
+ nfield = descr->nfield;
+ data = mrp_allocz(descr->size);
+
+ if (MRP_UNLIKELY(data == NULL))
+ return NULL;
+
+ mrp_msgbuf_read(&mb, *bufp, *sizep);
+
+ for (i = 0; i < nfield; i++) {
+ tag = be16toh(MRP_MSGBUF_PULL(&mb, typeof(tag) , 1, nodata));
+ f = member_type(fields, nfield, tag);
+
+ if (MRP_UNLIKELY(f == NULL))
+ goto unknown_field;
+
+ v = (mrp_msg_value_t *)(data + f->offs);
+
+ switch (f->type) {
+ case MRP_MSG_FIELD_STRING:
+ len = be32toh(MRP_MSGBUF_PULL(&mb, typeof(len), 1, nodata));
+ if (len > 0)
+ value = MRP_MSGBUF_PULL_DATA(&mb, len, 1, nodata);
+ else
+ value = "";
+ v->str = mrp_strdup((char *)value);
+ if (v->str == NULL)
+ goto nomem;
+ break;
+
+ case MRP_MSG_FIELD_BOOL:
+ v->bln = be32toh(MRP_MSGBUF_PULL(&mb, uint32_t, 1, nodata));
+ break;
+
+ case MRP_MSG_FIELD_UINT8:
+ v->u8 = MRP_MSGBUF_PULL(&mb, typeof(v->u8), 1, nodata);
+ break;
+
+ case MRP_MSG_FIELD_SINT8:
+ v->s8 = MRP_MSGBUF_PULL(&mb, typeof(v->s8), 1, nodata);
+ break;
+
+ case MRP_MSG_FIELD_UINT16:
+ v->u16 = be16toh(MRP_MSGBUF_PULL(&mb, typeof(v->u16), 1, nodata));
+ break;
+
+ case MRP_MSG_FIELD_SINT16:
+ v->s16 = be16toh(MRP_MSGBUF_PULL(&mb, typeof(v->s16), 1, nodata));
+ break;
+
+ case MRP_MSG_FIELD_UINT32:
+ v->u32 = be32toh(MRP_MSGBUF_PULL(&mb, typeof(v->u32), 1, nodata));
+ break;
+
+ case MRP_MSG_FIELD_SINT32:
+ v->s32 = be32toh(MRP_MSGBUF_PULL(&mb, typeof(v->s32), 1, nodata));
+ break;
+
+ case MRP_MSG_FIELD_UINT64:
+ v->u64 = be64toh(MRP_MSGBUF_PULL(&mb, typeof(v->u64), 1, nodata));
+ break;
+
+ case MRP_MSG_FIELD_SINT64:
+ v->s64 = be64toh(MRP_MSGBUF_PULL(&mb, typeof(v->s64), 1, nodata));
+ break;
+
+ case MRP_MSG_FIELD_DOUBLE:
+ v->dbl = MRP_MSGBUF_PULL(&mb, typeof(v->dbl), 1, nodata);
+ break;
+
+ case MRP_MSG_FIELD_BLOB:
+ len = be32toh(MRP_MSGBUF_PULL(&mb, typeof(len), 1, nodata));
+ value = MRP_MSGBUF_PULL_DATA(&mb, len, 1, nodata);
+ v->blb = mrp_datadup(value, len);
+ if (v->blb == NULL)
+ goto nomem;
+ break;
+
+ default:
+ if (!(f->type & MRP_MSG_FIELD_ARRAY)) {
+ unknown_field:
+ errno = EINVAL;
+ goto fail;
+ }
+
+ base = f->type & ~MRP_MSG_FIELD_ARRAY;
+ n = be32toh(MRP_MSGBUF_PULL(&mb, typeof(n), 1, nodata));
+ size = n;
+
+ switch (base) {
+ case MRP_MSG_FIELD_STRING: size *= sizeof(*v->astr); break;
+ case MRP_MSG_FIELD_BOOL: size *= sizeof(*v->abln); break;
+ case MRP_MSG_FIELD_UINT8: size *= sizeof(*v->au8); break;
+ case MRP_MSG_FIELD_SINT8: size *= sizeof(*v->as8); break;
+ case MRP_MSG_FIELD_UINT16: size *= sizeof(*v->au16); break;
+ case MRP_MSG_FIELD_SINT16: size *= sizeof(*v->as16); break;
+ case MRP_MSG_FIELD_UINT32: size *= sizeof(*v->au32); break;
+ case MRP_MSG_FIELD_SINT32: size *= sizeof(*v->as32); break;
+ case MRP_MSG_FIELD_UINT64: size *= sizeof(*v->au64); break;
+ case MRP_MSG_FIELD_SINT64: size *= sizeof(*v->as64); break;
+ case MRP_MSG_FIELD_DOUBLE: size *= sizeof(*v->adbl); break;
+ default:
+ errno = EINVAL;
+ goto fail;
+ }
+
+ v->aany = mrp_allocz(size);
+ if (v->aany == NULL)
+ goto nomem;
+
+ for (j = 0; j < n; j++) {
+ switch (base) {
+ case MRP_MSG_FIELD_STRING:
+ len = be32toh(MRP_MSGBUF_PULL(&mb, typeof(len),
+ 1, nodata));
+ if (len > 0)
+ value = MRP_MSGBUF_PULL_DATA(&mb, len, 1, nodata);
+ else
+ value = "";
+
+ v->astr[j] = mrp_strdup(value);
+ if (v->astr[j] == NULL)
+ goto nomem;
+ break;
+
+ case MRP_MSG_FIELD_BOOL:
+ v->abln[j] = be32toh(MRP_MSGBUF_PULL(&mb, uint32_t,
+ 1, nodata));
+ break;
+
+ case MRP_MSG_FIELD_UINT8:
+ v->au8[j] = MRP_MSGBUF_PULL(&mb, typeof(v->u8),
+ 1, nodata);
+ break;
+
+ case MRP_MSG_FIELD_SINT8:
+ v->as8[j] = MRP_MSGBUF_PULL(&mb, typeof(v->s8),
+ 1, nodata);
+ break;
+
+ case MRP_MSG_FIELD_UINT16:
+ v->au16[j] = be16toh(MRP_MSGBUF_PULL(&mb,
+ typeof(v->u16),
+ 1, nodata));
+ break;
+
+ case MRP_MSG_FIELD_SINT16:
+ v->as16[j] = be16toh(MRP_MSGBUF_PULL(&mb,
+ typeof(v->s16),
+ 1, nodata));
+ break;
+
+ case MRP_MSG_FIELD_UINT32:
+ v->au32[j] = be32toh(MRP_MSGBUF_PULL(&mb,
+ typeof(v->u32),
+ 1, nodata));
+ break;
+
+ case MRP_MSG_FIELD_SINT32:
+ v->as32[j] = be32toh(MRP_MSGBUF_PULL(&mb,
+ typeof(v->s32),
+ 1, nodata));
+ break;
+
+ case MRP_MSG_FIELD_UINT64:
+ v->au64[j] = be64toh(MRP_MSGBUF_PULL(&mb,
+ typeof(v->u64),
+ 1, nodata));
+ break;
+
+ case MRP_MSG_FIELD_SINT64:
+ v->as64[j] = be64toh(MRP_MSGBUF_PULL(&mb,
+ typeof(v->s64),
+ 1, nodata));
+ break;
+
+ case MRP_MSG_FIELD_DOUBLE:
+ v->adbl[j] = MRP_MSGBUF_PULL(&mb, typeof(v->dbl),
+ 1, nodata);
+ break;
+
+ default:
+ errno = EINVAL;
+ goto fail;
+ }
+ }
+ }
+ }
+
+ *bufp = mb.buf;
+ *sizep -= mb.p - mb.buf;
+ return data;
+
+ nodata:
+ nomem:
+ fail:
+ if (data != NULL) {
+ for (i = 0, f = fields; i < nfield; i++, f++) {
+ switch (f->type) {
+ case MRP_MSG_FIELD_STRING:
+ case MRP_MSG_FIELD_BLOB:
+ mrp_free(*(void **)(data + f->offs));
+ }
+ }
+
+ mrp_free(data);
+ }
+
+ return NULL;
+}
+
+
+int mrp_data_dump(void *data, mrp_data_descr_t *descr, FILE *fp)
+{
+#define DUMP(_indent, _fmt, _typename, _val) \
+ l += fprintf(fp, "%*.*s= <%s> "_fmt"\n", _indent, _indent, "", \
+ _typename, _val)
+
+ mrp_data_member_t *dm;
+ mrp_msg_value_t *v;
+ uint16_t base;
+ int i, j, l, cnt;
+ const char *tname;
+
+
+ l = fprintf(fp, "{\n");
+ for (i = 0, dm = descr->fields; i < descr->nfield; i++, dm++) {
+ l += fprintf(fp, " @%d ", dm->offs);
+ v = (mrp_msg_value_t *)(data + dm->offs);
+ tname = field_type_name(dm->type);
+
+ switch (dm->type) {
+ case MRP_MSG_FIELD_STRING:
+ DUMP(0, "'%s'", tname, v->str);
+ break;
+ case MRP_MSG_FIELD_BOOL:
+ DUMP(0, "%s", tname, v->bln ? "true" : "false");
+ break;
+ case MRP_MSG_FIELD_UINT8:
+ DUMP(0, "%u", tname, v->u8);
+ break;
+ case MRP_MSG_FIELD_SINT8:
+ DUMP(0, "%d", tname, v->s8);
+ break;
+ case MRP_MSG_FIELD_UINT16:
+ DUMP(0, "%u", tname, v->u16);
+ break;
+ case MRP_MSG_FIELD_SINT16:
+ DUMP(0, "%d", tname, v->s16);
+ break;
+ case MRP_MSG_FIELD_UINT32:
+ DUMP(0, "%u", tname, v->u32);
+ break;
+ case MRP_MSG_FIELD_SINT32:
+ DUMP(0, "%d", tname, v->s32);
+ break;
+ case MRP_MSG_FIELD_UINT64:
+ DUMP(0, "%Lu", tname, (long long unsigned)v->u64);
+ break;
+ case MRP_MSG_FIELD_SINT64:
+ DUMP(0, "%Ld", tname, (long long signed)v->s64);
+ break;
+ case MRP_MSG_FIELD_DOUBLE:
+ DUMP(0, "%f", tname, v->dbl);
+ break;
+ default:
+ if (dm->type & MRP_MSG_FIELD_ARRAY) {
+ base = dm->type & ~MRP_MSG_FIELD_ARRAY;
+ cnt = get_array_size(data, descr, i);
+
+ if (cnt < 0) {
+ fprintf(fp, "= <%s> ???\n", tname);
+ continue;
+ }
+
+ fprintf(fp, "= <%s> (%d)\n", tname, cnt);
+ tname = field_type_name(base);
+
+ for (j = 0; j < cnt; j++) {
+ switch (base) {
+ case MRP_MSG_FIELD_STRING:
+ DUMP(8, "'%s'", tname, v->astr[j]);
+ break;
+ case MRP_MSG_FIELD_BOOL:
+ DUMP(8, "%s", tname, v->abln[j] ? "true" : "false");
+ break;
+ case MRP_MSG_FIELD_UINT8:
+ DUMP(8, "%u", tname, v->au8[j]);
+ break;
+ case MRP_MSG_FIELD_SINT8:
+ DUMP(8, "%d", tname, v->as8[j]);
+ break;
+ case MRP_MSG_FIELD_UINT16:
+ DUMP(8, "%u", tname, v->au16[j]);
+ break;
+ case MRP_MSG_FIELD_SINT16:
+ DUMP(8, "%d", tname, v->as16[j]);
+ break;
+ case MRP_MSG_FIELD_UINT32:
+ DUMP(8, "%u", tname, v->au32[j]);
+ break;
+ case MRP_MSG_FIELD_SINT32:
+ DUMP(8, "%d", tname, v->as32[j]);
+ break;
+ case MRP_MSG_FIELD_UINT64:
+ DUMP(8, "%Lu", tname, (long long unsigned)v->au64[j]);
+ break;
+ case MRP_MSG_FIELD_SINT64:
+ DUMP(8, "%Ld", tname, (long long signed)v->as64[j]);
+ break;
+ case MRP_MSG_FIELD_DOUBLE:
+ DUMP(8, "%f", tname, v->adbl[j]);
+ break;
+ default:
+ fprintf(fp, "%*.*s<%s>\n", 8, 8, "", tname);
+ break;
+ }
+ }
+ }
+ }
+ }
+ l += fprintf(fp, "}\n");
+
+ return l;
+}
+
+
+int mrp_data_free(void *data, uint16_t tag)
+{
+ mrp_data_descr_t *type;
+ mrp_list_hook_t *p, *n;
+ mrp_data_member_t *f;
+ void *ptr;
+
+ type = mrp_msg_find_type(tag);
+
+ if (type != NULL) {
+ mrp_list_foreach(&type->allocated, p, n) {
+ f = mrp_list_entry(p, typeof(*f), hook);
+ ptr = *(void **)(data + f->offs);
+
+ mrp_free(ptr);
+ }
+
+ mrp_free(data);
+
+ return TRUE;
+ }
+ else
+ return FALSE;
+
+}
+
+
void *mrp_msgbuf_write(mrp_msgbuf_t *mb, size_t size)
{
mrp_clear(mb);
MRP_MSG_FIELD_INT64 = A(SINT64), /* alias for SINT64 */
MRP_MSG_FIELD_DOUBLE = 0x0b, /* double-prec. floating point */
MRP_MSG_FIELD_BLOB = 0x0c, /* a blob (not allowed in arrays) */
+ MRP_MSG_FIELD_MAX = 0x0c,
MRP_MSG_FIELD_ARRAY = 0x80, /* bit-mask to mark arrays */
} mrp_msg_field_type_t;
#undef A
+#define MRP_MSG_FIELD_ARRAY_OF(t) (MRP_MSG_FIELD_ARRAY | MRP_MSG_FIELD_##t)
-/** Tag to terminate a */
-#define MRP_MSG_INVALID_TAG MRP_MSG_FIELD_INVALID
-
+/** Sentinel to pass in as the last argument to mrp_msg_create. */
+#define MRP_MSG_FIELD_END NULL
/*
- * a message field
+ * generic messages
+ *
+ * A generic message is just a collection of message fields. By default
+ * transports are in generic messaging mode in which case they take messages
+ * as input (for transmission) and provide messages as events (for receiption).
+ * A generic message field consists of a field tag, a field type, the actual
+ * type-specific field value, and for certain types a size.
+ *
+ * The field tag is used by the communicating parties to attach semantic
+ * meaning to the field data. One can think of it as the 'name' of the field
+ * within a message. It is not interpreted by the messaging layer in any way.
+ * The field type defines what kind of data the field contains contains and
+ * it must be one of the predefined MRP_MSG_FIELD_* types. The actual field
+ * data then depends on the type. size is only used for those data types that
+ * require a size (blobs and arrays).
*/
#define MRP_MSG_VALUE_UNION union { \
int64_t s64; \
double dbl; \
void *blb; \
+ void *aany; \
char **astr; \
bool *abln; \
uint8_t *au8; \
int32_t *as32; \
uint64_t *au64; \
int64_t *as64; \
+ double *adbl; \
}
typedef MRP_MSG_VALUE_UNION mrp_msg_value_t;
typedef struct {
- mrp_list_hook_t hook; /* to message */
+ mrp_list_hook_t hook; /* hook to list of fields */
uint16_t tag; /* message field tag */
uint16_t type; /* message field type */
MRP_MSG_VALUE_UNION; /* message field value */
} mrp_msg_field_t;
-/*
- * a message
- */
-
typedef struct {
mrp_list_hook_t fields; /* list of message fields */
size_t nfield; /* number of fields */
} mrp_msg_t;
-/*
- * a message buffer to help encoding / decoding
- */
-
-typedef struct {
- void *buf; /* message buffer */
- size_t size; /* allocated size */
- void *p; /* fill pointer */
- size_t l; /* space left in buffer */
-} mrp_msgbuf_t;
-
/** Create a new message. */
-mrp_msg_t *mrp_msg_create(uint16_t tag, ...);
+mrp_msg_t *mrp_msg_create(uint16_t tag, ...) MRP_NULLTERM;
+
+/** Macro to create an empty message. */
+#define mrp_msg_create_empty() mrp_msg_create(MRP_MSG_FIELD_INVALID, NULL)
-/** Add a new reference to a message (ie. increase refcount). */
+/** Increase refcount of the given message. */
mrp_msg_t *mrp_msg_ref(mrp_msg_t *msg);
-/** Delete a reference from a message, freeing it if refcount drops to zero. */
+/** Decrease the refcount, free the message if refcount drops to zero. */
void mrp_msg_unref(mrp_msg_t *msg);
/** Append a field to a message. */
/** Dump a message. */
int mrp_msg_dump(mrp_msg_t *msg, FILE *fp);
-/** Default message encoding. */
+/** Encode the given message using the default message encoder. */
ssize_t mrp_msg_default_encode(mrp_msg_t *msg, void **bufp);
-/** Default message decoding. */
+/** Decode the given message using the default message decoder. */
mrp_msg_t *mrp_msg_default_decode(void *buf, size_t size);
+
+/*
+ * custom data types
+ *
+ * In addition to generic messages, you can instruct the messaging and
+ * transport layers to encode/decode messages directly from/to custom data
+ * structures. To do so you need to describe your data structures and register
+ * them using data descriptors. A descriptor basically consists of a type
+ * tag, structure size, number of members and and array of structure member
+ * descriptors.
+ *
+ * The data type tag is used to identify the descriptor and consequently
+ * the custom data type both during sending and receiving (ie. encoding and
+ * decoding). It is assigned by the registering entity, it must be unique,
+ * and it cannot be MRP_MSG_TAG_DEFAULT (0x0), or else registration will
+ * fail. The size is used to allocate necessary memory for the data on the
+ * receiving end. The member descriptors are used to describe the offset
+ * and types of the members within the custom data type.
+ */
+
+#define MRP_MSG_TAG_DEFAULT 0x0 /* tag for default encode/decoder */
+
+typedef struct {
+ uint16_t offs; /* offset within structure */
+ uint16_t tag; /* tag for this member */
+ uint16_t type; /* type of this member */
+ bool guard; /* whether sentinel-terminated */
+ MRP_MSG_VALUE_UNION; /* sentinel or offset of count field */
+ mrp_list_hook_t hook; /* hook to list of extra allocations */
+} mrp_data_member_t;
+
+
+typedef struct {
+ mrp_refcnt_t refcnt; /* reference count */
+ uint16_t tag; /* structure tag */
+ size_t size; /* size of this structure */
+ int nfield; /* number of members */
+ mrp_data_member_t *fields; /* member descriptors */
+ mrp_list_hook_t allocated; /* fields needing extra allocation */
+} mrp_data_descr_t;
+
+
+/** Convenience macro to declare a custom data type (and its members). */
+#define MRP_DATA_DESCRIPTOR(_var, _tag, _type, ...) \
+ static mrp_data_member_t _var##_members[] = { \
+ __VA_ARGS__ \
+ }; \
+ \
+ static mrp_data_descr_t _var = { \
+ .size = sizeof(_type), \
+ .tag = _tag, \
+ .fields = _var##_members, \
+ .nfield = MRP_ARRAY_SIZE(_var##_members) \
+ }
+
+/** Convenience macro to declare a data member. */
+#define MRP_DATA_MEMBER(_data_type, _member, _member_type) { \
+ .offs = MRP_OFFSET(_data_type, _member), \
+ .type = _member_type, \
+ .guard = FALSE \
+ }
+
+/** Convenience macro to declare an array data member with a count field. */
+#define MRP_DATA_ARRAY_COUNT(_data_type, _array, _count, _base_type) { \
+ .offs = MRP_OFFSET(_data_type, _array), \
+ .type = MRP_MSG_FIELD_ARRAY | _base_type, \
+ .guard = FALSE, \
+ .u32 = MRP_OFFSET(_data_type, _count) \
+ }
+
+/** Convenience macro to declare an array data member with a sentinel value. */
+#define MRP_DATA_ARRAY_GUARD(_data_type, _array, _guard_member, _guard_val, \
+ _base_type) { \
+ .offs = MRP_OFFSET(_data_type, _array), \
+ .type = MRP_MSG_FIELD_ARRAY | _base_type, \
+ .guard = TRUE, \
+ ._guard_member = _guard_val \
+ }
+
+/** Encode a structure using the given message descriptor. */
+size_t mrp_data_encode(void **bufp, void *data, mrp_data_descr_t *descr,
+ size_t reserve);
+
+/** Decode a structure using the given message descriptor. */
+void *mrp_data_decode(void **bufp, size_t *sizep, mrp_data_descr_t *descr);
+
+/** Dump the given data buffer. */
+int mrp_data_dump(void *data, mrp_data_descr_t *descr, FILE *fp);
+
+/** Register a new custom data type with the messaging/transport layer. */
+int mrp_msg_register_type(mrp_data_descr_t *type);
+
+/** Look up the data type descriptor corresponding to the given tag. */
+mrp_data_descr_t *mrp_msg_find_type(uint16_t tag);
+
+/** Free the given custom data allocated by the messaging layer. */
+int mrp_data_free(void *data, uint16_t tag);
+
+/*
+ * message encoding/decoding buffer
+ *
+ * This message buffer and the associated functions and macros can be
+ * used to write message encoding/decoding functions for bitpipe-type
+ * transports, ie. for transports where the underlying IPC just provides
+ * a raw data connection between the communication endpoints and does not
+ * impose/expect any structure on/from the data being transmitted.
+ *
+ * Practically all the basic stream and datagram socket transports are
+ * such. They use the default encoding/decoding functions provided by
+ * the messaging layer together with a very simple transport frame scheme,
+ * where each frame consists of the amount a size indicating the size of
+ * the encoded message in the bitpipe and the actual encoded message data.
+ *
+ * Note that at the moment this framing scheme is rather implicit in the
+ * sense that you won't find a data type representing a frame. Rather the
+ * framing is simply done in the sending/receiving code of the individual
+ * transports.
+ */
+
+typedef struct {
+ void *buf; /* buffer to encode to/decode from */
+ size_t size; /* size of the buffer */
+ void *p; /* encoding/decoding pointer */
+ size_t l; /* space left in the buffer */
+} mrp_msgbuf_t;
+
+
+
/** Initialize the given message buffer for writing. */
void *mrp_msgbuf_write(mrp_msgbuf_t *mb, size_t size);
/** Pull the given amount of data from the buffer. */
void *mrp_msgbuf_pull(mrp_msgbuf_t *mb, size_t size, size_t align);
-
+/** Push data with alignment to the buffer, jumping to errlbl on errors. */
#define MRP_MSGBUF_PUSH(mb, data, align, errlbl) do { \
size_t _size = sizeof(data); \
typeof(data) *_ptr; \
goto errlbl; \
} while (0)
+/** Push aligned data to the buffer, jumping to errlbl on errors. */
#define MRP_MSGBUF_PUSH_DATA(mb, data, size, align, errlbl) do { \
size_t _size = (size); \
void *_ptr; \
goto errlbl; \
} while (0)
+/** Pull aligned data of type from the buffer, jump to errlbl on errors. */
#define MRP_MSGBUF_PULL(mb, type, align, errlbl) ({ \
size_t _size = sizeof(type); \
type *_ptr; \
*_ptr; \
})
-
+/** Pull aligned data of type from the buffer, jump to errlbl on errors. */
#define MRP_MSGBUF_PULL_DATA(mb, size, align, errlbl) ({ \
size_t _size = size; \
void *_ptr; \
--- /dev/null
+#ifndef __MURPHY_REFCNT_H__
+#define __MURPHY_REFCNT_H__
+
+typedef int mrp_refcnt_t;
+
+#endif /* __MURPHY_REFCNT_H__ */
typedef struct {
MRP_TRANSPORT_PUBLIC_FIELDS; /* common transport fields */
int sock; /* TCP socket */
- int flags; /* socket flags */
mrp_io_watch_t *iow; /* socket I/O watch */
void *ibuf; /* input buffer */
size_t isize; /* input buffer size */
}
-static int strm_open(mrp_transport_t *mt, int flags)
+static int strm_open(mrp_transport_t *mt)
{
strm_t *t = (strm_t *)mt;
- t->sock = -1;
- t->flags = flags;
+ t->sock = -1;
return TRUE;
}
-static int strm_create(mrp_transport_t *mt, void *conn, int flags)
+static int strm_create(mrp_transport_t *mt, void *conn)
{
strm_t *t = (strm_t *)mt;
mrp_io_event_t events;
int on;
long nb;
- t->sock = *(int *)conn;
- t->flags = flags;
+ t->sock = *(int *)conn;
if (t->sock >= 0) {
- if (t->flags & MRP_TRANSPORT_REUSEADDR) {
+ if (mt->flags & MRP_TRANSPORT_REUSEADDR) {
on = 1;
setsockopt(t->sock, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
}
- if (t->flags & MRP_TRANSPORT_NONBLOCK) {
+ if (mt->flags & MRP_TRANSPORT_NONBLOCK) {
nb = 1;
fcntl(t->sock, F_SETFL, O_NONBLOCK, nb);
}
}
-static int strm_accept(mrp_transport_t *mt, mrp_transport_t *mlt, int flags)
+static int strm_accept(mrp_transport_t *mt, mrp_transport_t *mlt)
{
strm_t *t, *lt;
mrp_sockaddr_t addr;
int on;
long nb;
-
t = (strm_t *)mt;
lt = (strm_t *)mlt;
t->sock = accept(lt->sock, &addr.any, &addrlen);
if (t->sock >= 0) {
- if (flags & MRP_TRANSPORT_REUSEADDR) {
+ if (mt->flags & MRP_TRANSPORT_REUSEADDR) {
on = 1;
setsockopt(t->sock, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
}
- if (flags & MRP_TRANSPORT_NONBLOCK) {
+ if (mt->flags & MRP_TRANSPORT_NONBLOCK) {
nb = 1;
fcntl(t->sock, F_SETFL, O_NONBLOCK, nb);
}
- if (flags & MRP_TRANSPORT_CLOEXEC) {
+ if (mt->flags & MRP_TRANSPORT_CLOEXEC) {
on = 1;
fcntl(t->sock, F_SETFL, O_CLOEXEC, on);
}
ssize_t n, space, left;
void *data;
int old, error;
- mrp_msg_t *msg;
MRP_UNUSED(ml);
MRP_UNUSED(w);
size = ntohl(*sizep);
while (t->idata >= sizeof(size) + size) {
- data = t->ibuf + sizeof(size);
- msg = mrp_msg_default_decode(data, size);
-
- if (msg != NULL) {
- MRP_TRANSPORT_BUSY(mt, {
- mt->evt.recv(mt, msg, mt->user_data);
- });
-
- mrp_msg_unref(msg);
-
- if (t->check_destroy(mt))
- return;
- }
- else {
- error = EPROTO;
+ data = t->ibuf + sizeof(size);
+ error = t->recv_data(mt, data, size, NULL, 0);
+
+ if (error)
goto fatal_error;
- }
+ if (t->check_destroy(mt))
+ return;
+
left = t->idata - (sizeof(size) + size);
memmove(t->ibuf, t->ibuf + sizeof(size) + size, left);
t->idata = left;
else {
if (n == -1 && errno == EAGAIN) {
mrp_log_error("%s(): XXX TODO: this sucks, need to add "
- "output queuing for tcp-transport.",
+ "output queuing for strm-transport.",
__FUNCTION__);
}
}
}
+static int strm_sendraw(mrp_transport_t *mt, void *data, size_t size)
+{
+ strm_t *t = (strm_t *)mt;
+ ssize_t n;
+
+ if (t->connected) {
+ n = write(t->sock, data, size);
+
+ if (n == (ssize_t)size)
+ return TRUE;
+ else {
+ if (n == -1 && errno == EAGAIN) {
+ mrp_log_error("%s(): XXX TODO: this sucks, need to add "
+ "output queuing for strm-transport.",
+ __FUNCTION__);
+ }
+ }
+ }
+
+ return FALSE;
+}
+
+
+static int strm_senddata(mrp_transport_t *mt, void *data, uint16_t tag)
+{
+ strm_t *t = (strm_t *)mt;
+ mrp_data_descr_t *type;
+ ssize_t n;
+ void *buf;
+ size_t size, reserve, len;
+ uint32_t *lenp;
+ uint16_t *tagp;
+
+ if (t->connected) {
+ type = mrp_msg_find_type(tag);
+
+ if (type != NULL) {
+ reserve = sizeof(*lenp) + sizeof(*tagp);
+ size = mrp_data_encode(&buf, data, type, reserve);
+
+ if (size > 0) {
+ lenp = buf;
+ len = size - sizeof(*lenp);
+ tagp = buf + sizeof(*lenp);
+ *lenp = htobe32(len);
+ *tagp = htobe16(tag);
+
+ n = write(t->sock, buf, len + sizeof(*lenp));
+
+ mrp_free(buf);
+
+ if (n == (ssize_t)(len + sizeof(*lenp)))
+ return TRUE;
+ else {
+ if (n == -1 && errno == EAGAIN) {
+ mrp_log_error("%s(): XXX TODO: this sucks, need to add"
+ " output queueing for strm-transport.",
+ __FUNCTION__);
+ }
+ }
+ }
+ }
+ }
+
+ return FALSE;
+}
+
+
MRP_REGISTER_TRANSPORT(tcp4, "tcp4", strm_t, strm_resolve,
strm_open, strm_create, strm_close,
strm_bind, strm_listen, strm_accept,
strm_connect, strm_disconnect,
- strm_send, NULL);
+ strm_send, NULL,
+ strm_sendraw, NULL,
+ strm_senddata, NULL);
MRP_REGISTER_TRANSPORT(tcp6, "tcp6", strm_t, strm_resolve,
strm_open, strm_create, strm_close,
strm_bind, strm_listen, strm_accept,
strm_connect, strm_disconnect,
- strm_send, NULL);
+ strm_send, NULL,
+ strm_sendraw, NULL,
+ strm_senddata, NULL);
MRP_REGISTER_TRANSPORT(unxstrm, "unxstrm", strm_t, strm_resolve,
strm_open, strm_create, strm_close,
strm_bind, strm_listen, strm_accept,
strm_connect, strm_disconnect,
- strm_send, NULL);
+ strm_send, NULL,
+ strm_sendraw, NULL,
+ strm_senddata, NULL);
#include <sys/types.h>
#include <sys/socket.h>
+#define __GNU_SOURCE
+#include <getopt.h>
+
#include <murphy/common.h>
-#define TAG_END ((uint16_t)0x0)
-#define TAG_SEQ ((uint16_t)0x1)
-#define TAG_FOO ((uint16_t)0x2)
-#define TAG_BAR ((uint16_t)0x3)
-#define TAG_MSG ((uint16_t)0x4)
-#define TAG_RPL ((uint16_t)0x5)
+/*
+ * tags for generic message fields
+ */
+
+#define TAG_SEQ ((uint16_t)0x1)
+#define TAG_MSG ((uint16_t)0x2)
+#define TAG_U8 ((uint16_t)0x3)
+#define TAG_S8 ((uint16_t)0x4)
+#define TAG_U16 ((uint16_t)0x5)
+#define TAG_S16 ((uint16_t)0x6)
+#define TAG_DBL ((uint16_t)0x7)
+#define TAG_BLN ((uint16_t)0x8)
+#define TAG_ASTR ((uint16_t)0x9)
+#define TAG_AU32 ((uint16_t)0xa)
+#define TAG_RPL ((uint16_t)0xb)
+#define TAG_END MRP_MSG_FIELD_END
+
+#define U32_GUARD (uint32_t)-1
+
+/*
+ * our test custom data type
+ */
+
+#define TAG_CUSTOM 0x1
+
+typedef struct {
+ uint32_t seq;
+ char *msg;
+ uint8_t u8;
+ int8_t s8;
+ uint16_t u16;
+ int16_t s16;
+ double dbl;
+ bool bln;
+ char **astr;
+ uint32_t nstr;
+ uint32_t *au32;
+ char *rpl;
+} custom_t;
+
+
+MRP_DATA_DESCRIPTOR(custom_descr, TAG_CUSTOM, custom_t,
+ MRP_DATA_MEMBER(custom_t, seq, MRP_MSG_FIELD_UINT32),
+ MRP_DATA_MEMBER(custom_t, msg, MRP_MSG_FIELD_STRING),
+ MRP_DATA_MEMBER(custom_t, u8, MRP_MSG_FIELD_UINT8 ),
+ MRP_DATA_MEMBER(custom_t, s8, MRP_MSG_FIELD_SINT8 ),
+ MRP_DATA_MEMBER(custom_t, u16, MRP_MSG_FIELD_UINT16),
+ MRP_DATA_MEMBER(custom_t, s16, MRP_MSG_FIELD_SINT16),
+ MRP_DATA_MEMBER(custom_t, dbl, MRP_MSG_FIELD_DOUBLE),
+ MRP_DATA_MEMBER(custom_t, bln, MRP_MSG_FIELD_BOOL ),
+ MRP_DATA_MEMBER(custom_t, rpl, MRP_MSG_FIELD_STRING),
+ MRP_DATA_MEMBER(custom_t, nstr, MRP_MSG_FIELD_UINT32),
+ MRP_DATA_ARRAY_COUNT(custom_t, astr, nstr,
+ MRP_MSG_FIELD_STRING),
+ MRP_DATA_ARRAY_GUARD(custom_t, au32, u32, U32_GUARD,
+ MRP_MSG_FIELD_UINT32));
+
typedef struct {
mrp_mainloop_t *ml;
mrp_transport_t *t;
int server;
- char *saddr;
- char *caddr;
+ char *addrstr;
+ mrp_sockaddr_t addr;
+ socklen_t alen;
+ const char *atype;
int sock;
mrp_io_watch_t *iow;
mrp_timer_t *timer;
+ int custom;
+ int connect;
+ int log_mask;
+ const char *log_target;
+ uint32_t seqno;
} context_t;
-void recv_evt(mrp_transport_t *t, mrp_msg_t *msg, void *user_data)
+void recv_custom(mrp_transport_t *t, void *data, uint16_t tag, void *user_data);
+void recv_msg(mrp_transport_t *t, mrp_msg_t *msg, void *user_data);
+
+
+void dump_msg(mrp_msg_t *msg, FILE *fp)
+{
+ mrp_msg_dump(msg, fp);
+}
+
+
+void dump_custom(custom_t *msg, FILE *fp)
+{
+ uint32_t i;
+
+ mrp_data_dump(msg, &custom_descr, fp);
+ fprintf(fp, "{\n");
+ fprintf(fp, " seq = %u\n" , msg->seq);
+ fprintf(fp, " msg = '%s'\n", msg->msg);
+ fprintf(fp, " u8 = %u\n" , msg->u8);
+ fprintf(fp, " s8 = %d\n" , msg->s8);
+ fprintf(fp, " u16 = %u\n" , msg->u16);
+ fprintf(fp, " s16 = %d\n" , msg->s16);
+ fprintf(fp, " dbl = %f\n" , msg->dbl);
+ fprintf(fp, " bln = %s\n" , msg->bln ? "true" : "false");
+ fprintf(fp, " astr = (%u)\n", msg->nstr);
+ for (i = 0; i < msg->nstr; i++)
+ fprintf(fp, " %s\n", msg->astr[i]);
+ fprintf(fp, " au32 =\n");
+ for (i = 0; msg->au32[i] != U32_GUARD; i++)
+ fprintf(fp, " %u\n", msg->au32[i]);
+ fprintf(fp, " rpl = '%s'\n", msg->rpl);
+ fprintf(fp, "}\n");
+}
+
+
+void free_custom(custom_t *msg)
+{
+ mrp_data_free(msg, custom_descr.tag);
+}
+
+
+void recv_msg(mrp_transport_t *t, mrp_msg_t *msg, void *user_data)
{
context_t *c = (context_t *)user_data;
mrp_msg_field_t *f;
char buf[256];
mrp_log_info("received a message");
- mrp_msg_dump(msg, stdout);
+ dump_msg(msg, stdout);
if (c->server) {
seq = 0;
}
-void recvfrom_evt(mrp_transport_t *t, mrp_msg_t *msg, mrp_sockaddr_t *addr,
+void recvfrom_msg(mrp_transport_t *t, mrp_msg_t *msg, mrp_sockaddr_t *addr,
socklen_t addrlen, void *user_data)
{
context_t *c = (context_t *)user_data;
char buf[256];
mrp_log_info("received a message");
- mrp_msg_dump(msg, stdout);
+ dump_msg(msg, stdout);
if (c->server) {
seq = 0;
}
+void recv_custom(mrp_transport_t *t, void *data, uint16_t tag, void *user_data)
+{
+ context_t *c = (context_t *)user_data;
+ custom_t *msg = (custom_t *)data;
+ custom_t rpl;
+ char buf[256];
+ uint32_t au32[] = { 9, 8, 7, 6, 5, -1 };
+
+ mrp_log_info("received custom message of type 0x%x", tag);
+ dump_custom(data, stdout);
+
+ if (tag != custom_descr.tag) {
+ mrp_log_error("Tag 0x%x != our custom type (0x%x).",
+ tag, custom_descr.tag);
+ exit(1);
+ }
+
+
+ if (c->server) {
+ rpl = *msg;
+ snprintf(buf, sizeof(buf), "reply to message #%u", msg->seq);
+ rpl.rpl = buf;
+ rpl.au32 = au32;
+
+ if (mrp_transport_senddata(t, &rpl, custom_descr.tag))
+ mrp_log_info("reply successfully sent");
+ else
+ mrp_log_error("failed to send reply");
+ }
+
+ free_custom(msg);
+}
+
+
+void recvfrom_custom(mrp_transport_t *t, void *data, uint16_t tag,
+ mrp_sockaddr_t *addr, socklen_t addrlen, void *user_data)
+{
+ context_t *c = (context_t *)user_data;
+ custom_t *msg = (custom_t *)data;
+ custom_t rpl;
+ char buf[256];
+ uint32_t au32[] = { 9, 8, 7, 6, 5, -1 };
+
+ mrp_log_info("received custom message of type 0x%x", tag);
+ dump_custom(data, stdout);
+
+ if (tag != custom_descr.tag) {
+ mrp_log_error("Tag 0x%x != our custom type (0x%x).",
+ tag, custom_descr.tag);
+ exit(1);
+ }
+
+
+ if (c->server) {
+ rpl = *msg;
+ snprintf(buf, sizeof(buf), "reply to message #%u", msg->seq);
+ rpl.rpl = buf;
+ rpl.au32 = au32;
+
+ if (mrp_transport_senddatato(t, &rpl, custom_descr.tag, addr, addrlen))
+ mrp_log_info("reply successfully sent");
+ else
+ mrp_log_error("failed to send reply");
+ }
+
+ free_custom(msg);
+}
+
+
void closed_evt(mrp_transport_t *t, int error, void *user_data)
{
context_t *c = (context_t *)user_data;
}
+void type_init(void)
+{
+ if (!mrp_msg_register_type(&custom_descr)) {
+ mrp_log_error("Failed to register custom data type.");
+ exit(1);
+ }
+}
+
+
void server_init(context_t *c)
{
static mrp_transport_evt_t evt = {
.closed = closed_evt,
.recv = NULL,
- .recvfrom = recvfrom_evt,
+ .recvfrom = NULL,
};
- mrp_sockaddr_t addr;
- socklen_t len;
- const char *type;
+ int flags;
- len = sizeof(addr);
- len = mrp_transport_resolve(c->t, c->saddr, &addr, len, &type);
+ type_init();
- if (len > 0) {
- c->t = mrp_transport_create(c->ml, type, &evt, c, 0);
-
- if (c->t == NULL) {
- mrp_log_error("Failed to create new transport.");
- exit(1);
- }
-
- if (!mrp_transport_bind(c->t, &addr, len)) {
- mrp_log_error("Failed to bind to %s.", c->saddr);
- exit(1);
- }
-
- mrp_log_info("Waiting for messages on %s...", c->saddr);
+ if (c->custom) {
+ evt.recvdata = recv_custom;
+ evt.recvdatafrom = recvfrom_custom;
}
else {
- mrp_log_error("Failed to resolve address '%s'.", c->saddr);
+ evt.recv = recv_msg;
+ evt.recvfrom = recvfrom_msg;
+ }
+
+ flags = MRP_TRANSPORT_REUSEADDR |
+ c->custom ? MRP_TRANSPORT_MODE_CUSTOM : 0;
+ c->t = mrp_transport_create(c->ml, c->atype, &evt, c, flags);
+
+ if (c->t == NULL) {
+ mrp_log_error("Failed to create new transport.");
+ exit(1);
+ }
+
+ if (!mrp_transport_bind(c->t, &c->addr, c->alen)) {
+ mrp_log_error("Failed to bind to %s.", c->addrstr);
exit(1);
}
+
+ mrp_log_info("Waiting for messages on %s...", c->addrstr);
}
-void send_cb(mrp_mainloop_t *ml, mrp_timer_t *t, void *user_data)
+void send_msg(context_t *c)
{
- static uint32_t seqno = 1;
-
- context_t *c = (context_t *)user_data;
mrp_msg_t *msg;
uint32_t seq;
char buf[256];
- uint32_t len;
-
- MRP_UNUSED(ml);
- MRP_UNUSED(t);
-
+ char *astr[] = { "this", "is", "an", "array", "of", "strings" };
+ uint32_t au32[] = { 1, 2, 3,
+ 1 << 16, 2 << 16, 3 << 16,
+ 1 << 24, 2 << 24, 3 << 24 };
+ uint32_t nstr = MRP_ARRAY_SIZE(astr);
+ uint32_t nu32 = MRP_ARRAY_SIZE(au32);
+ int status;
+
+ seq = c->seqno++;
+ snprintf(buf, sizeof(buf), "this is message #%u", (unsigned int)seq);
- seq = seqno++;
- len = snprintf(buf, sizeof(buf), "This is message %u.", (unsigned int)seq);
- if ((msg = mrp_msg_create(TAG_SEQ, MRP_MSG_FIELD_UINT32, seq,
- TAG_FOO, MRP_MSG_FIELD_STRING, "foo",
- TAG_BAR, MRP_MSG_FIELD_STRING, "bar",
- TAG_MSG, MRP_MSG_FIELD_BLOB , len, buf,
- TAG_END)) == NULL) {
+ msg = mrp_msg_create(TAG_SEQ , MRP_MSG_FIELD_UINT32, seq,
+ TAG_MSG , MRP_MSG_FIELD_STRING, buf,
+ TAG_U8 , MRP_MSG_FIELD_UINT8 , seq & 0xf,
+ TAG_S8 , MRP_MSG_FIELD_SINT8 , -(seq & 0xf),
+ TAG_U16 , MRP_MSG_FIELD_UINT16, seq,
+ TAG_S16 , MRP_MSG_FIELD_SINT16, - seq,
+ TAG_DBL , MRP_MSG_FIELD_DOUBLE, seq / 3.0,
+ TAG_BLN , MRP_MSG_FIELD_BOOL , seq & 0x1,
+ TAG_ASTR, MRP_MSG_FIELD_ARRAY_OF(STRING), nstr, astr,
+ TAG_AU32, MRP_MSG_FIELD_ARRAY_OF(UINT32), nu32, au32,
+ TAG_END);
+
+ if (msg == NULL) {
mrp_log_error("Failed to create new message.");
exit(1);
}
- if (!mrp_transport_send(c->t, msg)) {
+ if (c->connect)
+ status = mrp_transport_send(c->t, msg);
+ else
+ status = mrp_transport_sendto(c->t, msg, &c->addr, c->alen);
+
+ if (!status) {
mrp_log_error("Failed to send message #%d.", seq);
exit(1);
}
}
+void send_custom(context_t *c)
+{
+ uint32_t seq = c->seqno++;
+ custom_t msg;
+ char buf[256];
+ char *astr[] = { "this", "is", "a", "test", "string", "array" };
+ uint32_t au32[] = { 1, 2, 3, 4, 5, 6, 7, -1 };
+ int status;
+
+ msg.seq = seq;
+ snprintf(buf, sizeof(buf), "this is message #%u", (unsigned int)seq);
+ msg.msg = buf;
+ msg.u8 = seq & 0xf;
+ msg.s8 = -(seq & 0xf);
+ msg.u16 = seq;
+ msg.s16 = - seq;
+ msg.dbl = seq / 3.0;
+ msg.bln = seq & 0x1;
+ msg.astr = astr;
+ msg.nstr = MRP_ARRAY_SIZE(astr);
+ msg.au32 = au32;
+ msg.rpl = "";
+
+ if (c->connect)
+ status = mrp_transport_senddata(c->t, &msg, custom_descr.tag);
+ else
+ status = mrp_transport_senddatato(c->t, &msg, custom_descr.tag,
+ &c->addr, c->alen);
+
+ if (!status) {
+ mrp_log_error("Failed to send message #%d.", msg.seq);
+ exit(1);
+ }
+ else
+ mrp_log_info("Message #%d succesfully sent.", msg.seq);
+}
+
+
+
+void send_cb(mrp_mainloop_t *ml, mrp_timer_t *t, void *user_data)
+{
+ context_t *c = (context_t *)user_data;
+
+ MRP_UNUSED(ml);
+ MRP_UNUSED(t);
+
+ if (c->custom)
+ send_custom(c);
+ else
+ send_msg(c);
+}
+
+
void client_init(context_t *c)
{
static mrp_transport_evt_t evt = {
.closed = closed_evt,
- .recv = recv_evt,
+ .recv = NULL,
.recvfrom = NULL,
};
- mrp_sockaddr_t sa, ca;
- socklen_t sl, cl;
- const char *type;
-
- sl = mrp_transport_resolve(NULL, c->saddr, &sa, sizeof(sa), &type);
+ int flags;
+
+ type_init();
- if (sl <= 0) {
- mrp_log_error("Failed resolve transport address '%s'.", c->saddr);
- exit(1);
+ if (c->custom) {
+ evt.recvdata = recv_custom;
+ evt.recvdatafrom = recvfrom_custom;
+ }
+ else {
+ evt.recv = recv_msg;
+ evt.recvfrom = recvfrom_msg;
}
- c->t = mrp_transport_create(c->ml, type, &evt, c, 0);
+ flags = c->custom ? MRP_TRANSPORT_MODE_CUSTOM : 0;
+ c->t = mrp_transport_create(c->ml, c->atype, &evt, c, flags);
if (c->t == NULL) {
mrp_log_error("Failed to create new transport.");
exit(1);
}
- if (c->caddr) {
- cl = mrp_transport_resolve(NULL, c->caddr, &ca, sizeof(ca), &type);
-
- if (cl <= 0) {
- mrp_log_error("Failed resolve transport address '%s'.", c->caddr);
- exit(1);
- }
-
- if (!mrp_transport_bind(c->t, &ca, cl)) {
- mrp_log_error("Failed to bind to %s.", c->caddr);
+ if (c->connect) {
+ mrp_log_info("Connecting to server...");
+ if (!mrp_transport_connect(c->t, &c->addr, c->alen)) {
+ mrp_log_error("Failed to connect to %s.", c->addrstr);
exit(1);
}
- else
- mrp_log_info("Bound local endpoint to '%s'...", c->caddr);
- }
-
- if (!mrp_transport_connect(c->t, &sa, sl)) {
- mrp_log_error("Failed to connect to %s.", c->saddr);
- exit(1);
}
+ else
+ mrp_log_info("Running in unconnected mode...");
c->timer = mrp_add_timer(c->ml, 1000, send_cb, c);
}
-int main(int argc, char *argv[])
+
+static void print_usage(const char *argv0, int exit_code, const char *fmt, ...)
{
- context_t c;
- int i;
+ va_list ap;
+
+ if (fmt && *fmt) {
+ va_start(ap, fmt);
+ vprintf(fmt, ap);
+ va_end(ap);
+ }
+
+ printf("usage: %s [options] [transport-address]\n\n"
+ "The possible options are:\n"
+ " -s, --server run as test server (default)\n"
+ " -c, --custom use custom messages\n"
+ " -m, --message use generic messages (default)\n"
+ " -C, --connect connect transport\n"
+ " -t, --log-target=TARGET log target to use\n"
+ " TARGET is one of stderr,stdout,syslog, or a logfile path\n"
+ " -l, --log-level=LEVELS logging level to use\n"
+ " LEVELS is a comma separated list of info, error and warning\n"
+ " -v, --verbose increase logging verbosity\n"
+ " -d, --debug enable debug messages\n"
+ " -h, --help show help on usage\n",
+ argv0);
+
+ if (exit_code < 0)
+ return;
+ else
+ exit(exit_code);
+}
- mrp_clear(&c);
- mrp_log_set_mask(MRP_LOG_UPTO(MRP_LOG_DEBUG));
- mrp_log_set_target(MRP_LOG_TO_STDOUT);
-
- for (i = 1; i < argc; i++) {
- if (!strcmp(argv[i], "-s") || !strcmp(argv[i], "--server"))
- c.server = TRUE;
- else if (!strcmp(argv[i], "-c") || !strcmp(argv[i], "--client"))
- c.server = FALSE;
- else {
- if (c.saddr == NULL)
- c.saddr = argv[i];
- else if (c.caddr == NULL)
- c.caddr = argv[i];
- else {
- mrp_log_error("Unrecognized argument '%s'.", argv[i]);
- goto invalid_cmdline;
- }
+static void config_set_defaults(context_t *ctx)
+{
+ mrp_clear(ctx);
+ ctx->addrstr = "udp4:127.0.0.1:3000";
+ ctx->server = FALSE;
+ ctx->custom = FALSE;
+ ctx->connect = FALSE;
+ ctx->log_mask = MRP_LOG_UPTO(MRP_LOG_DEBUG);
+ ctx->log_target = MRP_LOG_TO_STDERR;
+}
+
+
+int parse_cmdline(context_t *ctx, int argc, char **argv)
+{
+ #define OPTIONS "scmCa:l:t:vdh"
+ struct option options[] = {
+ { "server" , no_argument , NULL, 's' },
+ { "address" , required_argument, NULL, 'a' },
+ { "custom" , no_argument , NULL, 'c' },
+ { "message" , no_argument , NULL, 'm' },
+ { "connect" , no_argument , NULL, 'C' },
+ { "log-level" , required_argument, NULL, 'l' },
+ { "log-target", required_argument, NULL, 't' },
+ { "verbose" , optional_argument, NULL, 'v' },
+ { "debug" , no_argument , NULL, 'd' },
+ { "help" , no_argument , NULL, 'h' },
+ { NULL, 0, NULL, 0 }
+ };
+
+ int opt, debug;
+
+ debug = FALSE;
+ config_set_defaults(ctx);
+
+ while ((opt = getopt_long(argc, argv, OPTIONS, options, NULL)) != -1) {
+ switch (opt) {
+ case 's':
+ ctx->server = TRUE;
+ break;
+
+ case 'c':
+ ctx->custom = TRUE;
+ break;
+
+ case 'm':
+ ctx->custom = FALSE;
+ break;
+
+ case 'C':
+ ctx->connect = TRUE;
+ break;
+
+ case 'a':
+ ctx->addrstr = optarg;
+ break;
+
+ case 'v':
+ ctx->log_mask <<= 1;
+ ctx->log_mask |= 1;
+ break;
+
+ case 'l':
+ ctx->log_mask = mrp_log_parse_levels(optarg);
+ if (ctx->log_mask < 0)
+ print_usage(argv[0], EINVAL, "invalid log level '%s'", optarg);
+ break;
+
+ case 't':
+ ctx->log_target = mrp_log_parse_target(optarg);
+ if (!ctx->log_target)
+ print_usage(argv[0], EINVAL, "invalid log target '%s'", optarg);
+ break;
+
+ case 'd':
+ debug = TRUE;
+ break;
+
+ case 'h':
+ print_usage(argv[0], -1, "");
+ exit(0);
+ break;
+
+ default:
+ print_usage(argv[0], EINVAL, "invalid option '%c'", opt);
}
}
+ if (debug)
+ ctx->log_mask |= MRP_LOG_MASK_DEBUG;
+
+ return TRUE;
+}
+
+
+int main(int argc, char *argv[])
+{
+ context_t c;
+
+ mrp_clear(&c);
+
+ if (!parse_cmdline(&c, argc, argv))
+ exit(1);
+
+ mrp_log_set_mask(c.log_mask);
+ mrp_log_set_target(c.log_target);
+
if (c.server)
- mrp_log_info("Running as server, using address '%s'...", c.saddr);
+ mrp_log_info("Running as server, using address '%s'...", c.addrstr);
else
- mrp_log_info("Running as client, server is at '%s'...", c.saddr);
-
- if (c.caddr)
- mrp_log_info("Going to bind client side-socket to '%s'...", c.caddr);
+ mrp_log_info("Running as client, using address '%s'...", c.addrstr);
+ if (c.custom)
+ mrp_log_info("Using custom messages...");
+ else
+ mrp_log_info("Using generic messages...");
c.ml = mrp_mainloop_create();
+ c.alen = mrp_transport_resolve(NULL, c.addrstr,
+ &c.addr, sizeof(c.addr), &c.atype);
+
+ if (c.alen <= 0) {
+ mrp_log_error("Failed to resolve transport address '%s'.", c.addrstr);
+ exit(1);
+ }
+
if (c.server)
server_init(&c);
else
mrp_mainloop_run(c.ml);
return 0;
-
- invalid_cmdline:
- mrp_log_error("invalid command line arguments");
- mrp_log_error("usage: %s [-s|-c] <server> [<client>]", argv[0]);
- exit(1);
}
}
-int main(int argc, char *argv[])
+void test_default_encode_decode(int argc, char **argv)
{
mrp_msg_t *msg, *decoded;
void *encoded;
char *val, *end;
int i, ok;
- mrp_log_set_mask(MRP_LOG_UPTO(MRP_LOG_DEBUG));
- mrp_log_set_target(MRP_LOG_TO_STDOUT);
-
- if ((msg = mrp_msg_create(MRP_MSG_FIELD_INVALID)) == NULL) {
+ if ((msg = mrp_msg_create_empty()) == NULL) {
mrp_log_error("Failed to create new message.");
exit(1);
}
mrp_msg_unref(msg);
mrp_msg_unref(decoded);
+}
+
+
+typedef struct {
+ char *str1;
+ uint16_t u16;
+ int32_t s32;
+ char *str2;
+ double dbl1;
+ bool bln1;
+ double dbl2;
+ char *str3;
+ bool bln2;
+} data1_t;
+
+typedef struct {
+ char *str;
+ uint8_t u8;
+ bool bln;
+} data2_t;
+
+typedef struct {
+ char *str;
+ uint16_t u16;
+ int32_t s32;
+ double dbl;
+} data3_t;
+
+#if 0
+typedef struct {
+ uint16_t offs; /* member offset within structure */
+ uint16_t tag; /* tag for member */
+ uint16_t type; /* type of this member */
+} mrp_msg_member_t;
+
+typedef struct {
+ uint16_t tag; /* structure tag */
+ size_t size; /* structure size */
+ int nfield; /* number of members */
+ mrp_msg_member_t *fields; /* member descriptor */
+} mrp_msg_descr_t;
+#endif
+
+#define DUMP_FIELD(memb, fmt) printf(" %s: "fmt"\n", #memb, d->memb)
+
+int cmp_data1(data1_t *d1, data1_t *d2)
+{
+ return
+ !strcmp(d1->str1, d2->str1) &&
+ !strcmp(d1->str2, d2->str2) &&
+ !strcmp(d1->str3, d2->str3) &&
+ d1->u16 == d2->u16 &&
+ d1->s32 == d2->s32 &&
+ d1->dbl1 == d2->dbl1 &&
+ d1->bln1 == d2->bln1 &&
+ d1->dbl2 == d2->dbl2 &&
+ d1->bln2 == d2->bln2;
+}
+
+int cmp_data2(data2_t *d1, data2_t *d2)
+{
+ return
+ !strcmp(d1->str, d2->str) &&
+ d1->u8 == d2->u8 &&
+ d1->bln == d2->bln;
+}
+
+int cmp_data3(data3_t *d1, data3_t *d2)
+{
+ return
+ !strcmp(d1->str, d2->str) &&
+ d1->u16 == d2->u16 &&
+ d1->s32 == d2->s32 &&
+ d1->dbl == d2->dbl;
+}
+
+void dump_data1(char *prefix, data1_t *d)
+{
+ printf("%s{\n", prefix);
+ DUMP_FIELD(str1, "%s");
+ DUMP_FIELD(u16 , "%u");
+ DUMP_FIELD(s32 , "%d");
+ DUMP_FIELD(str2, "%s");
+ DUMP_FIELD(dbl1, "%f");
+ DUMP_FIELD(bln1, "%d");
+ DUMP_FIELD(dbl2, "%f");
+ DUMP_FIELD(str2, "%s");
+ DUMP_FIELD(bln2, "%d");
+ printf("}\n");
+
+}
+
+void dump_data2(char *prefix, data2_t *d)
+{
+ printf("%s{\n", prefix);
+ DUMP_FIELD(str, "%s");
+ DUMP_FIELD(u8 , "%u");
+ DUMP_FIELD(bln, "%d");
+ printf("}\n");
+}
+
+void dump_data3(char *prefix, data3_t *d)
+{
+ printf("%s{\n", prefix);
+ DUMP_FIELD(str, "%s");
+ DUMP_FIELD(u16, "%u");
+ DUMP_FIELD(s32, "%d");
+ DUMP_FIELD(dbl, "%f");
+ printf("}\n");
+}
+
+#undef DUMP_FIELD
+
+static size_t mrp_msg_encode(void **bufp, void *data,
+ mrp_data_member_t *fields, int nfield);
+
+static void *mrp_msg_decode(void **bufp, size_t *sizep, size_t data_size,
+ mrp_data_member_t *fields, int nfield);
+
+void test_custom_encode_decode(void)
+{
+#define DESCRIBE(_type, _memb, _tag, _ftype) { \
+ .offs = MRP_OFFSET(_type, _memb), \
+ .tag = _tag, \
+ .type = MRP_MSG_FIELD_##_ftype, \
+ .guard = FALSE, \
+ { NULL }, \
+ .hook = { NULL, NULL } \
+ }
+
+ mrp_data_member_t data1_descr[] = {
+ DESCRIBE(data1_t, str1, 0x1, STRING),
+ DESCRIBE(data1_t, u16, 0x2, UINT16),
+ DESCRIBE(data1_t, str1, 0x1, STRING),
+ DESCRIBE(data1_t, u16 , 0x2, UINT16),
+ DESCRIBE(data1_t, s32 , 0x3, SINT32),
+ DESCRIBE(data1_t, str2, 0x4, STRING),
+ DESCRIBE(data1_t, dbl1, 0x5, DOUBLE),
+ DESCRIBE(data1_t, bln1, 0x6, BOOL ),
+ DESCRIBE(data1_t, dbl2, 0x7, DOUBLE),
+ DESCRIBE(data1_t, str3, 0x8, STRING),
+ DESCRIBE(data1_t, bln2, 0x9, BOOL ),
+ };
+ int data1_nfield = MRP_ARRAY_SIZE(data1_descr);
+
+ mrp_data_member_t data2_descr[] = {
+ DESCRIBE(data2_t, str, 0x1, STRING),
+ DESCRIBE(data2_t, u8 , 0x2, UINT8 ),
+ DESCRIBE(data2_t, bln, 0x3, BOOL ),
+ };
+ int data2_nfield = MRP_ARRAY_SIZE(data2_descr);
+
+ mrp_data_member_t data3_descr[] = {
+ DESCRIBE(data3_t, str, 0x1, STRING),
+ DESCRIBE(data3_t, u16, 0x2, UINT16),
+ DESCRIBE(data3_t, s32, 0x3, SINT32),
+ DESCRIBE(data3_t, dbl, 0x4, DOUBLE),
+ };
+ int data3_nfield = MRP_ARRAY_SIZE(data3_descr);
+
+#define TAG_DATA1 0x1
+#define TAG_DATA2 0x2
+#define TAG_DATA3 0x3
+
+
+ data1_t data1 = {
+ .str1 = "data1, str1",
+ .u16 = 32768U,
+ .s32 = -12345678,
+ .str2 = "data1, str2",
+ .dbl1 = 9.81,
+ .bln1 = TRUE,
+ .dbl2 = -3.141,
+ .str3 = "data1, str3",
+ .bln2 = FALSE
+ };
+ data2_t data2 = {
+ .str = "data2, str",
+ .u8 = 128,
+ .bln = TRUE
+ };
+ data3_t data3 = {
+ .str = "data3, str",
+ .u16 = 32768U,
+ .s32 = -12345678,
+ .dbl = 1.2345
+ };
+
+ data1_t *d1;
+ data2_t *d2;
+ data3_t *d3;
+ void *buf;
+ size_t size;
+
+ size = mrp_msg_encode(&buf, &data1, data1_descr, data1_nfield);
+
+ if (size <= 0) {
+ mrp_log_error("failed to encode data1_t");
+ exit(1);
+ }
+
+ d1 = mrp_msg_decode(&buf, &size, sizeof(data1_t), data1_descr,data1_nfield);
+
+ if (d1 == NULL) {
+ mrp_log_error("failed to decode encoded data1_t");
+ exit(1);
+ }
+
+ dump_data1("original data1: ", &data1);
+ dump_data1("decoded data1: ", d1);
+ if (!cmp_data1(&data1, d1)) {
+ mrp_log_error("Original and decoded data1_t do not match!");
+ exit(1);
+ }
+ else
+ mrp_log_info("ok, original and decoded match...");
+
+
+ size = mrp_msg_encode(&buf, &data2, data2_descr, data2_nfield);
+
+ if (size <= 0) {
+ mrp_log_error("failed to encode data2_t");
+ exit(1);
+ }
+
+ d2 = mrp_msg_decode(&buf, &size, sizeof(data2_t), data2_descr,data2_nfield);
+
+ if (d2 == NULL) {
+ mrp_log_error("failed to decode encoded data2_t");
+ exit(1);
+ }
+
+ dump_data2("original data2: ", &data2);
+ dump_data2("decoded data2: ", d2);
+ if (!cmp_data2(&data2, d2)) {
+ mrp_log_error("Original and decoded data2_t do not match!");
+ exit(1);
+ }
+ else
+ mrp_log_info("ok, original and decoded match...");
+
+
+ size = mrp_msg_encode(&buf, &data3, data3_descr, data3_nfield);
+
+ if (size <= 0) {
+ mrp_log_error("failed to encode data3_t");
+ exit(1);
+ }
+
+ d3 = mrp_msg_decode(&buf, &size, sizeof(data3_t), data3_descr,data3_nfield);
+
+ if (d3 == NULL) {
+ mrp_log_error("failed to decode encoded data3_t");
+ exit(1);
+ }
+
+ dump_data3("original data3: ", &data3);
+ dump_data3("decoded data3: ", d3);
+ if (!cmp_data3(&data3, d3)) {
+ mrp_log_error("Original and decoded data3_t do not match!");
+ exit(1);
+ }
+ else
+ mrp_log_info("ok, original and decoded match...");
+}
+
+
+int main(int argc, char *argv[])
+{
+ mrp_log_set_mask(MRP_LOG_UPTO(MRP_LOG_DEBUG));
+ mrp_log_set_target(MRP_LOG_TO_STDOUT);
+
+ test_default_encode_decode(argc, argv);
+ test_custom_encode_decode();
return 0;
}
+
+
+static size_t mrp_msg_encode(void **bufp, void *data,
+ mrp_data_member_t *fields, int nfield)
+{
+ mrp_data_member_t *f;
+ mrp_msgbuf_t mb;
+ mrp_msg_value_t *v;
+ uint32_t len;
+ int i;
+ size_t size;
+
+ size = nfield * (2 * sizeof(uint16_t) + sizeof(uint64_t));
+
+ if (mrp_msgbuf_write(&mb, size)) {
+ for (i = 0, f = fields; i < nfield; i++, f++) {
+ MRP_MSGBUF_PUSH(&mb, htobe16(f->tag) , 1, nomem);
+
+ v = (mrp_msg_value_t *)(data + f->offs);
+
+ switch (f->type) {
+ case MRP_MSG_FIELD_STRING:
+ len = strlen(v->str) + 1;
+ MRP_MSGBUF_PUSH(&mb, htobe32(len), 1, nomem);
+ MRP_MSGBUF_PUSH_DATA(&mb, v->str, len, 1, nomem);
+ break;
+
+ case MRP_MSG_FIELD_BOOL:
+ MRP_MSGBUF_PUSH(&mb, htobe32(v->bln ? TRUE : FALSE), 1, nomem);
+ break;
+
+ case MRP_MSG_FIELD_UINT8:
+ MRP_MSGBUF_PUSH(&mb, v->u8, 1, nomem);
+ break;
+
+ case MRP_MSG_FIELD_SINT8:
+ MRP_MSGBUF_PUSH(&mb, v->s8, 1, nomem);
+ break;
+
+ case MRP_MSG_FIELD_UINT16:
+ MRP_MSGBUF_PUSH(&mb, htobe16(v->u16), 1, nomem);
+ break;
+
+ case MRP_MSG_FIELD_SINT16:
+ MRP_MSGBUF_PUSH(&mb, htobe16(v->s16), 1, nomem);
+ break;
+
+ case MRP_MSG_FIELD_UINT32:
+ MRP_MSGBUF_PUSH(&mb, htobe32(v->u32), 1, nomem);
+ break;
+
+ case MRP_MSG_FIELD_SINT32:
+ MRP_MSGBUF_PUSH(&mb, htobe32(v->s32), 1, nomem);
+ break;
+
+ case MRP_MSG_FIELD_UINT64:
+ MRP_MSGBUF_PUSH(&mb, htobe64(v->u64), 1, nomem);
+ break;
+
+ case MRP_MSG_FIELD_SINT64:
+ MRP_MSGBUF_PUSH(&mb, htobe64(v->s64), 1, nomem);
+ break;
+
+ case MRP_MSG_FIELD_DOUBLE:
+ MRP_MSGBUF_PUSH(&mb, v->dbl, 1, nomem);
+ break;
+
+ case MRP_MSG_FIELD_BLOB:
+ errno = EOPNOTSUPP;
+ /* intentional fall through */
+
+ default:
+ if (f->type & MRP_MSG_FIELD_ARRAY) {
+ errno = EOPNOTSUPP;
+ mrp_log_error("XXX TODO: MRP_MSG_FIELD_ARRAY "
+ "not implemented");
+ }
+ else
+ errno = EINVAL;
+
+ mrp_msgbuf_cancel(&mb);
+ nomem:
+ *bufp = NULL;
+ return 0;
+ }
+ }
+ }
+
+ *bufp = mb.buf;
+ return (size_t)(mb.p - mb.buf);
+}
+
+
+#if 0
+static mrp_data_member_t *member_type(mrp_data_member_t *fields, int nfield,
+ uint16_t tag)
+{
+ mrp_data_member_t *f;
+ int i;
+
+ for (i = 0, f = fields; i < nfield; i++, f++)
+ if (f->tag == tag)
+ return f;
+
+ return NULL;
+}
+#endif
+
+static void *mrp_msg_decode(void **bufp, size_t *sizep, size_t data_size,
+ mrp_data_member_t *fields, int nfield)
+{
+ void *data;
+ mrp_data_member_t *f;
+ mrp_msgbuf_t mb;
+ uint16_t tag;
+ mrp_msg_value_t *v;
+ void *value;
+ uint32_t len;
+ int i;
+
+ if (MRP_UNLIKELY((data = mrp_allocz(data_size)) == NULL))
+ return NULL;
+
+ mrp_msgbuf_read(&mb, *bufp, *sizep);
+
+ for (i = 0; i < nfield; i++) {
+ tag = be16toh(MRP_MSGBUF_PULL(&mb, typeof(tag) , 1, nodata));
+ f = member_type(fields, nfield, tag);
+
+ if (MRP_UNLIKELY(f == NULL))
+ goto unknown_field;
+
+ v = (mrp_msg_value_t *)(data + f->offs);
+
+ switch (f->type) {
+ case MRP_MSG_FIELD_STRING:
+ len = be32toh(MRP_MSGBUF_PULL(&mb, typeof(len), 1, nodata));
+ value = MRP_MSGBUF_PULL_DATA(&mb, len, 1, nodata);
+ v->str = mrp_strdup((char *)value);
+ if (v->str == NULL)
+ goto nomem;
+ break;
+
+ case MRP_MSG_FIELD_BOOL:
+ v->bln = be32toh(MRP_MSGBUF_PULL(&mb, uint32_t, 1, nodata));
+ break;
+
+ case MRP_MSG_FIELD_UINT8:
+ v->u8 = MRP_MSGBUF_PULL(&mb, typeof(v->u8), 1, nodata);
+ break;
+
+ case MRP_MSG_FIELD_SINT8:
+ v->s8 = MRP_MSGBUF_PULL(&mb, typeof(v->s8), 1, nodata);
+ break;
+
+ case MRP_MSG_FIELD_UINT16:
+ v->u16 = be16toh(MRP_MSGBUF_PULL(&mb, typeof(v->u16), 1, nodata));
+ break;
+
+ case MRP_MSG_FIELD_SINT16:
+ v->s16 = be16toh(MRP_MSGBUF_PULL(&mb, typeof(v->s16), 1, nodata));
+ break;
+
+ case MRP_MSG_FIELD_UINT32:
+ v->u32 = be32toh(MRP_MSGBUF_PULL(&mb, typeof(v->u32), 1, nodata));
+ break;
+
+ case MRP_MSG_FIELD_SINT32:
+ v->s32 = be32toh(MRP_MSGBUF_PULL(&mb, typeof(v->s32), 1, nodata));
+ break;
+
+ case MRP_MSG_FIELD_UINT64:
+ v->u64 = be64toh(MRP_MSGBUF_PULL(&mb, typeof(v->u64), 1, nodata));
+ break;
+
+ case MRP_MSG_FIELD_SINT64:
+ v->s64 = be64toh(MRP_MSGBUF_PULL(&mb, typeof(v->s64), 1, nodata));
+ break;
+
+ case MRP_MSG_FIELD_DOUBLE:
+ v->dbl = MRP_MSGBUF_PULL(&mb, typeof(v->dbl), 1, nodata);
+ break;
+
+ case MRP_MSG_FIELD_BLOB:
+ errno = EOPNOTSUPP;
+ default:
+ if (f->type & MRP_MSG_FIELD_ARRAY) {
+ errno = EOPNOTSUPP;
+ mrp_log_error("XXX TODO: MRP_MSG_FIELD_ARRAY "
+ "not implemented");
+ }
+ else {
+ unknown_field:
+ errno = EINVAL;
+ }
+ goto fail;
+ }
+ }
+
+ *bufp = mb.buf;
+ *sizep -= mb.p - mb.buf;
+ return data;
+
+ nodata:
+ nomem:
+ fail:
+ if (data != NULL) {
+ for (i = 0, f = fields; i < nfield; i++, f++) {
+ switch (f->type) {
+ case MRP_MSG_FIELD_STRING:
+ case MRP_MSG_FIELD_BLOB:
+ mrp_free(data + f->offs);
+ }
+ }
+
+ mrp_free(data);
+ }
+
+ return NULL;
+}
#include <sys/types.h>
#include <sys/socket.h>
+#define _GNU_SOURCE
+#include <getopt.h>
+
#include <murphy/common.h>
-#define TAG_END ((uint16_t)0x0)
-#define TAG_SEQ ((uint16_t)0x1)
-#define TAG_FOO ((uint16_t)0x2)
-#define TAG_BAR ((uint16_t)0x3)
-#define TAG_MSG ((uint16_t)0x4)
-#define TAG_RPL ((uint16_t)0x5)
+
+/*
+ * tags for generic message fields
+ */
+
+#define TAG_SEQ ((uint16_t)0x1)
+#define TAG_MSG ((uint16_t)0x2)
+#define TAG_U8 ((uint16_t)0x3)
+#define TAG_S8 ((uint16_t)0x4)
+#define TAG_U16 ((uint16_t)0x5)
+#define TAG_S16 ((uint16_t)0x6)
+#define TAG_DBL ((uint16_t)0x7)
+#define TAG_BLN ((uint16_t)0x8)
+#define TAG_ASTR ((uint16_t)0x9)
+#define TAG_AU32 ((uint16_t)0xa)
+#define TAG_RPL ((uint16_t)0xb)
+#define TAG_END MRP_MSG_FIELD_END
+
+#define U32_GUARD (uint32_t)-1
+
+/*
+ * our test custom data type
+ */
+
+#define TAG_CUSTOM 0x1
+
+typedef struct {
+ uint32_t seq;
+ char *msg;
+ uint8_t u8;
+ int8_t s8;
+ uint16_t u16;
+ int16_t s16;
+ double dbl;
+ bool bln;
+ char **astr;
+ uint32_t nstr;
+ uint32_t *au32;
+ char *rpl;
+} custom_t;
+
+
+MRP_DATA_DESCRIPTOR(custom_descr, TAG_CUSTOM, custom_t,
+ MRP_DATA_MEMBER(custom_t, seq, MRP_MSG_FIELD_UINT32),
+ MRP_DATA_MEMBER(custom_t, msg, MRP_MSG_FIELD_STRING),
+ MRP_DATA_MEMBER(custom_t, u8, MRP_MSG_FIELD_UINT8 ),
+ MRP_DATA_MEMBER(custom_t, s8, MRP_MSG_FIELD_SINT8 ),
+ MRP_DATA_MEMBER(custom_t, u16, MRP_MSG_FIELD_UINT16),
+ MRP_DATA_MEMBER(custom_t, s16, MRP_MSG_FIELD_SINT16),
+ MRP_DATA_MEMBER(custom_t, dbl, MRP_MSG_FIELD_DOUBLE),
+ MRP_DATA_MEMBER(custom_t, bln, MRP_MSG_FIELD_BOOL ),
+ MRP_DATA_MEMBER(custom_t, rpl, MRP_MSG_FIELD_STRING),
+ MRP_DATA_MEMBER(custom_t, nstr, MRP_MSG_FIELD_UINT32),
+ MRP_DATA_ARRAY_COUNT(custom_t, astr, nstr,
+ MRP_MSG_FIELD_STRING),
+ MRP_DATA_ARRAY_GUARD(custom_t, au32, u32, U32_GUARD,
+ MRP_MSG_FIELD_UINT32));
typedef struct {
int sock;
mrp_io_watch_t *iow;
mrp_timer_t *timer;
+ int custom;
+ int log_mask;
+ const char *log_target;
+ uint32_t seqno;
} context_t;
+void recv_custom(mrp_transport_t *t, void *data, uint16_t tag, void *user_data);
+void recv_msg(mrp_transport_t *t, mrp_msg_t *msg, void *user_data);
+
+
void closed_evt(mrp_transport_t *t, int error, void *user_data)
{
context_t *c = (context_t *)user_data;
}
-void recv_evt(mrp_transport_t *t, mrp_msg_t *msg, void *user_data)
+void dump_msg(mrp_msg_t *msg, FILE *fp)
+{
+ mrp_msg_dump(msg, fp);
+}
+
+
+void dump_custom(custom_t *msg, FILE *fp)
+{
+ uint32_t i;
+
+ mrp_data_dump(msg, &custom_descr, fp);
+ fprintf(fp, "{\n");
+ fprintf(fp, " seq = %u\n" , msg->seq);
+ fprintf(fp, " msg = '%s'\n", msg->msg);
+ fprintf(fp, " u8 = %u\n" , msg->u8);
+ fprintf(fp, " s8 = %d\n" , msg->s8);
+ fprintf(fp, " u16 = %u\n" , msg->u16);
+ fprintf(fp, " s16 = %d\n" , msg->s16);
+ fprintf(fp, " dbl = %f\n" , msg->dbl);
+ fprintf(fp, " bln = %s\n" , msg->bln ? "true" : "false");
+ fprintf(fp, " astr = (%u)\n", msg->nstr);
+ for (i = 0; i < msg->nstr; i++)
+ fprintf(fp, " %s\n", msg->astr[i]);
+ fprintf(fp, " au32 =\n");
+ for (i = 0; msg->au32[i] != U32_GUARD; i++)
+ fprintf(fp, " %u\n", msg->au32[i]);
+ fprintf(fp, " rpl = '%s'\n", msg->rpl);
+ fprintf(fp, "}\n");
+}
+
+
+void free_custom(custom_t *msg)
+{
+ mrp_data_free(msg, custom_descr.tag);
+}
+
+
+void recv_msg(mrp_transport_t *t, mrp_msg_t *msg, void *user_data)
{
context_t *c = (context_t *)user_data;
mrp_msg_field_t *f;
char buf[256];
mrp_log_info("received a message");
- mrp_msg_dump(msg, stdout);
+ dump_msg(msg, stdout);
if (c->server) {
seq = 0;
}
+void recv_custom(mrp_transport_t *t, void *data, uint16_t tag, void *user_data)
+{
+ context_t *c = (context_t *)user_data;
+ custom_t *msg = (custom_t *)data;
+ custom_t rpl;
+ char buf[256];
+ uint32_t au32[] = { 9, 8, 7, 6, 5, -1 };
+
+ mrp_log_info("received custom message of type 0x%x", tag);
+ dump_custom(data, stdout);
+
+ if (tag != custom_descr.tag) {
+ mrp_log_error("Tag 0x%x != our custom type (0x%x).",
+ tag, custom_descr.tag);
+ exit(1);
+ }
+
+
+ if (c->server) {
+ rpl = *msg;
+ snprintf(buf, sizeof(buf), "reply to message #%u", msg->seq);
+ rpl.rpl = buf;
+ rpl.au32 = au32;
+
+ if (mrp_transport_senddata(t, &rpl, custom_descr.tag))
+ mrp_log_info("reply successfully sent");
+ else
+ mrp_log_error("failed to send reply");
+ }
+
+ free_custom(msg);
+}
+
+
void connection_evt(mrp_transport_t *lt, void *user_data)
{
static mrp_transport_evt_t evt = {
.closed = closed_evt,
- .recv = recv_evt,
+ .recv = NULL,
.recvfrom = NULL,
};
context_t *c = (context_t *)user_data;
int flags;
+ if (c->custom)
+ evt.recvdata = recv_custom;
+ else
+ evt.recv = recv_msg;
+
flags = MRP_TRANSPORT_REUSEADDR | MRP_TRANSPORT_NONBLOCK;
c->t = mrp_transport_accept(lt, &evt, c, flags);
}
+void type_init(void)
+{
+ if (!mrp_msg_register_type(&custom_descr)) {
+ mrp_log_error("Failed to register custom data type.");
+ exit(1);
+ }
+}
+
void server_init(context_t *c)
{
static mrp_transport_evt_t evt = {
mrp_sockaddr_t addr;
socklen_t addrlen;
const char *type;
+ int flags;
addrlen = mrp_transport_resolve(NULL, c->addr, &addr, sizeof(addr), &type);
if (addrlen > 0) {
- c->lt = mrp_transport_create(c->ml, type, &evt, c, 0);
+ type_init();
+
+ flags = MRP_TRANSPORT_REUSEADDR |
+ c->custom ? MRP_TRANSPORT_MODE_CUSTOM : 0;
+ c->lt = mrp_transport_create(c->ml, type, &evt, c, flags);
if (c->lt == NULL) {
mrp_log_error("Failed to create listening server transport.");
}
-void send_cb(mrp_mainloop_t *ml, mrp_timer_t *t, void *user_data)
+
+void send_msg(context_t *c)
{
- static uint32_t seqno = 1;
-
- context_t *c = (context_t *)user_data;
mrp_msg_t *msg;
uint32_t seq;
char buf[256];
- uint32_t len;
-
- MRP_UNUSED(ml);
- MRP_UNUSED(t);
-
+ char *astr[] = { "this", "is", "an", "array", "of", "strings" };
+ uint32_t au32[] = { 1, 2, 3,
+ 1 << 16, 2 << 16, 3 << 16,
+ 1 << 24, 2 << 24, 3 << 24 };
+ uint32_t nstr = MRP_ARRAY_SIZE(astr);
+ uint32_t nu32 = MRP_ARRAY_SIZE(au32);
+
+ seq = c->seqno++;
+ snprintf(buf, sizeof(buf), "this is message #%u", (unsigned int)seq);
- seq = seqno++;
- len = snprintf(buf, sizeof(buf), "This is message %u.", (unsigned int)seq);
- if ((msg = mrp_msg_create(TAG_SEQ, MRP_MSG_FIELD_UINT32, seq,
- TAG_FOO, MRP_MSG_FIELD_STRING, "foo",
- TAG_BAR, MRP_MSG_FIELD_STRING, "bar",
- TAG_MSG, MRP_MSG_FIELD_BLOB , len, buf,
- TAG_END)) == NULL) {
+ msg = mrp_msg_create(TAG_SEQ , MRP_MSG_FIELD_UINT32, seq,
+ TAG_MSG , MRP_MSG_FIELD_STRING, buf,
+ TAG_U8 , MRP_MSG_FIELD_UINT8 , seq & 0xf,
+ TAG_S8 , MRP_MSG_FIELD_SINT8 , -(seq & 0xf),
+ TAG_U16 , MRP_MSG_FIELD_UINT16, seq,
+ TAG_S16 , MRP_MSG_FIELD_SINT16, - seq,
+ TAG_DBL , MRP_MSG_FIELD_DOUBLE, seq / 3.0,
+ TAG_BLN , MRP_MSG_FIELD_BOOL , seq & 0x1,
+ TAG_ASTR, MRP_MSG_FIELD_ARRAY_OF(STRING), nstr, astr,
+ TAG_AU32, MRP_MSG_FIELD_ARRAY_OF(UINT32), nu32, au32,
+ TAG_END);
+
+ if (msg == NULL) {
mrp_log_error("Failed to create new message.");
exit(1);
}
}
+void send_custom(context_t *c)
+{
+ uint32_t seq = c->seqno++;
+ custom_t msg;
+ char buf[256];
+ char *astr[] = { "this", "is", "a", "test", "string", "array" };
+ uint32_t au32[] = { 1, 2, 3, 4, 5, 6, 7, -1 };
+
+ msg.seq = seq;
+ snprintf(buf, sizeof(buf), "this is message #%u", (unsigned int)seq);
+ msg.msg = buf;
+ msg.u8 = seq & 0xf;
+ msg.s8 = -(seq & 0xf);
+ msg.u16 = seq;
+ msg.s16 = - seq;
+ msg.dbl = seq / 3.0;
+ msg.bln = seq & 0x1;
+ msg.astr = astr;
+ msg.nstr = MRP_ARRAY_SIZE(astr);
+ msg.au32 = au32;
+ msg.rpl = "";
+
+ if (!mrp_transport_senddata(c->t, &msg, custom_descr.tag)) {
+ mrp_log_error("Failed to send message #%d.", msg.seq);
+ exit(1);
+ }
+ else
+ mrp_log_info("Message #%d succesfully sent.", msg.seq);
+}
+
+
+
+void send_cb(mrp_mainloop_t *ml, mrp_timer_t *t, void *user_data)
+{
+ context_t *c = (context_t *)user_data;
+
+ MRP_UNUSED(ml);
+ MRP_UNUSED(t);
+
+ if (c->custom)
+ send_custom(c);
+ else
+ send_msg(c);
+}
+
+
void client_init(context_t *c)
{
static mrp_transport_evt_t evt = {
.closed = closed_evt,
- .recv = recv_evt,
+ .recv = NULL,
.recvfrom = NULL,
};
mrp_sockaddr_t addr;
socklen_t addrlen;
const char *type;
+ int flags;
addrlen = mrp_transport_resolve(NULL, c->addr, &addr, sizeof(addr), &type);
exit(1);
}
- c->t = mrp_transport_create(c->ml, type, &evt, c, 0);
+ type_init();
+
+ if (c->custom)
+ evt.recvdata = recv_custom;
+ else
+ evt.recv = recv_msg;
+
+ flags = c->custom ? MRP_TRANSPORT_MODE_CUSTOM : 0;
+ c->t = mrp_transport_create(c->ml, type, &evt, c, flags);
if (c->t == NULL) {
mrp_log_error("Failed to create new transport.");
exit(1);
}
-
+
if (!mrp_transport_connect(c->t, &addr, addrlen)) {
mrp_log_error("Failed to connect to %s.", c->addr);
exit(1);
}
+static void print_usage(const char *argv0, int exit_code, const char *fmt, ...)
+{
+ va_list ap;
+
+ if (fmt && *fmt) {
+ va_start(ap, fmt);
+ vprintf(fmt, ap);
+ va_end(ap);
+ }
+
+ printf("usage: %s [options] [transport-address]\n\n"
+ "The possible options are:\n"
+ " -s, --server run as test server (default)\n"
+ " -c, --custom use custom messages\n"
+ " -m, --message use generic messages (default)\n"
+ " -t, --log-target=TARGET log target to use\n"
+ " TARGET is one of stderr,stdout,syslog, or a logfile path\n"
+ " -l, --log-level=LEVELS logging level to use\n"
+ " LEVELS is a comma separated list of info, error and warning\n"
+ " -v, --verbose increase logging verbosity\n"
+ " -d, --debug enable debug messages\n"
+ " -h, --help show help on usage\n",
+ argv0);
+
+ if (exit_code < 0)
+ return;
+ else
+ exit(exit_code);
+}
+
+
+static void config_set_defaults(context_t *ctx)
+{
+ mrp_clear(ctx);
+ ctx->addr = "tcp4:127.0.0.1:3000";
+ ctx->server = FALSE;
+ ctx->custom = FALSE;
+ ctx->log_mask = MRP_LOG_UPTO(MRP_LOG_DEBUG);
+ ctx->log_target = MRP_LOG_TO_STDERR;
+}
+
+
+int parse_cmdline(context_t *ctx, int argc, char **argv)
+{
+ #define OPTIONS "scma:l:t:vdh"
+ struct option options[] = {
+ { "server" , no_argument , NULL, 's' },
+ { "address" , required_argument, NULL, 'a' },
+ { "custom" , no_argument , NULL, 'c' },
+ { "message" , no_argument , NULL, 'm' },
+ { "log-level" , required_argument, NULL, 'l' },
+ { "log-target", required_argument, NULL, 't' },
+ { "verbose" , optional_argument, NULL, 'v' },
+ { "debug" , no_argument , NULL, 'd' },
+ { "help" , no_argument , NULL, 'h' },
+ { NULL, 0, NULL, 0 }
+ };
+
+ int opt, debug;
+
+ debug = FALSE;
+ config_set_defaults(ctx);
+
+ while ((opt = getopt_long(argc, argv, OPTIONS, options, NULL)) != -1) {
+ switch (opt) {
+ case 's':
+ ctx->server = TRUE;
+ break;
+
+ case 'c':
+ ctx->custom = TRUE;
+ break;
+
+ case 'm':
+ ctx->custom = FALSE;
+ break;
+
+ case 'a':
+ ctx->addr = optarg;
+ break;
+
+ case 'v':
+ ctx->log_mask <<= 1;
+ ctx->log_mask |= 1;
+ break;
+
+ case 'l':
+ ctx->log_mask = mrp_log_parse_levels(optarg);
+ if (ctx->log_mask < 0)
+ print_usage(argv[0], EINVAL, "invalid log level '%s'", optarg);
+ break;
+
+ case 't':
+ ctx->log_target = mrp_log_parse_target(optarg);
+ if (!ctx->log_target)
+ print_usage(argv[0], EINVAL, "invalid log target '%s'", optarg);
+ break;
+
+ case 'd':
+ debug = TRUE;
+ break;
+
+ case 'h':
+ print_usage(argv[0], -1, "");
+ exit(0);
+ break;
+
+ default:
+ print_usage(argv[0], EINVAL, "invalid option '%c'", opt);
+ }
+ }
+
+ if (debug)
+ ctx->log_mask |= MRP_LOG_MASK_DEBUG;
+
+ return TRUE;
+}
+
+
int main(int argc, char *argv[])
{
context_t c;
mrp_clear(&c);
- mrp_log_set_mask(MRP_LOG_UPTO(MRP_LOG_DEBUG));
- mrp_log_set_target(MRP_LOG_TO_STDOUT);
- if (argc == 3 && (!strcmp(argv[1], "-s") || !strcmp(argv[1], "--server"))) {
- c.server = TRUE;
- c.addr = argv[2];
+ if (!parse_cmdline(&c, argc, argv))
+ exit(1);
+
+ mrp_log_set_mask(c.log_mask);
+ mrp_log_set_target(c.log_target);
+
+ if (c.server)
mrp_log_info("Running as server, using address '%s'...", c.addr);
- }
- else if (argc == 2) {
- c.addr = argv[1];
+ else
mrp_log_info("Running as client, using address '%s'...", c.addr);
- }
- else {
- mrp_log_error("invalid command line arguments");
- mrp_log_error("usage: %s [-s] address:port", argv[0]);
- exit(1);
- }
+
+ if (c.custom)
+ mrp_log_info("Using custom messages...");
+ else
+ mrp_log_info("Using generic messages...");
c.ml = mrp_mainloop_create();
#include <murphy/common.h>
-#define TAG_END ((uint16_t)0x0)
+#define TAG_END MRP_MSG_FIELD_END
#define TAG_SEQ ((uint16_t)0x1)
#define TAG_FOO ((uint16_t)0x2)
#define TAG_BAR ((uint16_t)0x3)
seq = seqno++;
- len = snprintf(buf, sizeof(buf), "This is message %u.", (unsigned int)seq);
+ len = snprintf(buf, sizeof(buf), "this is message #%u", (unsigned int)seq);
if ((msg = mrp_msg_create(TAG_SEQ, MRP_MSG_FIELD_UINT32, seq,
TAG_FOO, MRP_MSG_FIELD_STRING, "foo",
TAG_BAR, MRP_MSG_FIELD_STRING, "bar",
#include <murphy/common.h>
-#define TAG_END ((uint16_t)0x0)
+#define TAG_END MRP_MSG_FIELD_END
#define TAG_SEQ ((uint16_t)0x1)
#define TAG_FOO ((uint16_t)0x2)
#define TAG_BAR ((uint16_t)0x3)
seq = seqno++;
- len = snprintf(buf, sizeof(buf), "This is message %u.", (unsigned int)seq);
+ len = snprintf(buf, sizeof(buf), "this is message #%u", (unsigned int)seq);
if ((msg = mrp_msg_create(TAG_SEQ, MRP_MSG_FIELD_UINT32, seq,
TAG_FOO, MRP_MSG_FIELD_STRING, "foo",
TAG_BAR, MRP_MSG_FIELD_STRING, "bar",
#include <string.h>
+#include <errno.h>
#include <murphy/common/mm.h>
#include <murphy/common/list.h>
#include <murphy/common/log.h>
static int check_destroy(mrp_transport_t *t);
+static int recv_data(mrp_transport_t *t, void *data, size_t size,
+ mrp_sockaddr_t *addr, socklen_t addrlen);
+
static MRP_LIST_HOOK(transports);
static inline int purge_destroyed(mrp_transport_t *t);
-
int mrp_transport_register(mrp_transport_descr_t *d)
{
if (d->size >= sizeof(mrp_transport_t)) {
t->user_data = user_data;
t->check_destroy = check_destroy;
+ t->recv_data = recv_data;
+ t->flags = flags;
- if (!t->descr->req.open(t, flags)) {
+ if (!t->descr->req.open(t)) {
mrp_free(t);
t = NULL;
}
t->connected = connected;
t->check_destroy = check_destroy;
+ t->recv_data = recv_data;
+ t->flags = flags;
- if (!t->descr->req.create(t, conn, flags)) {
+ if (!t->descr->req.create(t, conn)) {
mrp_free(t);
t = NULL;
}
t->ml = lt->ml;
t->evt = *evt;
t->user_data = user_data;
-
+
t->check_destroy = check_destroy;
+ t->recv_data = recv_data;
+ t->flags = (lt->flags & MRP_TRANSPORT_INHERIT) | flags;
MRP_TRANSPORT_BUSY(t, {
- if (!t->descr->req.accept(t, lt, flags)) {
+ if (!t->descr->req.accept(t, lt)) {
mrp_free(t);
t = NULL;
}
+ else {
+
+ }
});
}
{
int result;
- if (/*!t->connected && */t->descr->req.sendto) {
+ if (t->descr->req.sendto) {
MRP_TRANSPORT_BUSY(t, {
result = t->descr->req.sendto(t, msg, addr, addrlen);
});
}
+int mrp_transport_sendraw(mrp_transport_t *t, void *data, size_t size)
+{
+ int result;
+
+ if (t->connected &&
+ (t->flags & MRP_TRANSPORT_MODE_RAW) && t->descr->req.sendraw) {
+ MRP_TRANSPORT_BUSY(t, {
+ result = t->descr->req.sendraw(t, data, size);
+ });
+
+ purge_destroyed(t);
+ }
+ else
+ result = FALSE;
+
+ return result;
+}
+
+
+int mrp_transport_sendrawto(mrp_transport_t *t, void *data, size_t size,
+ mrp_sockaddr_t *addr, socklen_t addrlen)
+{
+ int result;
+
+ if ((t->flags & MRP_TRANSPORT_MODE_RAW) && t->descr->req.sendrawto) {
+ MRP_TRANSPORT_BUSY(t, {
+ result = t->descr->req.sendrawto(t, data, size, addr, addrlen);
+ });
+
+ purge_destroyed(t);
+ }
+ else
+ result = FALSE;
+
+ return result;
+}
+
+
+int mrp_transport_senddata(mrp_transport_t *t, void *data, uint16_t tag)
+{
+ int result;
+
+ if (t->connected &&
+ (t->flags & MRP_TRANSPORT_MODE_CUSTOM) && t->descr->req.senddata) {
+ MRP_TRANSPORT_BUSY(t, {
+ result = t->descr->req.senddata(t, data, tag);
+ });
+
+ purge_destroyed(t);
+ }
+ else
+ result = FALSE;
+
+ return result;
+}
+
+
+int mrp_transport_senddatato(mrp_transport_t *t, void *data, uint16_t tag,
+ mrp_sockaddr_t *addr, socklen_t addrlen)
+{
+ int result;
+
+ if ((t->flags & MRP_TRANSPORT_MODE_CUSTOM) && t->descr->req.senddatato) {
+ MRP_TRANSPORT_BUSY(t, {
+ result = t->descr->req.senddatato(t, data, tag, addr, addrlen);
+ });
+
+ purge_destroyed(t);
+ }
+ else
+ result = FALSE;
+
+ return result;
+}
+
+
+static int recv_data(mrp_transport_t *t, void *data, size_t size,
+ mrp_sockaddr_t *addr, socklen_t addrlen)
+{
+ mrp_data_descr_t *type;
+ uint16_t tag;
+ mrp_msg_t *msg;
+ void *decoded;
+
+ if (MRP_TRANSPORT_MODE(t) == MRP_TRANSPORT_MODE_CUSTOM) {
+ tag = be16toh(*(uint16_t *)data);
+ data += sizeof(tag);
+ size -= sizeof(tag);
+ type = mrp_msg_find_type(tag);
+
+ if (type != NULL) {
+ decoded = mrp_data_decode(&data, &size, type);
+
+ if (size == 0) {
+ if (t->connected && t->evt.recvdata) {
+ MRP_TRANSPORT_BUSY(t, {
+ t->evt.recvdata(t, decoded, tag, t->user_data);
+ });
+ }
+ else if (t->evt.recvdatafrom) {
+ MRP_TRANSPORT_BUSY(t, {
+ t->evt.recvdatafrom(t, decoded, tag, addr, addrlen,
+ t->user_data);
+ });
+ }
+ else
+ mrp_free(decoded); /* no callback, discard */
+
+ return 0;
+ }
+ else {
+ mrp_free(decoded);
+ return -EMSGSIZE;
+ }
+ }
+ else
+ return -ENOPROTOOPT;
+ }
+ else {
+ if (MRP_TRANSPORT_MODE(t) == MRP_TRANSPORT_MODE_RAW) {
+ if (t->connected) {
+ MRP_TRANSPORT_BUSY(t, {
+ t->evt.recvraw(t, data, size, t->user_data);
+ });
+ }
+ else {
+ MRP_TRANSPORT_BUSY(t, {
+ t->evt.recvrawfrom(t, data, size, addr, addrlen,
+ t->user_data);
+ });
+ }
+
+ return 0;
+ }
+ else {
+ tag = be16toh(*(uint16_t *)data);
+ data += sizeof(tag);
+ size -= sizeof(tag);
+
+ if (tag != MRP_MSG_TAG_DEFAULT ||
+ (msg = mrp_msg_default_decode(data, size)) == NULL) {
+ return -EPROTO;
+ }
+ else {
+ if (t->connected) {
+ MRP_TRANSPORT_BUSY(t, {
+ t->evt.recv(t, msg, t->user_data);
+ });
+ }
+ else {
+ MRP_TRANSPORT_BUSY(t, {
+ t->evt.recvfrom(t, msg, addr, addrlen,
+ t->user_data);
+ });
+ }
+
+ mrp_msg_unref(msg);
+
+ return 0;
+ }
+ }
+ }
+}
+
MRP_TRANSPORT_REUSEADDR = 0x1,
MRP_TRANSPORT_NONBLOCK = 0x2,
MRP_TRANSPORT_CLOEXEC = 0x4,
+
+ MRP_TRANSPORT_MODE_MSG = 0x00000000, /* in generic mode */
+ MRP_TRANSPORT_MODE_RAW = 0x10000000, /* in bitpipe mode */
+ MRP_TRANSPORT_MODE_CUSTOM = 0x20000000, /* in custom type mode */
+ MRP_TRANSPORT_MODE_MASK = 0x30000000, /* mask for transport mode */
+
+ MRP_TRANSPORT_INHERIT = 0x30000000, /* mask of inherited flags */
} mrp_transport_flag_t;
+#define MRP_TRANSPORT_MODE(t) ((t)->flags & MRP_TRANSPORT_MODE_MASK)
/*
* transport requests
typedef struct {
/** Open a new transport. */
- int (*open)(mrp_transport_t *t, int flags);
+ int (*open)(mrp_transport_t *t);
/** Create a new transport from an existing backend object. */
- int (*create)(mrp_transport_t *t, void *obj, int flags);
+ int (*create)(mrp_transport_t *t, void *obj);
/** Bind a transport to a given transport-specific address. */
int (*bind)(mrp_transport_t *t, mrp_sockaddr_t *addr, socklen_t addrlen);
/** Listen on a transport for incoming connections. */
int (*listen)(mrp_transport_t *t, int backlog);
/** Accept a new transport connection over an existing transport. */
- int (*accept)(mrp_transport_t *t, mrp_transport_t *lt, int flags);
+ int (*accept)(mrp_transport_t *t, mrp_transport_t *lt);
/** Connect a transport to an endpoint. */
int (*connect)(mrp_transport_t *t, mrp_sockaddr_t *addr,
socklen_t addrlen);
/** Close a transport, free all resources from open/accept/connect. */
void (*close)(mrp_transport_t *t);
/** Send a message over a (connected) transport. */
- int (*send)(mrp_transport_t *t, mrp_msg_t *msg);
- /** Send a message over an unconnected transport to an address. */
- int (*sendto)(mrp_transport_t *t, mrp_msg_t *msg, mrp_sockaddr_t *addr,
- socklen_t addrlen);
+ int (*send)(mrp_transport_t *t, mrp_msg_t *msg);
+ /** Send raw data over a (connected) transport. */
+ int (*sendraw)(mrp_transport_t *t, void *buf, size_t size);
+ /** Send custom data over a (connected) transport. */
+ int (*senddata)(mrp_transport_t *t, void *data, uint16_t tag);
+
+ /** Send a message over a(n unconnected) transport. */
+ int (*sendto)(mrp_transport_t *t, mrp_msg_t *msg, mrp_sockaddr_t *addr,
+ socklen_t addrlen);
+ /** Send raw data over a(n unconnected) transport. */
+ int (*sendrawto)(mrp_transport_t *t, void *buf, size_t size,
+ mrp_sockaddr_t *addr, socklen_t addrlen);
+ /** Send custom data over a(n unconnected) transport. */
+ int (*senddatato)(mrp_transport_t *t, void *data, uint16_t tag,
+ mrp_sockaddr_t *addr, socklen_t addrlen);
} mrp_transport_req_t;
typedef struct {
/** Message received on a connected transport. */
- void (*recv)(mrp_transport_t *t, mrp_msg_t *msg, void *user_data);
+ union {
+ /** Generic message callback for connected transports. */
+ void (*recv)(mrp_transport_t *t, mrp_msg_t *msg, void *user_data);
+ /** Raw data callback for connected transports. */
+ void (*recvraw)(mrp_transport_t *t, void *data, size_t size,
+ void *user_data);
+ /** Custom data callback for connected transports. */
+ void (*recvdata)(mrp_transport_t *t, void *data, uint16_t tag,
+ void *user_data);
+ };
+
/** Message received on an unconnected transport. */
- void (*recvfrom)(mrp_transport_t *t, mrp_msg_t *msg, mrp_sockaddr_t *addr,
- socklen_t addrlen, void *user_data);
+ union {
+ /** Generic message callback for unconnected transports. */
+ void (*recvfrom)(mrp_transport_t *t, mrp_msg_t *msg,
+ mrp_sockaddr_t *addr, socklen_t addrlen,
+ void *user_data);
+ /** Raw data callback for unconnected transports. */
+ void (*recvrawfrom)(mrp_transport_t *t, void *data, size_t size,
+ mrp_sockaddr_t *addr, socklen_t addrlen,
+ void *user_data);
+ /** Custom data callback for unconnected transports. */
+ void (*recvdatafrom)(mrp_transport_t *t, void *data, uint16_t tag,
+ mrp_sockaddr_t *addr, socklen_t addrlen,
+ void *user_data);
+ };
/** Connection closed by peer. */
void (*closed)(mrp_transport_t *t, int error, void *user_data);
/** Connection attempt on a socket being listened on. */
mrp_transport_descr_t *descr; \
mrp_transport_evt_t evt; \
int (*check_destroy)(mrp_transport_t *t); \
+ int (*recv_data)(mrp_transport_t *t, void *data, \
+ size_t size, \
+ mrp_sockaddr_t *addr, \
+ socklen_t addrlen); \
void *user_data; \
+ int flags; \
int busy; \
int connected : 1; \
int listened : 1; \
_open, _create, _close, \
_bind, _listen, _accept, \
_connect, _disconnect, \
- _send, _sendto) \
+ _send, _sendto, \
+ _sendraw, _sendrawto, \
+ _senddata, _senddatato) \
static void _prfx##_register_transport(void) \
__attribute__((constructor)); \
\
.disconnect = _disconnect, \
.send = _send, \
.sendto = _sendto, \
+ .sendraw = _sendraw, \
+ .sendrawto = _sendrawto, \
+ .senddata = _senddata, \
+ .senddatato = _senddatato, \
}, \
}; \
\
int mrp_transport_sendto(mrp_transport_t *t, mrp_msg_t *msg,
mrp_sockaddr_t *addr, socklen_t addrlen);
+/** Send raw data through the given (connected) transport. */
+int mrp_transport_sendraw(mrp_transport_t *t, void *data, size_t size);
+
+/** Send raw data through the given transport to the remote address. */
+int mrp_transport_sendrawto(mrp_transport_t *t, void *data, size_t size,
+ mrp_sockaddr_t *addr, socklen_t addrlen);
+
+/** Send custom data through the given (connected) transport. */
+int mrp_transport_senddata(mrp_transport_t *t, void *data, uint16_t tag);
+
+/** Send custom data through the given transport to the remote address. */
+int mrp_transport_senddatato(mrp_transport_t *t, void *data, uint16_t tag,
+ mrp_sockaddr_t *addr, socklen_t addrlen);
#endif /* __MURPHY_TRANSPORT_H__ */
tag = MRP_CONSOLE_INPUT;
type = MRP_MSG_FIELD_BLOB;
- msg = mrp_msg_create(tag, type, len, input, MRP_MSG_FIELD_INVALID);
+ msg = mrp_msg_create(tag, type, len, input, NULL);
if (msg != NULL) {
mrp_transport_send(c->t, msg);
tag = MRP_CONSOLE_OUTPUT;
type = MRP_MSG_FIELD_BLOB;
len = size;
- msg = mrp_msg_create(tag, type, len, buf, MRP_MSG_FIELD_INVALID);
+ msg = mrp_msg_create(tag, type, len, buf, NULL);
if (msg != NULL) {
mrp_transport_send(c->t, msg);
tag = MRP_CONSOLE_BYE;
type = MRP_MSG_FIELD_BOOL;
- msg = mrp_msg_create(tag, type, TRUE, MRP_MSG_FIELD_INVALID);
+ msg = mrp_msg_create(tag, type, TRUE, NULL);
if (msg != NULL) {
mrp_transport_send(c->t, msg);
tag = MRP_CONSOLE_PROMPT;
type = MRP_MSG_FIELD_STRING;
- msg = mrp_msg_create(tag, type, prompt, MRP_MSG_FIELD_INVALID);
+ msg = mrp_msg_create(tag, type, prompt, NULL);
if (msg != NULL) {
mrp_transport_send(c->t, msg);