#include <murphy/common/msg.h>
#include <murphy/common/transport.h>
-#define DEFAULT_SIZE 1024 /* default input buffer size */
+#define DEFAULT_SIZE 1024 /* default input buffer size */
typedef struct {
MRP_TRANSPORT_PUBLIC_FIELDS; /* common transport fields */
#include <string.h>
#include <errno.h>
#include <stdarg.h>
+#include <ctype.h>
#include <arpa/inet.h>
#include <murphy/common/macros.h>
#include <murphy/common/list.h>
#include <murphy/common/msg.h>
-#define AVG_SPACE_PER_FIELD 32 /* guesstimate for tag + data */
-#define len_t uint32_t
-#define MSG_ALIGN sizeof(len_t)
-static inline int msg_add(mrp_msg_t *msg, const char *tag, void *data,
- size_t size, int prepend);
-static void msg_destroy(mrp_msg_t *msg);
-
-mrp_msg_t *mrp_msg_create(const char *tag, ...)
+static inline mrp_msg_field_t *create_field(uint16_t tag, va_list *ap)
{
- mrp_msg_t *msg;
- va_list ap;
- void *data;
- size_t size;
-
- va_start(ap, tag);
- if ((msg = mrp_allocz(sizeof(*msg))) != NULL) {
- mrp_list_init(&msg->fields);
+ mrp_msg_field_t *f;
+ uint16_t type;
+ uint32_t size;
+ void *blb;
+
+ if ((f = mrp_allocz(sizeof(*f))) != NULL) {
+ mrp_list_init(&f->hook);
+ type = va_arg(*ap, uint32_t);
- while (tag != NULL) {
- data = va_arg(ap, typeof(data));
- size = va_arg(ap, typeof(size));
+#define CREATE(_f, _tag, _type, _fldtype, _fld, _last, _errlbl) do { \
+ (_f) = mrp_allocz(MRP_OFFSET(typeof(*_f), _last) + \
+ sizeof(_f->_last)); \
+ \
+ if ((_f) != NULL) { \
+ (_f)->tag = _tag; \
+ (_f)->type = _type; \
+ (_f)->_fld = va_arg(*ap, _fldtype); \
+ } \
+ else { \
+ goto _errlbl; \
+ } \
+ } while (0)
+
+ switch (type) {
+ case MRP_MSG_FIELD_STRING:
+ CREATE(f, tag, type, char *, str, str, nomem);
+ f->str = mrp_strdup(f->str);
+ if (f->str == NULL)
+ goto nomem;
+ break;
+ case MRP_MSG_FIELD_BOOL:
+ CREATE(f, tag, type, int, bln, bln, nomem);
+ break;
+ case MRP_MSG_FIELD_UINT8:
+ CREATE(f, tag, type, unsigned int, u8, u8, nomem);
+ break;
+ case MRP_MSG_FIELD_SINT8:
+ CREATE(f, tag, type, signed int, s8, s8, nomem);
+ break;
+ case MRP_MSG_FIELD_UINT16:
+ CREATE(f, tag, type, unsigned int, u16, u16, nomem);
+ break;
+ case MRP_MSG_FIELD_SINT16:
+ CREATE(f, tag, type, signed int, s16, s16, nomem);
+ break;
+ case MRP_MSG_FIELD_UINT32:
+ CREATE(f, tag, type, unsigned int, u32, u32, nomem);
+ break;
+ case MRP_MSG_FIELD_SINT32:
+ CREATE(f, tag, type, signed int, s32, s32, nomem);
+ break;
+ case MRP_MSG_FIELD_UINT64:
+ CREATE(f, tag, type, uint64_t, u64, u64, nomem);
+ break;
+ case MRP_MSG_FIELD_SINT64:
+ CREATE(f, tag, type, int64_t, s64, s64, nomem);
+ break;
+ case MRP_MSG_FIELD_DOUBLE:
+ CREATE(f, tag, type, double, dbl, dbl, nomem);
+ break;
+
+ case MRP_MSG_FIELD_BLOB:
+ size = va_arg(ap, uint32_t);
+ CREATE(f, tag, type, void *, blb, size[0], nomem);
+
+ blb = f->blb;
+ f->size[0] = size;
+ f->blb = mrp_allocz(size);
+
+ if (f->blb != NULL) {
+ memcpy(f->blb, blb, size);
+ f->size[0] = size;
+ }
+ else
+ goto nomem;
+ break;
- if (!msg_add(msg, tag, data, size, FALSE)) {
- msg_destroy(msg);
- msg = NULL;
-
- goto out;
+ default:
+ if (f->type & MRP_MSG_FIELD_ARRAY) {
+ errno = EOPNOTSUPP;
+ mrp_log_error("XXX TODO: MRP_MSG_FIELD_ARRAY not implemented");
}
-
- tag = va_arg(ap, typeof(tag));
+ else
+ errno = EINVAL;
+
+ mrp_free(f);
+ f = NULL;
}
+#undef CREATE
}
-
- msg->refcnt = 1;
-
- out:
- va_end(ap);
- return msg;
+ return f;
+
+ nomem:
+ errno = ENOMEM;
+ return NULL;
}
mrp_msg_field_t *f;
if (msg != NULL) {
-#ifdef __MSG_EXTRA_CHECKS__
- if (MRP_UNLIKELY(msg->refcnt) != 0) {
- mrp_log_error("%s() called for message (%p) with refcnt %d...",
- __FUNCTION__, msg, msg->refcnt);
- }
-#endif
-
- mrp_debug("destroying message %p...", msg);
-
mrp_list_foreach(&msg->fields, p, n) {
f = mrp_list_entry(p, typeof(*f), hook);
- mrp_list_delete(p);
+ mrp_list_delete(&f->hook);
+
+ switch (f->type) {
+ case MRP_MSG_FIELD_STRING:
+ mrp_free(f->str);
+ break;
+ case MRP_MSG_FIELD_BLOB:
+ mrp_free(f->blb);
+ break;
+ }
- mrp_free(f->tag);
- mrp_free(f->data);
mrp_free(f);
}
+ }
+}
+
+
+mrp_msg_t *mrp_msg_create(uint16_t tag, ...)
+{
+ mrp_msg_t *msg;
+ mrp_msg_field_t *f;
+ va_list ap;
+
+ va_start(ap, tag);
+ if ((msg = mrp_allocz(sizeof(*msg))) != NULL) {
+ mrp_list_init(&msg->fields);
+ msg->refcnt = 1;
- mrp_free(msg);
+ while (tag != MRP_MSG_FIELD_INVALID) {
+ f = create_field(tag, &ap);
+
+ if (f != NULL) {
+ mrp_list_append(&msg->fields, &f->hook);
+ msg->nfield++;
+ }
+ else {
+ msg_destroy(msg);
+ msg = NULL;
+ goto out;
+ }
+ tag = va_arg(ap, uint32_t);
+ }
}
+ out:
+ va_end(ap);
+ return msg;
}
{
if (msg != NULL) {
msg->refcnt--;
-
+
if (msg->refcnt <= 0)
msg_destroy(msg);
}
}
-static inline int msg_add(mrp_msg_t *msg, const char *tag, void *data,
- size_t size, int prepend)
+int mrp_msg_append(mrp_msg_t *msg, uint16_t tag, ...)
{
mrp_msg_field_t *f;
+ va_list ap;
- if ((f = mrp_allocz(sizeof(*f))) != NULL) {
- f->tag = mrp_strdup(tag);
- f->data = mrp_datadup(data, size);
- f->size = size;
+ va_start(ap, tag);
+ f = create_field(tag, &ap);
+ va_end(ap);
- if (f->tag != NULL && f->data != NULL) {
- mrp_list_init(&f->hook);
- if (!prepend)
- mrp_list_append(&msg->fields, &f->hook);
- else
- mrp_list_prepend(&msg->fields, &f->hook);
- msg->nfield++;
-
- return TRUE;
- }
- else {
- mrp_free(f->tag);
- mrp_free(f->data);
- mrp_free(f);
- }
+ if (f != NULL) {
+ mrp_list_append(&msg->fields, &f->hook);
+ msg->nfield++;
+ return TRUE;
}
-
- return FALSE;
+ else
+ return FALSE;
}
-int mrp_msg_append(mrp_msg_t *msg, char *tag, void *data, size_t size)
+int mrp_msg_prepend(mrp_msg_t *msg, uint16_t tag, ...)
{
- return msg_add(msg, tag, data, size, FALSE);
-}
+ mrp_msg_field_t *f;
+ va_list ap;
+ va_start(ap, tag);
+ f = create_field(tag, &ap);
+ va_end(ap);
-int mrp_msg_prepend(mrp_msg_t *msg, char *tag, void *data, size_t size)
-{
- return msg_add(msg, tag, data, size, TRUE);
+ if (f != NULL) {
+ mrp_list_prepend(&msg->fields, &f->hook);
+ msg->nfield++;
+ return TRUE;
+ }
+ else
+ return FALSE;
}
-void *mrp_msg_find(mrp_msg_t *msg, char *tag, size_t *size)
+mrp_msg_field_t *mrp_msg_find(mrp_msg_t *msg, uint16_t tag)
{
mrp_msg_field_t *f;
mrp_list_hook_t *p, *n;
mrp_list_foreach(&msg->fields, p, n) {
f = mrp_list_entry(p, typeof(*f), hook);
-
- if (!strcmp(f->tag, tag)) {
- *size = f->size;
- return f->data;
- }
+ if (f->tag == tag)
+ return f;
}
- *size = 0;
return NULL;
}
+
int mrp_msg_dump(mrp_msg_t *msg, FILE *fp)
{
mrp_msg_field_t *f;
mrp_list_hook_t *p, *n;
int l;
-
+
l = fprintf(fp, "{\n");
mrp_list_foreach(&msg->fields, p, n) {
- f = mrp_list_entry(p, typeof(*f), hook);
- l += fprintf(fp, " %s='%.*s' (%zd bytes)\n",
- f->tag, (int)f->size, (char *)f->data, f->size);
+ f = mrp_list_entry(p, typeof(*f), hook);
+
+ l += fprintf(fp, " 0x%x ", f->tag);
+
+#define DUMP(_fmt, _type, _val) \
+ l += fprintf(fp, "= <%s> "_fmt"\n", _type, _val)
+
+ switch (f->type) {
+ case MRP_MSG_FIELD_STRING:
+ DUMP("'%s'", "string", f->str);
+ break;
+ case MRP_MSG_FIELD_BOOL:
+ DUMP("%s", "boolean", f->bln ? "true" : "false");
+ break;
+ case MRP_MSG_FIELD_UINT8:
+ DUMP("%u", "uint8", f->u8);
+ break;
+ case MRP_MSG_FIELD_SINT8:
+ DUMP("%d", "sint8", f->s8);
+ break;
+ case MRP_MSG_FIELD_UINT16:
+ DUMP("%u", "uint16", f->u16);
+ break;
+ case MRP_MSG_FIELD_SINT16:
+ DUMP("%d", "sint16", f->s16);
+ break;
+ case MRP_MSG_FIELD_UINT32:
+ DUMP("%u", "uint32", f->u32);
+ break;
+ case MRP_MSG_FIELD_SINT32:
+ DUMP("%d", "sint32", f->s32);
+ break;
+ case MRP_MSG_FIELD_UINT64:
+ DUMP("%Lu", "uint64", (long long unsigned)f->u64);
+ break;
+ case MRP_MSG_FIELD_SINT64:
+ DUMP("%Ld", "sint64", (long long signed)f->s64);
+ break;
+ case MRP_MSG_FIELD_DOUBLE:
+ DUMP("%f", "double", f->dbl);
+ break;
+ case MRP_MSG_FIELD_BLOB: {
+ char *p;
+ uint32_t i;
+
+ fprintf(fp, "= <%s> <%u bytes, ", "blob", f->size[0]);
+
+ for (i = 0, p = f->blb; i < f->size[0]; i++, p++) {
+ if (isprint(*p) && *p != '\n' && *p != '\t' && *p != '\r')
+ fprintf(fp, "%c", *p);
+ else
+ fprintf(fp, ".");
+ }
+ fprintf(fp, ">\n");
+ }
+ break;
+
+ default:
+ fprintf(fp, "= <%s> {%u items, XXX TODO}\n", "array", f->size[0]);
+ }
}
l += fprintf(fp, "}\n");
}
+#define MSG_MIN_CHUNK 32
+
ssize_t mrp_msg_default_encode(mrp_msg_t *msg, void **bufp)
{
-#define ENSURE_SPACE(needed) do { \
- int _miss = needed - l; \
- if (MRP_UNLIKELY(_miss > 0)) { \
- p -= (ptrdiff_t)buf; \
- size += _miss * 2; \
- if (mrp_realloc(buf, size) == NULL) { \
- mrp_free(buf); \
- *bufp = NULL; \
- return -1; \
- } \
- else \
- p += (ptrdiff_t)buf; \
- } \
- } while (0)
+#define RESERVE(type) ({ \
+ void *_ptr; \
+ \
+ _ptr = mrp_msgbuf_reserve(&mb, sizeof(type), 1); \
+ \
+ if (_ptr == NULL) { \
+ *bufp = NULL; \
+ return -1; \
+ } \
+ \
+ _ptr; \
+ })
+
+#define RESERVE_SIZE(size) ({ \
+ void *_ptr; \
+ \
+ _ptr = mrp_msgbuf_reserve(&mb, size, 1); \
+ \
+ if (_ptr == NULL) { \
+ *bufp = NULL; \
+ return -1; \
+ } \
+ \
+ _ptr; \
+ })
+
mrp_msg_field_t *f;
- mrp_list_hook_t *pf, *nf;
- void *buf, *p;
- len_t tsize, *sizep;
- size_t size, nfield, pad, extra, fsize, tpad, dpad;
- int l;
+ mrp_list_hook_t *p, *n;
+ mrp_msgbuf_t mb;
+ uint32_t len;
+ size_t size;
+
+ size = msg->nfield * (2 * sizeof(uint16_t) + sizeof(uint64_t));
+
+ if (mrp_msgbuf_write(&mb, size)) {
+ MRP_MSGBUF_PUSH(&mb, htobe16(msg->nfield), 1, nomem);
- nfield = msg->nfield;
- extra = nfield * 2 * sizeof(uint32_t);
- pad = nfield * 2 * 3;
- size = sizeof(len_t) + nfield * AVG_SPACE_PER_FIELD + extra + pad;
+ mrp_list_foreach(&msg->fields, p, n) {
+ f = mrp_list_entry(p, typeof(*f), hook);
+
+ MRP_MSGBUF_PUSH(&mb, htobe16(f->tag) , 1, nomem);
+ MRP_MSGBUF_PUSH(&mb, htobe16(f->type), 1, nomem);
+
+ switch (f->type) {
+ case MRP_MSG_FIELD_STRING:
+ len = strlen(f->str) + 1;
+ MRP_MSGBUF_PUSH(&mb, htobe32(len), 1, nomem);
+ MRP_MSGBUF_PUSH_DATA(&mb, f->str, len, 1, nomem);
+ break;
+
+ case MRP_MSG_FIELD_BOOL:
+ MRP_MSGBUF_PUSH(&mb, htobe32(f->bln ? TRUE : FALSE), 1, nomem);
+ break;
- if ((buf = mrp_alloc(size)) != NULL) {
- p = buf;
- l = size;
-
- /* encode number of fields */
- sizep = p;
- *sizep = htonl(msg->nfield);
- p += sizeof(*sizep);
- l -= sizeof(*sizep);
-
- mrp_list_foreach(&msg->fields, pf, nf) {
- f = mrp_list_entry(pf, typeof(*f), hook);
+ case MRP_MSG_FIELD_UINT8:
+ MRP_MSGBUF_PUSH(&mb, f->u8, 1, nomem);
+ break;
- /* make space for field if needed */
- tsize = strlen(f->tag) + 1;
- tpad = (MSG_ALIGN - (tsize & (MSG_ALIGN-1))) & (MSG_ALIGN-1);
- dpad = (MSG_ALIGN - (f->size & (MSG_ALIGN-1))) & (MSG_ALIGN-1);
-
- fsize = sizeof(*sizep) + tsize + tpad;
- fsize += sizeof(*sizep) + f->size + dpad;
- ENSURE_SPACE(fsize);
-
- /* tag size and tag */
- sizep = p;
- *sizep = htonl(tsize + tpad);
- p += sizeof(*sizep);
- l -= sizeof(*sizep);
- memcpy(p, f->tag, tsize);
- memset(p + tsize, 0, tpad);
- p += tsize + tpad;
- l -= tsize + tpad;
-
- /* data size and data */
- sizep = p;
- *sizep = htonl(f->size + dpad);
- p += sizeof(*sizep);
- l -= sizeof(*sizep);
- memcpy(p, f->data, f->size);
- memset(p + f->size, 0, dpad);
- p += f->size + dpad;
- l -= f->size + dpad;
+ case MRP_MSG_FIELD_SINT8:
+ MRP_MSGBUF_PUSH(&mb, f->s8, 1, nomem);
+ break;
+
+ case MRP_MSG_FIELD_UINT16:
+ MRP_MSGBUF_PUSH(&mb, htobe16(f->u16), 1, nomem);
+ break;
+
+ case MRP_MSG_FIELD_SINT16:
+ MRP_MSGBUF_PUSH(&mb, htobe16(f->s16), 1, nomem);
+ break;
+
+ case MRP_MSG_FIELD_UINT32:
+ MRP_MSGBUF_PUSH(&mb, htobe32(f->u32), 1, nomem);
+ break;
+
+ case MRP_MSG_FIELD_SINT32:
+ MRP_MSGBUF_PUSH(&mb, htobe32(f->s32), 1, nomem);
+ break;
+
+ case MRP_MSG_FIELD_UINT64:
+ MRP_MSGBUF_PUSH(&mb, htobe64(f->u64), 1, nomem);
+ break;
+
+ case MRP_MSG_FIELD_SINT64:
+ MRP_MSGBUF_PUSH(&mb, htobe64(f->s64), 1, nomem);
+ break;
+
+ case MRP_MSG_FIELD_DOUBLE:
+ MRP_MSGBUF_PUSH(&mb, f->dbl, 1, nomem);
+ break;
+
+ case MRP_MSG_FIELD_BLOB:
+ len = f->size[0];
+ MRP_MSGBUF_PUSH(&mb, htobe32(len), 1, nomem);
+ MRP_MSGBUF_PUSH_DATA(&mb, f->blb, len, 1, nomem);
+ break;
+
+ default:
+ if (f->type & MRP_MSG_FIELD_ARRAY) {
+ errno = EOPNOTSUPP;
+ mrp_log_error("XXX TODO: MRP_MSG_FIELD_ARRAY "
+ "not implemented");
+ }
+ else
+ errno = EINVAL;
+
+ mrp_msgbuf_cancel(&mb);
+ nomem:
+ *bufp = NULL;
+ return -1;
+ }
}
+ }
- size = p - buf;
- *bufp = buf;
+ *bufp = mb.buf;
+ return mb.p - mb.buf;
+}
+
+
+mrp_msg_t *mrp_msg_default_decode(void *buf, size_t size)
+{
+#define PULL(type) ({ \
+ void *_ptr; \
+ \
+ _ptr = mrp_msgbuf_pull(&mb, sizeof(type), 1); \
+ \
+ if (_ptr == NULL) \
+ return NULL; \
+ \
+ _ptr; \
+ })
+
+#define PULL_SIZE(size) ({ \
+ void *_ptr; \
+ \
+ _ptr = mrp_msgbuf_pull(&mb, size, 1); \
+ \
+ if (_ptr == NULL) \
+ return NULL; \
+ \
+ _ptr; \
+ })
+
+ mrp_msg_t *msg;
+ mrp_msgbuf_t mb;
+ mrp_msg_value_t v;
+ void *value;
+ uint16_t nfield, tag, type;
+ uint32_t len;
+ int i;
+
+ msg = mrp_msg_create(MRP_MSG_FIELD_INVALID);
+
+ if (msg == NULL)
+ return NULL;
+
+ mrp_msgbuf_read(&mb, buf, size);
+
+ nfield = be16toh(MRP_MSGBUF_PULL(&mb, typeof(nfield), 1, nodata));
+
+ for (i = 0; i < nfield; i++) {
+ tag = be16toh(MRP_MSGBUF_PULL(&mb, typeof(tag) , 1, nodata));
+ type = be16toh(MRP_MSGBUF_PULL(&mb, typeof(type), 1, nodata));
+
+ switch (type) {
+ case MRP_MSG_FIELD_STRING:
+ len = be32toh(MRP_MSGBUF_PULL(&mb, typeof(len), 1, nodata));
+ value = MRP_MSGBUF_PULL_DATA(&mb, len, 1, nodata);
+ if (!mrp_msg_append(msg, tag, type, value, MRP_MSG_FIELD_INVALID))
+ goto fail;
+ break;
+
+ case MRP_MSG_FIELD_BOOL:
+ v.bln = be32toh(MRP_MSGBUF_PULL(&mb, uint32_t, 1, nodata));
+ if (!mrp_msg_append(msg, tag, type, v.bln, MRP_MSG_FIELD_INVALID))
+ goto fail;
+ break;
+
+ case MRP_MSG_FIELD_UINT8:
+ v.u8 = MRP_MSGBUF_PULL(&mb, typeof(v.u8), 1, nodata);
+ if (!mrp_msg_append(msg, tag, type, v.u8, MRP_MSG_FIELD_INVALID))
+ goto fail;
+ break;
+
+ case MRP_MSG_FIELD_SINT8:
+ v.s8 = MRP_MSGBUF_PULL(&mb, typeof(v.s8), 1, nodata);
+ if (!mrp_msg_append(msg, tag, type, v.s8, MRP_MSG_FIELD_INVALID))
+ goto fail;
+ break;
+
+ case MRP_MSG_FIELD_UINT16:
+ v.u16 = be16toh(MRP_MSGBUF_PULL(&mb, typeof(v.u16), 1, nodata));
+ if (!mrp_msg_append(msg, tag, type, v.u16, MRP_MSG_FIELD_INVALID))
+ goto fail;
+ break;
+
+ case MRP_MSG_FIELD_SINT16:
+ v.s16 = be16toh(MRP_MSGBUF_PULL(&mb, typeof(v.s16), 1, nodata));
+ if (!mrp_msg_append(msg, tag, type, v.s16, MRP_MSG_FIELD_INVALID))
+ goto fail;
+ break;
+
+ case MRP_MSG_FIELD_UINT32:
+ v.u32 = be32toh(MRP_MSGBUF_PULL(&mb, typeof(v.u32), 1, nodata));
+ if (!mrp_msg_append(msg, tag, type, v.u32, MRP_MSG_FIELD_INVALID))
+ goto fail;
+ break;
+
+ case MRP_MSG_FIELD_SINT32:
+ v.s32 = be32toh(MRP_MSGBUF_PULL(&mb, typeof(v.s32), 1, nodata));
+ if (!mrp_msg_append(msg, tag, type, v.s32, MRP_MSG_FIELD_INVALID))
+ goto fail;
+ break;
+
+ case MRP_MSG_FIELD_UINT64:
+ v.u64 = be64toh(MRP_MSGBUF_PULL(&mb, typeof(v.u64), 1, nodata));
+ if (!mrp_msg_append(msg, tag, type, v.u64, MRP_MSG_FIELD_INVALID))
+ goto fail;
+ break;
+
+ case MRP_MSG_FIELD_SINT64:
+ v.s64 = be64toh(MRP_MSGBUF_PULL(&mb, typeof(v.s64), 1, nodata));
+ if (!mrp_msg_append(msg, tag, type, v.s64, MRP_MSG_FIELD_INVALID))
+ goto fail;
+ break;
+
+ case MRP_MSG_FIELD_DOUBLE:
+ v.dbl = MRP_MSGBUF_PULL(&mb, typeof(v.dbl), 1, nodata);
+ if (!mrp_msg_append(msg, tag, type, v.dbl, MRP_MSG_FIELD_INVALID))
+ goto fail;
+ break;
+
+ case MRP_MSG_FIELD_BLOB:
+ len = be32toh(MRP_MSGBUF_PULL(&mb, typeof(len), 1, nodata));
+ value = MRP_MSGBUF_PULL_DATA(&mb, len, 1, nodata);
+ if (!mrp_msg_append(msg, tag, type, len, value,
+ MRP_MSG_FIELD_INVALID))
+ goto fail;
+ break;
+
+ default:
+ if (type & MRP_MSG_FIELD_ARRAY) {
+ errno = EOPNOTSUPP;
+ mrp_log_error("XXX TODO: MRP_MSG_FIELD_ARRAY "
+ "not implemented");
+ }
+ else
+ errno = EINVAL;
+ goto fail;
+ }
}
- else {
- *bufp = NULL;
- size = -1;
+
+ return msg;
+
+
+ fail:
+ nodata:
+ mrp_msg_unref(msg);
+ return NULL;
+}
+
+
+void *mrp_msgbuf_write(mrp_msgbuf_t *mb, size_t size)
+{
+ mrp_clear(mb);
+
+ mb->buf = mrp_allocz(size);
+
+ if (mb->buf != NULL) {
+ mb->size = size;
+ mb->p = mb->buf;
+ mb->l = size;
+
+ return mb->p;
}
+ else
+ return NULL;
+}
- return size;
-#undef ENSURE_SPACE
+void mrp_msgbuf_read(mrp_msgbuf_t *mb, void *buf, size_t size)
+{
+ mb->buf = mb->p = buf;
+ mb->size = mb->l = size;
}
-mrp_msg_t *mrp_msg_default_decode(void *buf, size_t size)
+void mrp_msgbuf_cancel(mrp_msgbuf_t *mb)
+{
+ mrp_free(mb->buf);
+ mb->buf = mb->p = NULL;
+}
+
+
+void *mrp_msgbuf_ensure(mrp_msgbuf_t *mb, size_t size)
{
-#define ENSURE_DATA(n) do { \
- if (MRP_UNLIKELY((int)(n) > (int)l)) { \
- msg_destroy(msg); \
- return NULL; \
- } \
- } while (0)
+ int diff;
- mrp_msg_t *msg;
- len_t *sizep;
- int nfield, l, i;
- char *tag;
- void *p, *data;
- size_t n;
-
- if ((msg = mrp_msg_create(NULL)) != NULL) {
- p = buf;
- l = size;
+ if (MRP_UNLIKELY(size > mb->l)) {
+ diff = size - mb->l;
- /* get number of fields */
- ENSURE_DATA(sizeof(*sizep));
- sizep = p;
- nfield = ntohl(*sizep);
- p += sizeof(*sizep);
- l -= sizeof(*sizep);
-
- /* decode fields */
- for (i = 0; i < nfield; i++) {
- /* get tag size and tag */
- ENSURE_DATA(sizeof(*sizep));
- sizep = p;
- p += sizeof(*sizep);
- l -= sizeof(*sizep);
- n = ntohl(*sizep);
- ENSURE_DATA(MRP_ALIGN(n, MSG_ALIGN));
- tag = p;
- /* get data size and data */
- p += MRP_ALIGN(n, MSG_ALIGN);
- l -= MRP_ALIGN(n, MSG_ALIGN);
- ENSURE_DATA(sizeof(*sizep));
- sizep = p;
- p += sizeof(*sizep);
- l -= sizeof(*sizep);
- n = ntohl(*sizep);
- ENSURE_DATA(MRP_ALIGN(n, MSG_ALIGN));
- data = p;
-
- if (!mrp_msg_append(msg, tag, data, n)) {
- msg_destroy(msg);
+ if (diff < MSG_MIN_CHUNK)
+ diff = MSG_MIN_CHUNK;
+
+ mb->p -= (ptrdiff_t)mb->buf;
+
+ if (mrp_realloc(mb->buf, mb->size + diff)) {
+ memset(mb->buf + mb->size, 0, diff);
+ mb->size += diff;
+ mb->p += (ptrdiff_t)mb->buf;
+ mb->l += diff;
+ }
+ else
+ mrp_msgbuf_cancel(mb);
+ }
- return NULL;
- }
-
+ return mb->p;
+}
- p += MRP_ALIGN(n, MSG_ALIGN);
- l -= MRP_ALIGN(n, MSG_ALIGN);
- }
- if (MRP_UNLIKELY(l != 0)) {
- msg_destroy(msg);
- msg = NULL;
- }
+void *mrp_msgbuf_reserve(mrp_msgbuf_t *mb, size_t size, size_t align)
+{
+ void *reserved;
+ ptrdiff_t offs, pad;
+ size_t len;
+
+ len = size;
+ offs = mb->p - mb->buf;
+
+ if (offs % align != 0) {
+ pad = align - (offs % align);
+ len += pad;
}
+ else
+ pad = 0;
- return msg;
+ if (mrp_msgbuf_ensure(mb, len)) {
+ if (pad != 0)
+ memset(mb->p, 0, pad);
+
+ reserved = mb->p + pad;
+
+ mb->p += len;
+ mb->l -= len;
+ }
+ else
+ reserved = NULL;
+
+ return reserved;
+}
+
+
+void *mrp_msgbuf_pull(mrp_msgbuf_t *mb, size_t size, size_t align)
+{
+ void *pulled;
+ ptrdiff_t offs, pad;
+ size_t len;
+
+ len = size;
+ offs = mb->p - mb->buf;
+
+ if (offs % align != 0) {
+ pad = align - (offs % align);
+ len += pad;
+ }
+ else
+ pad = 0;
+
+ if (mb->l >= len) {
+ pulled = mb->p + pad;
+
+ mb->p += len;
+ mb->l -= len;
+ }
+ else
+ pulled = NULL;
-#undef ENSURE_DATA
+ return pulled;
}
#define __MURPHY_MSG_H__
#include <stdio.h>
+#include <stdbool.h>
+#include <stdarg.h>
+
#include <murphy/common/list.h>
+#include <murphy/common/refcnt.h>
+
+
+/*
+ * message field types
+ */
+
+#define A(t) MRP_MSG_FIELD_##t
+typedef enum {
+ MRP_MSG_FIELD_INVALID = 0x00, /* defined invalid type */
+ MRP_MSG_FIELD_STRING = 0x01, /* string */
+ MRP_MSG_FIELD_BOOL = 0x02, /* boolean */
+ MRP_MSG_FIELD_UINT8 = 0x03, /* unsigned 8-bit integer */
+ MRP_MSG_FIELD_SINT8 = 0x04, /* signed 8-bit integer */
+ MRP_MSG_FIELD_INT8 = A(SINT8), /* alias for SINT8 */
+ MRP_MSG_FIELD_UINT16 = 0x05, /* unsigned 16-bit integer */
+ MRP_MSG_FIELD_SINT16 = 0x06, /* signed 16-bit integer */
+ MRP_MSG_FIELD_INT16 = A(SINT16), /* alias for SINT16 */
+ MRP_MSG_FIELD_UINT32 = 0x07, /* unsigned 32-bit integer */
+ MRP_MSG_FIELD_SINT32 = 0x08, /* signed 32-bit integer */
+ MRP_MSG_FIELD_INT32 = A(SINT32), /* alias for SINT32 */
+ MRP_MSG_FIELD_UINT64 = 0x09, /* unsigned 64-bit integer */
+ MRP_MSG_FIELD_SINT64 = 0x0a, /* signed 64-bit integer */
+ MRP_MSG_FIELD_INT64 = A(SINT64), /* alias for SINT64 */
+ MRP_MSG_FIELD_DOUBLE = 0x0b, /* double-prec. floating point */
+ MRP_MSG_FIELD_BLOB = 0x0c, /* a blob (not allowed in arrays) */
+ MRP_MSG_FIELD_ARRAY = 0x80, /* bit-mask to mark arrays */
+} mrp_msg_field_type_t;
+#undef A
+
+
+/** Tag to terminate a */
+#define MRP_MSG_INVALID_TAG MRP_MSG_FIELD_INVALID
+
+
/*
- * message field - tagged data
+ * a message field
*/
+#define MRP_MSG_VALUE_UNION union { \
+ char *str; \
+ bool bln; \
+ uint8_t u8; \
+ int8_t s8; \
+ uint16_t u16; \
+ int16_t s16; \
+ uint32_t u32; \
+ int32_t s32; \
+ uint64_t u64; \
+ int64_t s64; \
+ double dbl; \
+ void *blb; \
+ char **astr; \
+ bool *abln; \
+ uint8_t *au8; \
+ int8_t *as8; \
+ uint16_t *au16; \
+ int16_t *as16; \
+ uint32_t *au32; \
+ int32_t *as32; \
+ uint64_t *au64; \
+ int64_t *as64; \
+ }
+
+typedef MRP_MSG_VALUE_UNION mrp_msg_value_t;
+
typedef struct {
- char *tag; /* tag name */
- void *data; /* tag data */
- size_t size; /* amount of data */
- mrp_list_hook_t hook; /* to more fields */
+ mrp_list_hook_t hook; /* to message */
+ uint16_t tag; /* message field tag */
+ uint16_t type; /* message field type */
+ MRP_MSG_VALUE_UNION; /* message field value */
+ uint32_t size[0]; /* size, if an array or a blob */
} mrp_msg_field_t;
/*
- * message - a set of message fields
+ * a message
*/
typedef struct {
mrp_list_hook_t fields; /* list of message fields */
- size_t nfield; /* unencoded size of tags + data */
- int refcnt; /* reference count */
+ size_t nfield; /* number of fields */
+ mrp_refcnt_t refcnt; /* reference count */
} mrp_msg_t;
+/*
+ * a message buffer to help encoding / decoding
+ */
+
+typedef struct {
+ void *buf; /* message buffer */
+ size_t size; /* allocated size */
+ void *p; /* fill pointer */
+ size_t l; /* space left in buffer */
+} mrp_msgbuf_t;
+
/** Create a new message. */
-mrp_msg_t *mrp_msg_create(const char *tag, ...);
+mrp_msg_t *mrp_msg_create(uint16_t tag, ...);
/** Add a new reference to a message (ie. increase refcount). */
mrp_msg_t *mrp_msg_ref(mrp_msg_t *msg);
void mrp_msg_unref(mrp_msg_t *msg);
/** Append a field to a message. */
-int mrp_msg_append(mrp_msg_t *msg, char *tag, void *data, size_t size);
+int mrp_msg_append(mrp_msg_t *msg, uint16_t tag, ...);
/** Prepend a field to a message. */
-int mrp_msg_prepend(mrp_msg_t *msg, char *tag, void *data, size_t size);
+int mrp_msg_prepend(mrp_msg_t *msg, uint16_t tag, ...);
/** Find a field in a message. */
-void *mrp_msg_find(mrp_msg_t *msg, char *tag, size_t *size);
+mrp_msg_field_t *mrp_msg_find(mrp_msg_t *msg, uint16_t tag);
/** Dump a message. */
int mrp_msg_dump(mrp_msg_t *msg, FILE *fp);
/** Default message decoding. */
mrp_msg_t *mrp_msg_default_decode(void *buf, size_t size);
+/** Initialize the given message buffer for writing. */
+void *mrp_msgbuf_write(mrp_msgbuf_t *mb, size_t size);
+
+/** Initialize the given message buffer for reading. */
+void mrp_msgbuf_read(mrp_msgbuf_t *mb, void *buf, size_t size);
+
+/** Deinitialize the given message buffer, usually due to some error. */
+void mrp_msgbuf_cancel(mrp_msgbuf_t *mb);
+
+/** Reallocate the buffer if needed to accomodate size bytes of data. */
+void *mrp_msgbuf_ensure(mrp_msgbuf_t *mb, size_t size);
+
+/** Reserve the given amount of space from the buffer. */
+void *mrp_msgbuf_reserve(mrp_msgbuf_t *mb, size_t size, size_t align);
+
+/** Pull the given amount of data from the buffer. */
+void *mrp_msgbuf_pull(mrp_msgbuf_t *mb, size_t size, size_t align);
+
+
+#define MRP_MSGBUF_PUSH(mb, data, align, errlbl) do { \
+ size_t _size = sizeof(data); \
+ typeof(data) *_ptr; \
+ \
+ _ptr = mrp_msgbuf_reserve((mb), _size, (align)); \
+ \
+ if (_ptr != NULL) \
+ *_ptr = data; \
+ else \
+ goto errlbl; \
+ } while (0)
+
+#define MRP_MSGBUF_PUSH_DATA(mb, data, size, align, errlbl) do { \
+ size_t _size = (size); \
+ void *_ptr; \
+ \
+ _ptr = mrp_msgbuf_reserve((mb), _size, (align)); \
+ \
+ if (_ptr != NULL) \
+ memcpy(_ptr, data, _size); \
+ else \
+ goto errlbl; \
+ } while (0)
+
+#define MRP_MSGBUF_PULL(mb, type, align, errlbl) ({ \
+ size_t _size = sizeof(type); \
+ type *_ptr; \
+ \
+ _ptr = mrp_msgbuf_pull((mb), _size, (align)); \
+ \
+ if (_ptr == NULL) \
+ goto errlbl; \
+ \
+ *_ptr; \
+ })
+
+
+#define MRP_MSGBUF_PULL_DATA(mb, size, align, errlbl) ({ \
+ size_t _size = size; \
+ void *_ptr; \
+ \
+ _ptr = mrp_msgbuf_pull((mb), _size, (align)); \
+ \
+ if (_ptr == NULL) \
+ goto errlbl; \
+ \
+ _ptr; \
+ })
+
#endif /* __MURPHY_MSG_H__ */
#include <murphy/common.h>
+#define TAG_END ((uint16_t)0x0)
+#define TAG_SEQ ((uint16_t)0x1)
+#define TAG_FOO ((uint16_t)0x2)
+#define TAG_BAR ((uint16_t)0x3)
+#define TAG_MSG ((uint16_t)0x4)
+#define TAG_RPL ((uint16_t)0x5)
+
typedef struct {
mrp_mainloop_t *ml;
void recv_evt(mrp_transport_t *t, mrp_msg_t *msg, void *user_data)
{
- context_t *c = (context_t *)user_data;
-
- MRP_UNUSED(t);
- MRP_UNUSED(c);
+ context_t *c = (context_t *)user_data;
+ mrp_msg_field_t *f;
+ uint32_t seq;
+ char buf[256];
- mrp_log_info("Received a message.");
+ mrp_log_info("received a message");
mrp_msg_dump(msg, stdout);
if (c->server) {
+ seq = 0;
+ if ((f = mrp_msg_find(msg, TAG_SEQ)) != NULL) {
+ if (f->type == MRP_MSG_FIELD_UINT32)
+ seq = f->u32;
+ }
+
+ snprintf(buf, sizeof(buf), "reply to message #%u", seq);
+
+ if (!mrp_msg_append(msg, TAG_RPL, MRP_MSG_FIELD_STRING, buf,
+ TAG_END)) {
+ mrp_log_info("failed to append to received message");
+ exit(1);
+ }
+
if (mrp_transport_send(t, msg))
- mrp_log_info("Reply successfully sent.");
+ mrp_log_info("reply successfully sent");
else
- mrp_log_error("Failed to send reply.");
+ mrp_log_error("failed to send reply");
+
+ /* message unreffed by transport layer */
}
}
void recvfrom_evt(mrp_transport_t *t, mrp_msg_t *msg, mrp_sockaddr_t *addr,
socklen_t addrlen, void *user_data)
{
- context_t *c = (context_t *)user_data;
-
- MRP_UNUSED(t);
- MRP_UNUSED(c);
+ context_t *c = (context_t *)user_data;
+ mrp_msg_field_t *f;
+ uint32_t seq;
+ char buf[256];
- mrp_log_info("Received a message.");
+ mrp_log_info("received a message");
mrp_msg_dump(msg, stdout);
if (c->server) {
- mrp_msg_append(msg, "type", "reply", strlen("reply")+1);
+ seq = 0;
+ if ((f = mrp_msg_find(msg, TAG_SEQ)) != NULL) {
+ if (f->type == MRP_MSG_FIELD_UINT32)
+ seq = f->u32;
+ }
+
+ snprintf(buf, sizeof(buf), "reply to message #%u", seq);
+
+ if (!mrp_msg_append(msg, TAG_RPL, MRP_MSG_FIELD_STRING, buf,
+ TAG_END)) {
+ mrp_log_info("failed to append to received message");
+ exit(1);
+ }
+
if (mrp_transport_sendto(t, msg, addr, addrlen))
- mrp_log_info("Reply successfully sent(to).");
+ mrp_log_info("reply successfully sent");
else
- mrp_log_error("Failed to send(to) reply.");
+ mrp_log_error("failed to send reply");
+
+ /* message unreffed by transport layer */
}
}
void send_cb(mrp_mainloop_t *ml, mrp_timer_t *t, void *user_data)
{
- static int seqno = 1;
-
+ static uint32_t seqno = 1;
+
context_t *c = (context_t *)user_data;
mrp_msg_t *msg;
- char seq[32];
- int len;
+ uint32_t seq;
+ char buf[256];
+ uint32_t len;
MRP_UNUSED(ml);
MRP_UNUSED(t);
- if ((msg = mrp_msg_create(NULL)) == NULL) {
- mrp_log_error("Failed to create new message.");
- exit(1);
- }
-
- len = snprintf(seq, sizeof(seq), "%d", seqno);
- if (!mrp_msg_append(msg, "seq", seq, len + 1) ||
- !mrp_msg_append(msg, "foo", "bar", sizeof("bar")) ||
- !mrp_msg_append(msg, "bar", "foo", sizeof("foo")) ||
- !mrp_msg_append(msg, "foobar", "barfoo", sizeof("barfoo")) ||
- !mrp_msg_append(msg, "barfoo", "foobar", sizeof("foobar"))) {
- mrp_log_error("Failed to construct message #%d.", seqno);
+ seq = seqno++;
+ len = snprintf(buf, sizeof(buf), "This is message %u.", (unsigned int)seq);
+ if ((msg = mrp_msg_create(TAG_SEQ, MRP_MSG_FIELD_UINT32, seq,
+ TAG_FOO, MRP_MSG_FIELD_STRING, "foo",
+ TAG_BAR, MRP_MSG_FIELD_STRING, "bar",
+ TAG_MSG, MRP_MSG_FIELD_BLOB , len, buf,
+ TAG_END)) == NULL) {
+ mrp_log_error("Failed to create new message.");
exit(1);
}
if (!mrp_transport_send(c->t, msg)) {
- mrp_log_error("Failed to send message #%d.", seqno);
+ mrp_log_error("Failed to send message #%d.", seq);
exit(1);
}
- else {
- mrp_log_info("Message #%d succesfully sent.", seqno++);
- }
+ else
+ mrp_log_info("Message #%d succesfully sent.", seq);
+
+ mrp_msg_unref(msg);
}
#include <murphy/common.h>
+#include <murphy/common/msg.h>
+#include <murphy/common/msg.c>
+
+#define TYPE(type, name) [MRP_MSG_FIELD_##type] = name
+const char *types[] = {
+ TYPE(INVALID, "invalid"),
+ TYPE(STRING , "string" ),
+ TYPE(BOOL , "bool" ),
+ TYPE(SINT8 , "sint8" ),
+ TYPE(UINT8 , "uint8" ),
+ TYPE(SINT16 , "sint16" ),
+ TYPE(UINT16 , "uint16" ),
+ TYPE(SINT32 , "sint32" ),
+ TYPE(UINT32 , "uint32" ),
+ TYPE(SINT64 , "sint64" ),
+ TYPE(UINT64 , "uint64" ),
+ TYPE(DOUBLE , "double" ),
+ TYPE(BLOB , "blob" ),
+ NULL,
+};
+#undef TYPE
+
+
+uint16_t get_type(const char **types, const char *name)
+{
+ const char **t;
+
+ for (t = types; *t != NULL; t++) {
+ if (!strcmp(*t, name))
+ return (uint16_t)(t - types);
+ }
+
+ return MRP_MSG_FIELD_INVALID;
+}
+
+
int main(int argc, char *argv[])
{
mrp_msg_t *msg, *decoded;
- char buf[1024], *tag;
- void *data, *encoded;
+ void *encoded;
ssize_t size;
- int i, len;
- char default_argv1[] = \
- "output=one_cb(): #0: 'one'\none_cb(): #0: 'one'\n";
- char *default_argv[] = { default_argv1, default_argv1 };
- int default_argc = 2;
-
+ uint16_t tag, type, prev_tag;
+ uint8_t u8;
+ int8_t s8;
+ uint16_t u16;
+ int16_t s16;
+ uint32_t u32;
+ int32_t s32;
+ uint64_t u64;
+ int64_t s64;
+ double dbl;
+ bool bln;
+ char *val, *end;
+ int i, ok;
- if (argc < 2) {
- argc = default_argc;
- argv = default_argv;
+ mrp_log_set_mask(MRP_LOG_UPTO(MRP_LOG_DEBUG));
+ mrp_log_set_target(MRP_LOG_TO_STDOUT);
+
+ if ((msg = mrp_msg_create(MRP_MSG_FIELD_INVALID)) == NULL) {
+ mrp_log_error("Failed to create new message.");
+ exit(1);
}
+
+ prev_tag = 0;
+ i = 1;
+ while (i < argc) {
+
+ if ('0' <= *argv[i] && *argv[i] <= '9') {
+ if (argc <= i + 2) {
+ mrp_log_error("Missing field type or value.");
+ exit(1);
+ }
- if ((msg = mrp_msg_create(NULL)) != NULL) {
- for (i = 1; i < argc; i++) {
- tag = argv[i];
- data = strchr(tag, '=');
-
- if (data != NULL) {
- len = (ptrdiff_t )(data - (void *)tag);
- if (len > (int)sizeof(buf) - 1)
- len = sizeof(buf) - 1;
- strncpy(buf, tag, len);
- buf[len] = '\0';
- tag = buf;
- data++;
- size = strlen((char *)data) + 1;
+ tag = prev_tag = (uint16_t)strtoul(argv[i++], &end, 0);
+ if (end && *end) {
+ mrp_log_error("Invalid field tag '%s'.", argv[i]);
+ exit(1);
+ }
+ }
+ else {
+ if (argc <= i + 1) {
+ mrp_log_error("Missing field type or value.");
+ exit(1);
}
+
+ tag = ++prev_tag;
+ }
+
+ type = get_type(types, argv[i++]);
+ val = argv[i++];
+
+ if (type == MRP_MSG_FIELD_INVALID) {
+ mrp_log_error("Invalid field type '%s'.", argv[i + 1]);
+ exit(1);
+ }
+
+ switch (type) {
+ case MRP_MSG_FIELD_STRING:
+ ok = mrp_msg_append(msg, tag, type, val);
+ break;
+
+ case MRP_MSG_FIELD_BOOL:
+ if (!strcasecmp(val, "true"))
+ bln = TRUE;
+ else if (!strcasecmp(val, "false"))
+ bln = FALSE;
else {
- data = NULL;
- size = 0;
+ mrp_log_error("Invalid boolean value '%s'.", val);
+ exit(1);
}
+ ok = mrp_msg_append(msg, tag, type, bln);
+ break;
- if (!mrp_msg_append(msg, tag, data, size)) {
- mrp_log_error("Failed to add field %s='%s' to message.",
- tag, (char *)data);
+#define HANDLE_INT(_bits, _uget, _sget) \
+ case MRP_MSG_FIELD_UINT##_bits: \
+ u##_bits = (uint##_bits##_t)strtoul(val, &end, 0); \
+ if (end && *end) { \
+ mrp_log_error("Invalid uint%d value '%s'.", _bits, val); \
+ exit(1); \
+ } \
+ ok = mrp_msg_append(msg, tag, type, u##_bits); \
+ break; \
+ case MRP_MSG_FIELD_SINT##_bits: \
+ s##_bits = (int##_bits##_t)strtol(val, &end, 0); \
+ if (end && *end) { \
+ mrp_log_error("Invalid sint%d value '%s'.", _bits, val); \
+ exit(1); \
+ } \
+ ok = mrp_msg_append(msg, tag, type, s##_bits); \
+ break
+
+ HANDLE_INT(8 , strtol , strtoul);
+ HANDLE_INT(16, strtol , strtoul);
+ HANDLE_INT(32, strtol , strtoul);
+ HANDLE_INT(64, strtoll, strtoull);
+
+ case MRP_MSG_FIELD_DOUBLE:
+ dbl = strtod(val, &end);
+ if (end && *end) {
+ mrp_log_error("Invalid double value '%s'.", val);
exit(1);
}
+ ok = mrp_msg_append(msg, tag, type, dbl);
+ break;
+
+ default:
+ mrp_log_error("Invalid (or unimplemented) type 0x%x (%s).",
+ type, argv[i + 1]);
+ ok = FALSE;
+ }
+
+ if (!ok) {
+ mrp_log_error("Failed to add field to message.");
+ exit(1);
}
}
mrp_msg_dump(msg, stdout);
size = mrp_msg_default_encode(msg, &encoded);
-
- if (size < 0) {
- mrp_log_error("Failed to encode message.");
+ if (size <= 0) {
+ mrp_log_error("Failed to encode message with default encoder.");
exit(1);
}
-
- decoded = mrp_msg_default_decode(encoded, size);
+ mrp_log_info("encoded message size: %d", (int)size);
+
+ decoded = mrp_msg_default_decode(encoded, size);
if (decoded == NULL) {
- mrp_log_error("Failed to decode message.");
+ mrp_log_error("Failed to decode message with default decoder.");
exit(1);
}
mrp_msg_dump(decoded, stdout);
+ mrp_msg_unref(msg);
+ mrp_msg_unref(decoded);
+
return 0;
}
#include <murphy/common.h>
+#define TAG_END ((uint16_t)0x0)
+#define TAG_SEQ ((uint16_t)0x1)
+#define TAG_FOO ((uint16_t)0x2)
+#define TAG_BAR ((uint16_t)0x3)
+#define TAG_MSG ((uint16_t)0x4)
+#define TAG_RPL ((uint16_t)0x5)
+
typedef struct {
mrp_mainloop_t *ml;
void recv_evt(mrp_transport_t *t, mrp_msg_t *msg, void *user_data)
{
- context_t *c = (context_t *)user_data;
-
- MRP_UNUSED(t);
- MRP_UNUSED(c);
+ context_t *c = (context_t *)user_data;
+ mrp_msg_field_t *f;
+ uint32_t seq;
+ char buf[256];
- mrp_log_info("Received a message.");
+ mrp_log_info("received a message");
mrp_msg_dump(msg, stdout);
if (c->server) {
-
-#define REPLY_KEY "this_is_a_rather_long_reply_field_name_that_I_hope_will_cause_reallocation_of_the_message_receiving_buffer_on_the_server_side_and_we_will_see_if_it_can_automatically_readjust_its_buffers"
-#define REPLY_VAL "and_this_is_the_rather_long_value_of_the_rather_long_field_name_that_we_hope_might_break_something_if_the_allocation_algorithm_has_horrible_easy_to_exploit_holes"
-
- mrp_msg_append(msg, REPLY_KEY, REPLY_VAL, strlen(REPLY_VAL) + 1);
-
+ seq = 0;
+ if ((f = mrp_msg_find(msg, TAG_SEQ)) != NULL) {
+ if (f->type == MRP_MSG_FIELD_UINT32)
+ seq = f->u32;
+ }
+
+ snprintf(buf, sizeof(buf), "reply to message #%u", seq);
+
+ if (!mrp_msg_append(msg, TAG_RPL, MRP_MSG_FIELD_STRING, buf,
+ TAG_END)) {
+ mrp_log_info("failed to append to received message");
+ exit(1);
+ }
+
if (mrp_transport_send(t, msg))
- mrp_log_info("Reply successfully sent.");
+ mrp_log_info("reply successfully sent");
else
- mrp_log_error("Failed to send reply.");
+ mrp_log_error("failed to send reply");
+
+ /* message unreffed by transport layer */
}
}
void send_cb(mrp_mainloop_t *ml, mrp_timer_t *t, void *user_data)
{
- static int seqno = 1;
-
+ static uint32_t seqno = 1;
+
context_t *c = (context_t *)user_data;
mrp_msg_t *msg;
- char seq[32];
- int len;
+ uint32_t seq;
+ char buf[256];
+ uint32_t len;
MRP_UNUSED(ml);
MRP_UNUSED(t);
- if ((msg = mrp_msg_create(NULL)) == NULL) {
- mrp_log_error("Failed to create new message.");
- exit(1);
- }
-
- len = snprintf(seq, sizeof(seq), "%d", seqno);
-#define LONG_KEY "aaaaaaaaaaaallllllllllllloooooooooooonnnnnnnnnnngggggggggffffffffffffiiiiiiiiiiiiieeeeeeeeeeeelllllllllllllddddddddddddnnnnnnnnnnnnnnnaaaaaaaaaaaaaaaammmmmmmmmmmmmmmeeeeeeeeeeeeeeeeeeeeee"
-#define LONG_VAL "aaaaaaaaaaallllllllllllllllloooooooooooonnnnnnnnngggggggggggvvvvvvvvvvvvaaaaaaaaaaaaaalllllllllluuuuuuuuuuuuuueeeeee"
-
- if (!mrp_msg_append(msg, "seq", seq, len + 1) ||
- !mrp_msg_append(msg, "foo", "bar", sizeof("bar")) ||
- !mrp_msg_append(msg, "bar", "foo", sizeof("foo")) ||
- !mrp_msg_append(msg, "foobar", "barfoo", sizeof("barfoo")) ||
- !mrp_msg_append(msg, "barfoo", "foobar", sizeof("foobar")) ||
- !mrp_msg_append(msg, LONG_KEY, LONG_VAL, strlen(LONG_VAL) + 1)) {
- mrp_log_error("Failed to construct message #%d.", seqno);
+ seq = seqno++;
+ len = snprintf(buf, sizeof(buf), "This is message %u.", (unsigned int)seq);
+ if ((msg = mrp_msg_create(TAG_SEQ, MRP_MSG_FIELD_UINT32, seq,
+ TAG_FOO, MRP_MSG_FIELD_STRING, "foo",
+ TAG_BAR, MRP_MSG_FIELD_STRING, "bar",
+ TAG_MSG, MRP_MSG_FIELD_BLOB , len, buf,
+ TAG_END)) == NULL) {
+ mrp_log_error("Failed to create new message.");
exit(1);
}
if (!mrp_transport_send(c->t, msg)) {
- mrp_log_error("Failed to send message #%d.", seqno);
+ mrp_log_error("Failed to send message #%d.", seq);
exit(1);
}
- else {
- mrp_log_info("Message #%d succesfully sent.", seqno++);
- }
+ else
+ mrp_log_info("Message #%d succesfully sent.", seq);
+
+ mrp_msg_unref(msg);
}
#include <murphy/common.h>
+#define TAG_END ((uint16_t)0x0)
+#define TAG_SEQ ((uint16_t)0x1)
+#define TAG_FOO ((uint16_t)0x2)
+#define TAG_BAR ((uint16_t)0x3)
+#define TAG_MSG ((uint16_t)0x4)
+#define TAG_RPL ((uint16_t)0x5)
+
typedef struct {
mrp_mainloop_t *ml;
void recv_evt(mrp_transport_t *t, mrp_msg_t *msg, void *user_data)
{
- context_t *c = (context_t *)user_data;
-
- MRP_UNUSED(t);
- MRP_UNUSED(c);
+ context_t *c = (context_t *)user_data;
+ mrp_msg_field_t *f;
+ uint32_t seq;
+ char buf[256];
- mrp_log_info("Received a message.");
+ mrp_log_info("received a message");
mrp_msg_dump(msg, stdout);
if (c->server) {
-
-#define REPLY_KEY "this_is_a_rather_long_reply_field_name_that_I_hope_will_cause_reallocation_of_the_message_receiving_buffer_on_the_server_side_and_we_will_see_if_it_can_automatically_readjust_its_buffers"
-#define REPLY_VAL "and_this_is_the_rather_long_value_of_the_rather_long_field_name_that_we_hope_might_break_something_if_the_allocation_algorithm_has_horrible_easy_to_exploit_holes"
-
- mrp_msg_append(msg, REPLY_KEY, REPLY_VAL, strlen(REPLY_VAL) + 1);
-
+ seq = 0;
+ if ((f = mrp_msg_find(msg, TAG_SEQ)) != NULL) {
+ if (f->type == MRP_MSG_FIELD_UINT32)
+ seq = f->u32;
+ }
+
+ snprintf(buf, sizeof(buf), "reply to message #%u", seq);
+
+ if (!mrp_msg_append(msg, TAG_RPL, MRP_MSG_FIELD_STRING, buf,
+ TAG_END)) {
+ mrp_log_info("failed to append to received message");
+ exit(1);
+ }
+
if (mrp_transport_send(t, msg))
- mrp_log_info("Reply successfully sent.");
+ mrp_log_info("reply successfully sent");
else
- mrp_log_error("Failed to send reply.");
+ mrp_log_error("failed to send reply");
+
+ /* message unreffed by transport layer */
}
}
void send_cb(mrp_mainloop_t *ml, mrp_timer_t *t, void *user_data)
{
- static int seqno = 1;
-
+ static uint32_t seqno = 1;
+
context_t *c = (context_t *)user_data;
mrp_msg_t *msg;
- char seq[32];
- int len;
+ uint32_t seq;
+ char buf[256];
+ uint32_t len;
MRP_UNUSED(ml);
MRP_UNUSED(t);
- if ((msg = mrp_msg_create(NULL)) == NULL) {
- mrp_log_error("Failed to create new message.");
- exit(1);
- }
-
- len = snprintf(seq, sizeof(seq), "%d", seqno);
-#define LONG_KEY "aaaaaaaaaaaallllllllllllloooooooooooonnnnnnnnnnngggggggggffffffffffffiiiiiiiiiiiiieeeeeeeeeeeelllllllllllllddddddddddddnnnnnnnnnnnnnnnaaaaaaaaaaaaaaaammmmmmmmmmmmmmmeeeeeeeeeeeeeeeeeeeeee"
-#define LONG_VAL "aaaaaaaaaaallllllllllllllllloooooooooooonnnnnnnnngggggggggggvvvvvvvvvvvvaaaaaaaaaaaaaalllllllllluuuuuuuuuuuuuueeeeee"
-
- if (!mrp_msg_append(msg, "seq", seq, len + 1) ||
- !mrp_msg_append(msg, "foo", "bar", sizeof("bar")) ||
- !mrp_msg_append(msg, "bar", "foo", sizeof("foo")) ||
- !mrp_msg_append(msg, "foobar", "barfoo", sizeof("barfoo")) ||
- !mrp_msg_append(msg, "barfoo", "foobar", sizeof("foobar")) ||
- !mrp_msg_append(msg, LONG_KEY, LONG_VAL, strlen(LONG_VAL) + 1)) {
- mrp_log_error("Failed to construct message #%d.", seqno);
+ seq = seqno++;
+ len = snprintf(buf, sizeof(buf), "This is message %u.", (unsigned int)seq);
+ if ((msg = mrp_msg_create(TAG_SEQ, MRP_MSG_FIELD_UINT32, seq,
+ TAG_FOO, MRP_MSG_FIELD_STRING, "foo",
+ TAG_BAR, MRP_MSG_FIELD_STRING, "bar",
+ TAG_MSG, MRP_MSG_FIELD_BLOB , len, buf,
+ TAG_END)) == NULL) {
+ mrp_log_error("Failed to create new message.");
exit(1);
}
if (!mrp_transport_send(c->t, msg)) {
- mrp_log_error("Failed to send message #%d.", seqno);
+ mrp_log_error("Failed to send message #%d.", seq);
exit(1);
}
- else {
- mrp_log_info("Message #%d succesfully sent.", seqno++);
- }
+ else
+ mrp_log_info("Message #%d succesfully sent.", seq);
+
+ mrp_msg_unref(msg);
}
#include <murphy/common.h>
+#define TAG_END ((uint16_t)0x0)
+#define TAG_SEQ ((uint16_t)0x1)
+#define TAG_FOO ((uint16_t)0x2)
+#define TAG_BAR ((uint16_t)0x3)
+#define TAG_MSG ((uint16_t)0x4)
+#define TAG_RPL ((uint16_t)0x5)
+
typedef struct {
mrp_mainloop_t *ml;
void recv_evt(mrp_transport_t *t, mrp_msg_t *msg, void *user_data)
{
- context_t *c = (context_t *)user_data;
-
- MRP_UNUSED(t);
- MRP_UNUSED(c);
+ context_t *c = (context_t *)user_data;
+ mrp_msg_field_t *f;
+ uint32_t seq;
+ char buf[256];
- mrp_log_info("Received a message.");
+ mrp_log_info("received a message");
mrp_msg_dump(msg, stdout);
if (c->server) {
+ seq = 0;
+ if ((f = mrp_msg_find(msg, TAG_SEQ)) != NULL) {
+ if (f->type == MRP_MSG_FIELD_UINT32)
+ seq = f->u32;
+ }
+
+ snprintf(buf, sizeof(buf), "reply to message #%u", seq);
+
+ if (!mrp_msg_append(msg, TAG_RPL, MRP_MSG_FIELD_STRING, buf,
+ TAG_END)) {
+ mrp_log_info("failed to append to received message");
+ exit(1);
+ }
+
if (mrp_transport_send(t, msg))
- mrp_log_info("Reply successfully sent.");
+ mrp_log_info("reply successfully sent");
else
- mrp_log_error("Failed to send reply.");
- }
+ mrp_log_error("failed to send reply");
-#if 0 /* done by the tranport layer... */
- mrp_msg_destroy(msg);
-#endif
+ /* message unreffed by transport layer */
+ }
}
void recvfrom_evt(mrp_transport_t *t, mrp_msg_t *msg, mrp_sockaddr_t *addr,
socklen_t addrlen, void *user_data)
{
- context_t *c = (context_t *)user_data;
-
- MRP_UNUSED(t);
- MRP_UNUSED(c);
+ context_t *c = (context_t *)user_data;
+ mrp_msg_field_t *f;
+ uint32_t seq;
+ char buf[256];
- mrp_log_info("Received a message.");
+ mrp_log_info("received a message");
mrp_msg_dump(msg, stdout);
if (c->server) {
- mrp_msg_append(msg, "type", "reply", strlen("reply")+1);
+ seq = 0;
+ if ((f = mrp_msg_find(msg, TAG_SEQ)) != NULL) {
+ if (f->type == MRP_MSG_FIELD_UINT32)
+ seq = f->u32;
+ }
+
+ snprintf(buf, sizeof(buf), "reply to message #%u", seq);
+
+ if (!mrp_msg_append(msg, TAG_RPL, MRP_MSG_FIELD_STRING, buf,
+ TAG_END)) {
+ mrp_log_info("failed to append to received message");
+ exit(1);
+ }
+
if (mrp_transport_sendto(t, msg, addr, addrlen))
- mrp_log_info("Reply successfully sent.");
+ mrp_log_info("reply successfully sent");
else
- mrp_log_error("Failed to send reply.");
- }
+ mrp_log_error("failed to send reply");
-#if 0 /* done by the tranport layer... */
- mrp_msg_destroy(msg);
-#endif
+ /* message unreffed by transport layer */
+ }
}
mrp_sockaddr_t addr;
socklen_t addrlen;
- c->t = mrp_transport_create(c->ml, "udp", &evt, c, 0);
+ c->t = mrp_transport_create(c->ml, "udp4", &evt, c, 0);
if (c->t == NULL) {
mrp_log_error("Failed to create new transport.");
void send_cb(mrp_mainloop_t *ml, mrp_timer_t *t, void *user_data)
{
- static int seqno = 1;
-
+ static uint32_t seqno = 1;
+
context_t *c = (context_t *)user_data;
mrp_msg_t *msg;
- char seq[32];
- int len;
+ uint32_t seq;
+ char buf[256];
+ uint32_t len;
MRP_UNUSED(ml);
MRP_UNUSED(t);
- if ((msg = mrp_msg_create(NULL)) == NULL) {
- mrp_log_error("Failed to create new message.");
- exit(1);
- }
-
- len = snprintf(seq, sizeof(seq), "%d", seqno);
- if (!mrp_msg_append(msg, "seq", seq, len + 1) ||
- !mrp_msg_append(msg, "foo", "bar", sizeof("bar")) ||
- !mrp_msg_append(msg, "bar", "foo", sizeof("foo")) ||
- !mrp_msg_append(msg, "foobar", "barfoo", sizeof("barfoo")) ||
- !mrp_msg_append(msg, "barfoo", "foobar", sizeof("foobar"))) {
- mrp_log_error("Failed to construct message #%d.", seqno);
+ seq = seqno++;
+ len = snprintf(buf, sizeof(buf), "This is message %u.", (unsigned int)seq);
+ if ((msg = mrp_msg_create(TAG_SEQ, MRP_MSG_FIELD_UINT32, seq,
+ TAG_FOO, MRP_MSG_FIELD_STRING, "foo",
+ TAG_BAR, MRP_MSG_FIELD_STRING, "bar",
+ TAG_MSG, MRP_MSG_FIELD_BLOB , len, buf,
+ TAG_END)) == NULL) {
+ mrp_log_error("Failed to create new message.");
exit(1);
}
if (!mrp_transport_send(c->t, msg)) {
- mrp_log_error("Failed to send message #%d.", seqno);
+ mrp_log_error("Failed to send message #%d.", seq);
exit(1);
}
- else {
- mrp_log_info("Message #%d succesfully sent.", seqno++);
- }
+ else
+ mrp_log_info("Message #%d succesfully sent.", seq);
+
+ mrp_msg_unref(msg);
}
exit(1);
}
- c->t = mrp_transport_create(c->ml, type, &evt, c, 0);
+ c->t = mrp_transport_create(c->ml, "udp4", &evt, c, 0);
if (c->t == NULL) {
mrp_log_error("Failed to create new transport.");
mrp_sockaddr_t *addr, socklen_t size,
const char **type)
{
-#if 1
mrp_transport_descr_t *d;
mrp_list_hook_t *p, *n;
socklen_t l;
}
return 0;
-#else
- mrp_transport_descr_t *d;
- char *p, type[32];
- int n;
-
- if ((p = strchr(str, ':')) != NULL && (n = p - str) < (int)sizeof(type)) {
- strncpy(type, str, n);
- type[n] = '\0';
-
- if (t != NULL)
- return t->descr->resolve(p + 1, addr, size);
- else {
- if ((d = find_transport(type)) != NULL)
- return d->resolve(p + 1, addr, size);
- }
- }
-
- return 0;
-#endif
}
#include <readline/history.h>
#include <murphy/common.h>
+#include <murphy/plugins/console-protocol.h>
#define client_info mrp_log_info
#define client_warn mrp_log_warning
void prompt_process_input(client_t *c, char *input)
{
mrp_msg_t *msg;
- int len;
-
+ uint16_t tag, type;
+ uint32_t len;
+
len = input ? strlen(input) + 1: 0;
if (len > 1) {
add_history(input);
prompt_erase(c);
- if ((msg = mrp_msg_create("input", input, len, NULL)) != NULL) {
+ tag = MRP_CONSOLE_INPUT;
+ type = MRP_MSG_FIELD_BLOB;
+ msg = mrp_msg_create(tag, type, len, input, MRP_MSG_FIELD_INVALID);
+
+ if (msg != NULL) {
mrp_transport_send(c->t, msg);
mrp_msg_unref(msg);
- prompt_display(c);
}
- else
- client_error("failed to send request to server.");
+
+ prompt_display(c);
+ return;
}
+
+ client_error("failed to send request to server.");
}
void recv_evt(mrp_transport_t *t, mrp_msg_t *msg, void *user_data)
{
- client_t *c = (client_t *)user_data;
- char *prompt, *output, buf[128], *dummy;
- size_t size;
+ client_t *c = (client_t *)user_data;
+ mrp_msg_field_t *f;
+ char *prompt, *output, buf[128];
+ size_t size;
MRP_UNUSED(t);
prompt_erase(c);
- if ((output = mrp_msg_find(msg, "output", &size)) != NULL) {
+ if ((f = mrp_msg_find(msg, MRP_CONSOLE_OUTPUT)) != NULL) {
+ output = f->str;
+ size = f->size[0];
printf("%.*s", (int)size, output);
}
- else if ((prompt = mrp_msg_find(msg, "prompt", &size)) != NULL) {
- snprintf(buf, sizeof(buf), "%.*s> ", (int)size, prompt);
+ else if ((f = mrp_msg_find(msg, MRP_CONSOLE_PROMPT)) != NULL) {
+ prompt = f->str;
+ snprintf(buf, sizeof(buf), "%s> ", prompt);
prompt_set(c, buf);
}
- else if ((dummy = mrp_msg_find(msg, "bye", &size)) != NULL) {
+ else if ((f = mrp_msg_find(msg, MRP_CONSOLE_BYE)) != NULL) {
mrp_mainloop_quit(c->ml, 0);
return;
}
--- /dev/null
+#ifndef __MURPHY_CONSOLE_PROTOCOL_H__
+#define __MURPHY_CONSOLE_PROTOCOL_H__
+
+#define MRP_CONSOLE_INPUT 0x1
+#define MRP_CONSOLE_OUTPUT 0x2
+#define MRP_CONSOLE_PROMPT 0x3
+#define MRP_CONSOLE_BYE 0x4
+
+
+#endif /* __MURPHY_CONSOLE_PROTOCOL_H__ */
#include <murphy/common.h>
#include <murphy/core.h>
+#include <murphy/plugins/console-protocol.h>
#define console_info(fmt, args...) mrp_log_info("console: "fmt , ## args)
#define console_warn(fmt, args...) mrp_log_warning("console: "fmt , ## args)
{
console_t *c = (console_t *)mc->backend_data;
mrp_msg_t *msg;
+ uint16_t tag, type;
+ uint32_t len;
- if ((msg = mrp_msg_create("output", buf, size, NULL)) != NULL) {
+ tag = MRP_CONSOLE_OUTPUT;
+ type = MRP_MSG_FIELD_BLOB;
+ len = size;
+ msg = mrp_msg_create(tag, type, len, buf, MRP_MSG_FIELD_INVALID);
+
+ if (msg != NULL) {
mrp_transport_send(c->t, msg);
mrp_msg_unref(msg);
static void tcp_close_req(mrp_console_t *mc)
{
console_t *c = (console_t *)mc->backend_data;
-
+
mrp_transport_disconnect(c->t);
mrp_transport_destroy(c->t);
c->t = NULL;
{
console_t *c = (console_t *)mc->backend_data;
mrp_msg_t *msg;
- int dummy = TRUE;
+ uint16_t tag, type;
- if ((msg = mrp_msg_create("bye", &dummy, sizeof(dummy), NULL)) != NULL)
+ tag = MRP_CONSOLE_BYE;
+ type = MRP_MSG_FIELD_BOOL;
+ msg = mrp_msg_create(tag, type, TRUE, MRP_MSG_FIELD_INVALID);
+
+ if (msg != NULL) {
mrp_transport_send(c->t, msg);
+ mrp_msg_unref(msg);
+ }
mrp_transport_disconnect(c->t);
}
{
console_t *c = (console_t *)mc->backend_data;
mrp_msg_t *msg;
+ uint16_t tag, type;
+
+ tag = MRP_CONSOLE_PROMPT;
+ type = MRP_MSG_FIELD_STRING;
+ msg = mrp_msg_create(tag, type, prompt, MRP_MSG_FIELD_INVALID);
- msg = mrp_msg_create("prompt", prompt, strlen(prompt) + 1, NULL);
-
if (msg != NULL) {
mrp_transport_send(c->t, msg);
mrp_msg_unref(msg);
static void recv_evt(mrp_transport_t *t, mrp_msg_t *msg, void *user_data)
{
- console_t *c = (console_t *)user_data;
- char *input;
- size_t size;
+ console_t *c = (console_t *)user_data;
+ mrp_msg_field_t *f;
+ char *input;
+ size_t size;
MRP_UNUSED(t);
- input = mrp_msg_find(msg, "input", &size);
-
- if (input != NULL) {
- MRP_CONSOLE_BUSY(c->mc, {
- c->mc->evt.input(c->mc, input, size);
- });
+ if ((f = mrp_msg_find(msg, MRP_CONSOLE_INPUT)) != NULL) {
+ if (f->type == MRP_MSG_FIELD_BLOB) {
+ input = f->str;
+ size = f->size[0];
+ MRP_CONSOLE_BUSY(c->mc, {
+ c->mc->evt.input(c->mc, input, size);
+ });
- c->mc->check_destroy(c->mc);
+ c->mc->check_destroy(c->mc);
+ return;
+ }
}
- else
- mrp_log_error("Received malformed console message.");
+
+ mrp_log_error("Received malformed console message.");
}
mrp_sockaddr_t *addr, socklen_t addrlen,
void *user_data)
{
- console_t *c = (console_t *)user_data;
- char *input;
- size_t size;
+ console_t *c = (console_t *)user_data;
+ mrp_msg_field_t *f;
+ char *input;
+ size_t size;
MRP_UNUSED(t);
mrp_debug("got new message...");
- input = mrp_msg_find(msg, "input", &size);
-
- if (input != NULL) {
- mrp_sockaddr_t a;
- socklen_t l;
+ if ((f = mrp_msg_find(msg, MRP_CONSOLE_INPUT)) != NULL) {
+ if (f->type == MRP_MSG_FIELD_STRING) {
+ input = f->str;
+ size = f->size[0];
+
+ if (input != NULL) {
+ mrp_sockaddr_t a;
+ socklen_t l;
- mrp_sockaddr_cpy(&a, &c->addr, l=c->addrlen);
- mrp_sockaddr_cpy(&c->addr, addr, c->addrlen=addrlen);
+ mrp_sockaddr_cpy(&a, &c->addr, l=c->addrlen);
+ mrp_sockaddr_cpy(&c->addr, addr, c->addrlen=addrlen);
- mrp_transport_connect(t, addr, addrlen);
- MRP_CONSOLE_BUSY(c->mc, {
- c->mc->evt.input(c->mc, input, size);
- });
+ mrp_transport_connect(t, addr, addrlen);
+ MRP_CONSOLE_BUSY(c->mc, {
+ c->mc->evt.input(c->mc, input, size);
+ });
- c->mc->check_destroy(c->mc);
+ c->mc->check_destroy(c->mc);
- mrp_transport_disconnect(t);
+ mrp_transport_disconnect(t);
+
+ mrp_sockaddr_cpy(&c->addr, &a, c->addrlen=l);
- mrp_sockaddr_cpy(&c->addr, &a, c->addrlen=l);
- if (l)
- mrp_transport_connect(t, &a, l);
+ if (l)
+ mrp_transport_connect(t, &a, l);
+
+ return;
+ }
+ }
}
- else
- mrp_log_error("Received malformed console message.");
+
+ mrp_log_error("Received malformed console message.");
}