common: slightly reworked default messaging abstraction
authorKrisztian Litkey <krisztian.litkey@intel.com>
Fri, 11 May 2012 11:33:02 +0000 (14:33 +0300)
committerKrisztian Litkey <krisztian.litkey@intel.com>
Tue, 15 May 2012 12:07:28 +0000 (15:07 +0300)
Hmm... I'm not happy with this at all. It's better than the original
quick-hack proto but not by a large enough margin. It probably needs
quite a bit of work still eventually. One thing that needs to be added
is a way to let users use the transport layer without forcing the default
messaging abstraction/encoding/decoding on them. This should preferably
happen by allowing users to send (almost) arbitrary data structures over
a transport by providing a descriptor that describes the layout of the
data structure in terms of offset/MRP_MSG_FIELD_* allowing the transport
or messaging layer to do the encoding/decoding of the data. The sending
part is relatively easy. The trickier part is the receiving side and to
come up with an API that is simple and intuitive enough to be usable yet
allows one to flexibly multiplex any number of structure types over the
same transport connection...

12 files changed:
src/common/dgram-transport.c
src/common/msg.c
src/common/msg.h
src/common/tests/dgram-test.c
src/common/tests/msg-test.c
src/common/tests/stream-test.c
src/common/tests/tcp-test.c
src/common/tests/udp-test.c
src/common/transport.c
src/console-client/client.c
src/plugins/console-protocol.h [new file with mode: 0644]
src/plugins/plugin-console.c

index 87225ff..f3b7641 100644 (file)
@@ -13,7 +13,7 @@
 #include <murphy/common/msg.h>
 #include <murphy/common/transport.h>
 
-#define DEFAULT_SIZE 1024                   /* default input buffer size */
+#define DEFAULT_SIZE 1024                /* default input buffer size */
 
 typedef struct {
     MRP_TRANSPORT_PUBLIC_FIELDS;         /* common transport fields */
index bf48319..a0d956c 100644 (file)
@@ -1,6 +1,7 @@
 #include <string.h>
 #include <errno.h>
 #include <stdarg.h>
+#include <ctype.h>
 #include <arpa/inet.h>
 
 #include <murphy/common/macros.h>
 #include <murphy/common/list.h>
 #include <murphy/common/msg.h>
 
-#define AVG_SPACE_PER_FIELD 32           /* guesstimate for tag + data */
-#define len_t               uint32_t
-#define MSG_ALIGN           sizeof(len_t)
 
-static inline int msg_add(mrp_msg_t *msg, const char *tag, void *data,
-                         size_t size, int prepend);
-static void msg_destroy(mrp_msg_t *msg);
-
-mrp_msg_t *mrp_msg_create(const char *tag, ...)
+static inline mrp_msg_field_t *create_field(uint16_t tag, va_list *ap)
 {
-    mrp_msg_t *msg;
-    va_list    ap;
-    void      *data;
-    size_t     size;
-    
-    va_start(ap, tag);
-    if ((msg = mrp_allocz(sizeof(*msg))) != NULL) {
-       mrp_list_init(&msg->fields);
+    mrp_msg_field_t *f;
+    uint16_t         type;
+    uint32_t         size;
+    void            *blb;
+
+    if ((f = mrp_allocz(sizeof(*f))) != NULL) {
+       mrp_list_init(&f->hook);
+       type = va_arg(*ap, uint32_t);
        
-       while (tag != NULL) {
-           data = va_arg(ap, typeof(data));
-           size = va_arg(ap, typeof(size));
+#define CREATE(_f, _tag, _type, _fldtype, _fld, _last, _errlbl) do {   \
+           (_f) = mrp_allocz(MRP_OFFSET(typeof(*_f), _last) +          \
+                             sizeof(_f->_last));                       \
+                                                                       \
+           if ((_f) != NULL) {                                         \
+               (_f)->tag  = _tag;                                      \
+               (_f)->type = _type;                                     \
+               (_f)->_fld = va_arg(*ap, _fldtype);                     \
+           }                                                           \
+           else {                                                      \
+               goto _errlbl;                                           \
+           }                                                           \
+       } while (0)
+
+       switch (type) {
+       case MRP_MSG_FIELD_STRING:
+           CREATE(f, tag, type, char *, str, str, nomem);
+           f->str = mrp_strdup(f->str);
+           if (f->str == NULL)
+               goto nomem;
+           break;
+       case MRP_MSG_FIELD_BOOL:
+           CREATE(f, tag, type, int, bln, bln, nomem);
+           break;
+       case MRP_MSG_FIELD_UINT8:
+           CREATE(f, tag, type, unsigned int, u8, u8, nomem);
+           break;
+       case MRP_MSG_FIELD_SINT8:
+           CREATE(f, tag, type, signed int, s8, s8, nomem);
+           break;
+       case MRP_MSG_FIELD_UINT16:
+           CREATE(f, tag, type, unsigned int, u16, u16, nomem);
+           break;
+       case MRP_MSG_FIELD_SINT16:
+           CREATE(f, tag, type, signed int, s16, s16, nomem);
+           break;
+       case MRP_MSG_FIELD_UINT32:
+           CREATE(f, tag, type, unsigned int, u32, u32, nomem);
+           break;
+       case MRP_MSG_FIELD_SINT32:
+           CREATE(f, tag, type, signed int, s32, s32, nomem);
+           break;
+       case MRP_MSG_FIELD_UINT64:
+           CREATE(f, tag, type, uint64_t, u64, u64, nomem);
+           break;
+       case MRP_MSG_FIELD_SINT64:
+           CREATE(f, tag, type, int64_t, s64, s64, nomem);
+           break;
+       case MRP_MSG_FIELD_DOUBLE:
+           CREATE(f, tag, type, double, dbl, dbl, nomem);
+           break;
+
+       case MRP_MSG_FIELD_BLOB:
+           size = va_arg(ap, uint32_t);
+           CREATE(f, tag, type, void *, blb, size[0], nomem);
+
+           blb        = f->blb;
+           f->size[0] = size;
+           f->blb     = mrp_allocz(size);
+
+           if (f->blb != NULL) {
+               memcpy(f->blb, blb, size);
+               f->size[0] = size;
+           }
+           else
+               goto nomem;
+           break;
            
-           if (!msg_add(msg, tag, data, size, FALSE)) {
-               msg_destroy(msg);
-               msg = NULL;
-
-               goto out;
+       default:
+           if (f->type & MRP_MSG_FIELD_ARRAY) {
+               errno = EOPNOTSUPP;
+               mrp_log_error("XXX TODO: MRP_MSG_FIELD_ARRAY not implemented");
            }
-
-           tag = va_arg(ap, typeof(tag));
+           else
+               errno = EINVAL;
+           
+           mrp_free(f);
+           f = NULL;
        }
+#undef CREATE
     }
-
-    msg->refcnt = 1;
-
- out:
-    va_end(ap);
     
-    return msg;
+    return f;
+
+ nomem:
+    errno = ENOMEM;
+    return NULL;
 }
 
 
@@ -58,27 +118,55 @@ static void msg_destroy(mrp_msg_t *msg)
     mrp_msg_field_t *f;
 
     if (msg != NULL) {
-#ifdef __MSG_EXTRA_CHECKS__
-       if (MRP_UNLIKELY(msg->refcnt) != 0) {
-           mrp_log_error("%s() called for message (%p) with refcnt %d...",
-                         __FUNCTION__, msg, msg->refcnt);
-       }
-#endif
-
-       mrp_debug("destroying message %p...", msg);
-
        mrp_list_foreach(&msg->fields, p, n) {
            f = mrp_list_entry(p, typeof(*f), hook);
-           mrp_list_delete(p);
+           mrp_list_delete(&f->hook);
+           
+           switch (f->type) {
+           case MRP_MSG_FIELD_STRING:
+               mrp_free(f->str);
+               break;
+           case MRP_MSG_FIELD_BLOB:
+               mrp_free(f->blb);
+               break;
+           }
 
-           mrp_free(f->tag);
-           mrp_free(f->data);
            mrp_free(f);
        }
+    }
+}
+
+
+mrp_msg_t *mrp_msg_create(uint16_t tag, ...)
+{
+    mrp_msg_t       *msg;
+    mrp_msg_field_t *f;
+    va_list          ap;
+    
+    va_start(ap, tag);
+    if ((msg = mrp_allocz(sizeof(*msg))) != NULL) {
+       mrp_list_init(&msg->fields);
+       msg->refcnt = 1;
        
-       mrp_free(msg);
+       while (tag != MRP_MSG_FIELD_INVALID) {
+           f = create_field(tag, &ap);
+           
+           if (f != NULL) {
+               mrp_list_append(&msg->fields, &f->hook);
+               msg->nfield++;
+           }
+           else {
+               msg_destroy(msg);
+               msg = NULL;
+               goto out;
+           }
+           tag = va_arg(ap, uint32_t);
+       }
     }
+ out:
+    va_end(ap);
     
+    return msg;
 }
 
 
@@ -95,85 +183,134 @@ void mrp_msg_unref(mrp_msg_t *msg)
 {
     if (msg != NULL) {
        msg->refcnt--;
-
+       
        if (msg->refcnt <= 0)
            msg_destroy(msg);
     }
 }
 
 
-static inline int msg_add(mrp_msg_t *msg, const char *tag, void *data,
-                         size_t size, int prepend)
+int mrp_msg_append(mrp_msg_t *msg, uint16_t tag, ...)
 {
     mrp_msg_field_t *f;
+    va_list          ap;
 
-    if ((f = mrp_allocz(sizeof(*f))) != NULL) {
-       f->tag  = mrp_strdup(tag);
-       f->data = mrp_datadup(data, size);
-       f->size = size;
+    va_start(ap, tag);
+    f = create_field(tag, &ap);
+    va_end(ap);
 
-       if (f->tag != NULL && f->data != NULL) {
-           mrp_list_init(&f->hook);
-           if (!prepend)
-               mrp_list_append(&msg->fields, &f->hook);
-           else
-               mrp_list_prepend(&msg->fields, &f->hook);
-           msg->nfield++;
-           
-           return TRUE;
-       }
-       else {
-           mrp_free(f->tag);
-           mrp_free(f->data);
-           mrp_free(f);
-       }
+    if (f != NULL) {
+       mrp_list_append(&msg->fields, &f->hook);
+       msg->nfield++;
+       return TRUE;
     }
-
-    return FALSE;
+    else
+       return FALSE;
 }
 
 
-int mrp_msg_append(mrp_msg_t *msg, char *tag, void *data, size_t size)
+int mrp_msg_prepend(mrp_msg_t *msg, uint16_t tag, ...)
 {
-    return msg_add(msg, tag, data, size, FALSE);
-}
+    mrp_msg_field_t *f;
+    va_list          ap;
 
+    va_start(ap, tag);
+    f = create_field(tag, &ap);
+    va_end(ap);
 
-int mrp_msg_prepend(mrp_msg_t *msg, char *tag, void *data, size_t size)
-{
-    return msg_add(msg, tag, data, size, TRUE);
+    if (f != NULL) {
+       mrp_list_prepend(&msg->fields, &f->hook);
+       msg->nfield++;
+       return TRUE;
+    }
+    else
+       return FALSE;    
 }
 
 
-void *mrp_msg_find(mrp_msg_t *msg, char *tag, size_t *size)
+mrp_msg_field_t *mrp_msg_find(mrp_msg_t *msg, uint16_t tag)
 {
     mrp_msg_field_t *f;
     mrp_list_hook_t *p, *n;
 
     mrp_list_foreach(&msg->fields, p, n) {
        f = mrp_list_entry(p, typeof(*f), hook);
-
-       if (!strcmp(f->tag, tag)) {
-           *size = f->size;
-           return f->data;
-       }
+       if (f->tag == tag)
+           return f;
     }
 
-    *size = 0;
     return NULL;
 }
 
+
 int mrp_msg_dump(mrp_msg_t *msg, FILE *fp)
 {
     mrp_msg_field_t *f;
     mrp_list_hook_t *p, *n;
     int              l;
-
+    
     l = fprintf(fp, "{\n");
     mrp_list_foreach(&msg->fields, p, n) {
-       f  = mrp_list_entry(p, typeof(*f), hook);
-       l += fprintf(fp, "    %s='%.*s' (%zd bytes)\n",
-                    f->tag, (int)f->size, (char *)f->data, f->size);
+       f = mrp_list_entry(p, typeof(*f), hook);
+
+       l += fprintf(fp, "    0x%x ", f->tag);
+
+#define DUMP(_fmt, _type, _val)                                        \
+       l += fprintf(fp, "= <%s> "_fmt"\n", _type, _val)
+       
+       switch (f->type) {
+       case MRP_MSG_FIELD_STRING:
+           DUMP("'%s'", "string", f->str);
+           break;
+       case MRP_MSG_FIELD_BOOL:
+           DUMP("%s", "boolean", f->bln ? "true" : "false");
+           break;
+       case MRP_MSG_FIELD_UINT8:
+           DUMP("%u", "uint8", f->u8);
+           break;
+       case MRP_MSG_FIELD_SINT8:
+           DUMP("%d", "sint8", f->s8);
+           break;
+       case MRP_MSG_FIELD_UINT16:
+           DUMP("%u", "uint16", f->u16);
+           break;
+       case MRP_MSG_FIELD_SINT16:
+           DUMP("%d", "sint16", f->s16);
+           break;
+       case MRP_MSG_FIELD_UINT32:
+           DUMP("%u", "uint32", f->u32);
+           break;
+       case MRP_MSG_FIELD_SINT32:
+           DUMP("%d", "sint32", f->s32);
+           break;
+       case MRP_MSG_FIELD_UINT64:
+           DUMP("%Lu", "uint64", (long long unsigned)f->u64);
+           break;
+       case MRP_MSG_FIELD_SINT64:
+           DUMP("%Ld", "sint64", (long long signed)f->s64);
+           break;
+       case MRP_MSG_FIELD_DOUBLE:
+           DUMP("%f", "double", f->dbl);
+           break;
+       case MRP_MSG_FIELD_BLOB: {
+           char     *p;
+           uint32_t  i;
+           
+           fprintf(fp, "= <%s> <%u bytes, ", "blob", f->size[0]);
+           
+           for (i = 0, p = f->blb; i < f->size[0]; i++, p++) {
+               if (isprint(*p) && *p != '\n' && *p != '\t' && *p != '\r')
+                   fprintf(fp, "%c", *p);
+               else
+                   fprintf(fp, ".");
+           }
+           fprintf(fp, ">\n");
+       }
+           break;
+           
+       default:
+           fprintf(fp, "= <%s> {%u items, XXX TODO}\n", "array", f->size[0]);
+       }
     }
     l += fprintf(fp, "}\n");
 
@@ -181,158 +318,386 @@ int mrp_msg_dump(mrp_msg_t *msg, FILE *fp)
 }
 
 
+#define MSG_MIN_CHUNK 32
+
 ssize_t mrp_msg_default_encode(mrp_msg_t *msg, void **bufp)
 {
-#define ENSURE_SPACE(needed) do {                              \
-       int _miss = needed - l;                                 \
-       if (MRP_UNLIKELY(_miss > 0)) {                          \
-           p -= (ptrdiff_t)buf;                                \
-           size += _miss * 2;                                  \
-           if (mrp_realloc(buf, size) == NULL) {               \
-               mrp_free(buf);                                  \
-               *bufp = NULL;                                   \
-               return -1;                                      \
-           }                                                   \
-           else                                                \
-               p    += (ptrdiff_t)buf;                         \
-       }                                                       \
-    } while (0)
+#define RESERVE(type) ({                                               \
+           void *_ptr;                                                 \
+                                                                       \
+           _ptr = mrp_msgbuf_reserve(&mb, sizeof(type), 1);            \
+                                                                       \
+           if (_ptr == NULL) {                                         \
+               *bufp = NULL;                                           \
+               return -1;                                              \
+           }                                                           \
+                                                                       \
+           _ptr;                                                       \
+       })
+
+#define RESERVE_SIZE(size) ({                          \
+           void *_ptr;                                 \
+                                                       \
+           _ptr = mrp_msgbuf_reserve(&mb, size, 1);    \
+                                                       \
+           if (_ptr == NULL) {                         \
+               *bufp = NULL;                           \
+               return -1;                              \
+           }                                           \
+                                                       \
+           _ptr;                                       \
+       })
+    
 
     mrp_msg_field_t *f;
-    mrp_list_hook_t *pf, *nf;
-    void            *buf, *p;
-    len_t            tsize, *sizep;
-    size_t           size, nfield, pad, extra, fsize, tpad, dpad;
-    int              l;
+    mrp_list_hook_t *p, *n;
+    mrp_msgbuf_t     mb;
+    uint32_t         len;
+    size_t           size;
+    
+    size = msg->nfield * (2 * sizeof(uint16_t) + sizeof(uint64_t));
+    
+    if (mrp_msgbuf_write(&mb, size)) {
+       MRP_MSGBUF_PUSH(&mb, htobe16(msg->nfield), 1, nomem);
 
-    nfield = msg->nfield;
-    extra  = nfield * 2 * sizeof(uint32_t);
-    pad    = nfield * 2 * 3;
-    size   = sizeof(len_t) + nfield * AVG_SPACE_PER_FIELD + extra + pad;
+       mrp_list_foreach(&msg->fields, p, n) {
+           f = mrp_list_entry(p, typeof(*f), hook);
+           
+           MRP_MSGBUF_PUSH(&mb, htobe16(f->tag) , 1, nomem);
+           MRP_MSGBUF_PUSH(&mb, htobe16(f->type), 1, nomem);
+
+           switch (f->type) {
+           case MRP_MSG_FIELD_STRING:
+               len = strlen(f->str) + 1;
+               MRP_MSGBUF_PUSH(&mb, htobe32(len), 1, nomem);
+               MRP_MSGBUF_PUSH_DATA(&mb, f->str, len, 1, nomem);
+               break;
+               
+           case MRP_MSG_FIELD_BOOL:
+               MRP_MSGBUF_PUSH(&mb, htobe32(f->bln ? TRUE : FALSE), 1, nomem);
+               break;
 
-    if ((buf = mrp_alloc(size)) != NULL) {
-       p = buf;
-       l = size;
-       
-       /* encode number of fields */
-       sizep = p;
-       *sizep = htonl(msg->nfield);
-       p     += sizeof(*sizep);
-       l     -= sizeof(*sizep);
-       
-       mrp_list_foreach(&msg->fields, pf, nf) {
-           f = mrp_list_entry(pf, typeof(*f), hook);
+           case MRP_MSG_FIELD_UINT8:
+               MRP_MSGBUF_PUSH(&mb, f->u8, 1, nomem);
+               break;
 
-           /* make space for field if needed */
-           tsize  = strlen(f->tag) + 1;
-           tpad   = (MSG_ALIGN - (tsize   & (MSG_ALIGN-1))) & (MSG_ALIGN-1);
-           dpad   = (MSG_ALIGN - (f->size & (MSG_ALIGN-1))) & (MSG_ALIGN-1);
-           
-           fsize  = sizeof(*sizep) + tsize   + tpad;
-           fsize += sizeof(*sizep) + f->size + dpad;
-           ENSURE_SPACE(fsize);
-
-           /* tag size and tag */
-           sizep  = p;
-           *sizep = htonl(tsize + tpad);
-           p     += sizeof(*sizep);
-           l     -= sizeof(*sizep);
-           memcpy(p, f->tag, tsize);
-           memset(p + tsize, 0, tpad);
-           p     += tsize + tpad;
-           l     -= tsize + tpad;
-
-           /* data size and data */
-           sizep  = p;
-           *sizep = htonl(f->size + dpad);
-           p     += sizeof(*sizep);
-           l     -= sizeof(*sizep);
-           memcpy(p, f->data, f->size);
-           memset(p + f->size, 0, dpad);
-           p     += f->size + dpad;
-           l     -= f->size + dpad;
+           case MRP_MSG_FIELD_SINT8:
+               MRP_MSGBUF_PUSH(&mb, f->s8, 1, nomem);
+               break;
+
+           case MRP_MSG_FIELD_UINT16:
+               MRP_MSGBUF_PUSH(&mb, htobe16(f->u16), 1, nomem);
+               break;
+
+           case MRP_MSG_FIELD_SINT16:
+               MRP_MSGBUF_PUSH(&mb, htobe16(f->s16), 1, nomem);
+               break;
+
+           case MRP_MSG_FIELD_UINT32:
+               MRP_MSGBUF_PUSH(&mb, htobe32(f->u32), 1, nomem);
+               break;
+
+           case MRP_MSG_FIELD_SINT32:
+               MRP_MSGBUF_PUSH(&mb, htobe32(f->s32), 1, nomem);
+               break;
+
+           case MRP_MSG_FIELD_UINT64:
+               MRP_MSGBUF_PUSH(&mb, htobe64(f->u64), 1, nomem);
+               break;
+
+           case MRP_MSG_FIELD_SINT64:
+               MRP_MSGBUF_PUSH(&mb, htobe64(f->s64), 1, nomem);
+               break;
+
+           case MRP_MSG_FIELD_DOUBLE:
+               MRP_MSGBUF_PUSH(&mb, f->dbl, 1, nomem);
+               break;
+               
+           case MRP_MSG_FIELD_BLOB:
+               len   = f->size[0];
+               MRP_MSGBUF_PUSH(&mb, htobe32(len), 1, nomem);
+               MRP_MSGBUF_PUSH_DATA(&mb, f->blb, len, 1, nomem);
+               break;
+
+           default:
+               if (f->type & MRP_MSG_FIELD_ARRAY) {
+                   errno = EOPNOTSUPP;
+                   mrp_log_error("XXX TODO: MRP_MSG_FIELD_ARRAY "
+                                 "not implemented");
+               }
+               else
+                   errno = EINVAL;     
+
+               mrp_msgbuf_cancel(&mb);
+           nomem:
+               *bufp = NULL;
+               return -1;
+           }
        }
+    }
     
-       size = p - buf;
-       *bufp = buf;
+    *bufp = mb.buf;
+    return mb.p - mb.buf;
+}
+
+
+mrp_msg_t *mrp_msg_default_decode(void *buf, size_t size)
+{
+#define PULL(type) ({                                          \
+           void *_ptr;                                         \
+                                                               \
+           _ptr = mrp_msgbuf_pull(&mb, sizeof(type), 1);       \
+                                                               \
+           if (_ptr == NULL)                                   \
+               return NULL;                                    \
+                                                               \
+           _ptr;                                               \
+       })
+
+#define PULL_SIZE(size) ({                                     \
+           void *_ptr;                                         \
+                                                               \
+           _ptr = mrp_msgbuf_pull(&mb, size, 1);               \
+                                                               \
+           if (_ptr == NULL)                                   \
+               return NULL;                                    \
+                                                               \
+           _ptr;                                               \
+       })
+
+    mrp_msg_t       *msg;
+    mrp_msgbuf_t     mb;
+    mrp_msg_value_t  v;
+    void            *value;
+    uint16_t         nfield, tag, type;
+    uint32_t         len;
+    int              i;
+
+    msg = mrp_msg_create(MRP_MSG_FIELD_INVALID);
+
+    if (msg == NULL)
+       return NULL;
+    
+    mrp_msgbuf_read(&mb, buf, size);
+    
+    nfield = be16toh(MRP_MSGBUF_PULL(&mb, typeof(nfield), 1, nodata));
+    
+    for (i = 0; i < nfield; i++) {
+       tag  = be16toh(MRP_MSGBUF_PULL(&mb, typeof(tag) , 1, nodata));
+       type = be16toh(MRP_MSGBUF_PULL(&mb, typeof(type), 1, nodata));
+
+       switch (type) {
+       case MRP_MSG_FIELD_STRING:
+           len   = be32toh(MRP_MSGBUF_PULL(&mb, typeof(len), 1, nodata));
+           value = MRP_MSGBUF_PULL_DATA(&mb, len, 1, nodata);
+           if (!mrp_msg_append(msg, tag, type, value, MRP_MSG_FIELD_INVALID))
+               goto fail;
+           break;
+           
+       case MRP_MSG_FIELD_BOOL:
+           v.bln = be32toh(MRP_MSGBUF_PULL(&mb, uint32_t, 1, nodata));
+           if (!mrp_msg_append(msg, tag, type, v.bln, MRP_MSG_FIELD_INVALID))
+               goto fail;
+           break;
+           
+       case MRP_MSG_FIELD_UINT8:
+           v.u8 = MRP_MSGBUF_PULL(&mb, typeof(v.u8), 1, nodata);
+           if (!mrp_msg_append(msg, tag, type, v.u8, MRP_MSG_FIELD_INVALID))
+               goto fail;
+           break;
+
+       case MRP_MSG_FIELD_SINT8:
+           v.s8 = MRP_MSGBUF_PULL(&mb, typeof(v.s8), 1, nodata);
+           if (!mrp_msg_append(msg, tag, type, v.s8, MRP_MSG_FIELD_INVALID))
+               goto fail;
+           break;
+
+       case MRP_MSG_FIELD_UINT16:
+           v.u16 = be16toh(MRP_MSGBUF_PULL(&mb, typeof(v.u16), 1, nodata));
+           if (!mrp_msg_append(msg, tag, type, v.u16, MRP_MSG_FIELD_INVALID))
+               goto fail;
+           break;
+           
+       case MRP_MSG_FIELD_SINT16:
+           v.s16 = be16toh(MRP_MSGBUF_PULL(&mb, typeof(v.s16), 1, nodata));
+           if (!mrp_msg_append(msg, tag, type, v.s16, MRP_MSG_FIELD_INVALID))
+               goto fail;
+           break;
+
+       case MRP_MSG_FIELD_UINT32:
+           v.u32 = be32toh(MRP_MSGBUF_PULL(&mb, typeof(v.u32), 1, nodata));
+           if (!mrp_msg_append(msg, tag, type, v.u32, MRP_MSG_FIELD_INVALID))
+               goto fail;
+           break;
+
+       case MRP_MSG_FIELD_SINT32:
+           v.s32 = be32toh(MRP_MSGBUF_PULL(&mb, typeof(v.s32), 1, nodata));
+           if (!mrp_msg_append(msg, tag, type, v.s32, MRP_MSG_FIELD_INVALID))
+               goto fail;
+           break;
+
+       case MRP_MSG_FIELD_UINT64:
+           v.u64 = be64toh(MRP_MSGBUF_PULL(&mb, typeof(v.u64), 1, nodata));
+           if (!mrp_msg_append(msg, tag, type, v.u64, MRP_MSG_FIELD_INVALID))
+               goto fail;
+           break;
+
+       case MRP_MSG_FIELD_SINT64:
+           v.s64 = be64toh(MRP_MSGBUF_PULL(&mb, typeof(v.s64), 1, nodata));
+           if (!mrp_msg_append(msg, tag, type, v.s64, MRP_MSG_FIELD_INVALID))
+               goto fail;
+           break;
+           
+       case MRP_MSG_FIELD_DOUBLE:
+           v.dbl = MRP_MSGBUF_PULL(&mb, typeof(v.dbl), 1, nodata);
+           if (!mrp_msg_append(msg, tag, type, v.dbl, MRP_MSG_FIELD_INVALID))
+               goto fail;
+           break;
+           
+       case MRP_MSG_FIELD_BLOB:
+           len   = be32toh(MRP_MSGBUF_PULL(&mb, typeof(len), 1, nodata));
+           value = MRP_MSGBUF_PULL_DATA(&mb, len, 1, nodata);
+           if (!mrp_msg_append(msg, tag, type, len, value,
+                               MRP_MSG_FIELD_INVALID))
+               goto fail;
+           break;
+           
+       default:
+           if (type & MRP_MSG_FIELD_ARRAY) {
+               errno = EOPNOTSUPP;
+               mrp_log_error("XXX TODO: MRP_MSG_FIELD_ARRAY "
+                             "not implemented");
+           }
+           else
+               errno = EINVAL; 
+           goto fail;
+       }
     }
-    else {
-       *bufp = NULL;
-       size = -1;
+    
+    return msg;
+
+    
+ fail:
+ nodata:
+    mrp_msg_unref(msg);
+    return NULL;
+}
+
+
+void *mrp_msgbuf_write(mrp_msgbuf_t *mb, size_t size)
+{
+    mrp_clear(mb);
+
+    mb->buf = mrp_allocz(size);
+    
+    if (mb->buf != NULL) {
+       mb->size = size;
+       mb->p    = mb->buf;
+       mb->l    = size;
+
+       return mb->p;
     }
+    else
+       return NULL;
+}
 
-    return size;
 
-#undef ENSURE_SPACE
+void mrp_msgbuf_read(mrp_msgbuf_t *mb, void *buf, size_t size)
+{
+    mb->buf  = mb->p = buf;
+    mb->size = mb->l = size;
 }
 
 
-mrp_msg_t *mrp_msg_default_decode(void *buf, size_t size)
+void mrp_msgbuf_cancel(mrp_msgbuf_t *mb)
+{
+    mrp_free(mb->buf);
+    mb->buf = mb->p = NULL;
+}
+
+
+void *mrp_msgbuf_ensure(mrp_msgbuf_t *mb, size_t size)
 {
-#define ENSURE_DATA(n) do {                    \
-       if (MRP_UNLIKELY((int)(n) > (int)l)) {  \
-           msg_destroy(msg);                   \
-           return NULL;                        \
-       }                                       \
-    } while (0)
+    int diff;
     
-    mrp_msg_t *msg;
-    len_t     *sizep;
-    int        nfield, l, i;
-    char      *tag;
-    void      *p, *data;
-    size_t     n;
-
-    if ((msg = mrp_msg_create(NULL)) != NULL) {
-       p = buf;
-       l = size;
+    if (MRP_UNLIKELY(size > mb->l)) {
+       diff = size - mb->l;
        
-       /* get number of fields */
-       ENSURE_DATA(sizeof(*sizep));
-       sizep   = p;
-       nfield  = ntohl(*sizep);
-       p      += sizeof(*sizep);
-       l      -= sizeof(*sizep);
-
-       /* decode fields */
-       for (i = 0; i < nfield; i++) {
-           /* get tag size and tag */
-           ENSURE_DATA(sizeof(*sizep));
-           sizep  = p;
-           p     += sizeof(*sizep);
-           l     -= sizeof(*sizep);
-           n      = ntohl(*sizep);
-           ENSURE_DATA(MRP_ALIGN(n, MSG_ALIGN));
-           tag    = p;
-           /* get data size and data */
-           p     += MRP_ALIGN(n, MSG_ALIGN);
-           l     -= MRP_ALIGN(n, MSG_ALIGN);
-           ENSURE_DATA(sizeof(*sizep));
-           sizep  = p;
-           p     += sizeof(*sizep);
-           l     -= sizeof(*sizep);
-           n      = ntohl(*sizep);
-           ENSURE_DATA(MRP_ALIGN(n, MSG_ALIGN));
-           data   = p;
-
-           if (!mrp_msg_append(msg, tag, data, n)) {
-               msg_destroy(msg);
+       if (diff < MSG_MIN_CHUNK)
+           diff = MSG_MIN_CHUNK;
+       
+       mb->p -= (ptrdiff_t)mb->buf;
+       
+       if (mrp_realloc(mb->buf, mb->size + diff)) {
+           memset(mb->buf + mb->size, 0, diff);
+           mb->size += diff;
+           mb->p    += (ptrdiff_t)mb->buf;
+           mb->l    += diff;
+       }
+       else
+           mrp_msgbuf_cancel(mb);
+    }
 
-               return NULL;
-           }
-               
+    return mb->p;
+}
 
-           p += MRP_ALIGN(n, MSG_ALIGN);
-           l -= MRP_ALIGN(n, MSG_ALIGN);
-       }
 
-       if (MRP_UNLIKELY(l != 0)) {
-           msg_destroy(msg);
-           msg = NULL;
-       }
+void *mrp_msgbuf_reserve(mrp_msgbuf_t *mb, size_t size, size_t align)
+{
+    void      *reserved;
+    ptrdiff_t  offs, pad;
+    size_t     len;
+
+    len  = size;
+    offs = mb->p - mb->buf;
+       
+    if (offs % align != 0) {
+       pad  = align - (offs % align);
+       len += pad;
     }
+    else
+       pad = 0;
 
-    return msg;
+    if (mrp_msgbuf_ensure(mb, len)) {
+       if (pad != 0)
+           memset(mb->p, 0, pad);
+
+       reserved = mb->p + pad;
+       
+       mb->p += len;
+       mb->l -= len;
+    }
+    else
+       reserved = NULL;
+
+    return reserved;
+}
+
+
+void *mrp_msgbuf_pull(mrp_msgbuf_t *mb, size_t size, size_t align)
+{
+    void      *pulled;
+    ptrdiff_t  offs, pad;
+    size_t     len;
+
+    len  = size;
+    offs = mb->p - mb->buf;
+       
+    if (offs % align != 0) {
+       pad  = align - (offs % align);
+       len += pad;
+    }
+    else
+       pad = 0;
+
+    if (mb->l >= len) {
+       pulled = mb->p + pad;
+       
+       mb->p += len;
+       mb->l -= len;
+    }
+    else
+       pulled = NULL;
 
-#undef ENSURE_DATA
+    return pulled;
 }
index 5689c06..71efa88 100644 (file)
 #define __MURPHY_MSG_H__
 
 #include <stdio.h>
+#include <stdbool.h>
+#include <stdarg.h>
+
 #include <murphy/common/list.h>
+#include <murphy/common/refcnt.h>
+
+
+/*
+ * message field types
+ */
+
+#define A(t) MRP_MSG_FIELD_##t
+typedef enum {
+    MRP_MSG_FIELD_INVALID = 0x00,        /* defined invalid type */
+    MRP_MSG_FIELD_STRING  = 0x01,        /* string */
+    MRP_MSG_FIELD_BOOL    = 0x02,        /* boolean */ 
+    MRP_MSG_FIELD_UINT8   = 0x03,        /* unsigned 8-bit integer */
+    MRP_MSG_FIELD_SINT8   = 0x04,        /* signed 8-bit integer */
+    MRP_MSG_FIELD_INT8    = A(SINT8),    /* alias for SINT8 */
+    MRP_MSG_FIELD_UINT16  = 0x05,        /* unsigned 16-bit integer */
+    MRP_MSG_FIELD_SINT16  = 0x06,        /* signed 16-bit integer */
+    MRP_MSG_FIELD_INT16   = A(SINT16),   /* alias for SINT16 */
+    MRP_MSG_FIELD_UINT32  = 0x07,        /* unsigned 32-bit integer */
+    MRP_MSG_FIELD_SINT32  = 0x08,        /* signed 32-bit integer */
+    MRP_MSG_FIELD_INT32   = A(SINT32),   /* alias for SINT32 */
+    MRP_MSG_FIELD_UINT64  = 0x09,        /* unsigned 64-bit integer */
+    MRP_MSG_FIELD_SINT64  = 0x0a,        /* signed 64-bit integer */
+    MRP_MSG_FIELD_INT64   = A(SINT64),   /* alias for SINT64 */
+    MRP_MSG_FIELD_DOUBLE  = 0x0b,        /* double-prec. floating point */
+    MRP_MSG_FIELD_BLOB    = 0x0c,        /* a blob (not allowed in arrays) */
+    MRP_MSG_FIELD_ARRAY   = 0x80,        /* bit-mask to mark arrays */
+} mrp_msg_field_type_t;
+#undef A
+
+
+/** Tag to terminate a */
+#define MRP_MSG_INVALID_TAG MRP_MSG_FIELD_INVALID
+
+
 
 /*
- * message field - tagged data
+ * a message field
  */
 
+#define MRP_MSG_VALUE_UNION union {            \
+       char      *str;                         \
+       bool       bln;                         \
+       uint8_t    u8;                          \
+       int8_t     s8;                          \
+       uint16_t   u16;                         \
+       int16_t    s16;                         \
+       uint32_t   u32;                         \
+       int32_t    s32;                         \
+       uint64_t   u64;                         \
+       int64_t    s64;                         \
+       double     dbl;                         \
+       void      *blb;                         \
+       char     **astr;                        \
+       bool      *abln;                        \
+       uint8_t   *au8;                         \
+       int8_t    *as8;                         \
+       uint16_t  *au16;                        \
+       int16_t   *as16;                        \
+       uint32_t  *au32;                        \
+       int32_t   *as32;                        \
+       uint64_t  *au64;                        \
+       int64_t   *as64;                        \
+    }
+
+typedef MRP_MSG_VALUE_UNION mrp_msg_value_t;
+
 typedef struct {
-    char            *tag;                /* tag name */
-    void            *data;               /* tag data */
-    size_t           size;               /* amount of data */
-    mrp_list_hook_t  hook;               /* to more fields */
+    mrp_list_hook_t hook;                /* to message */
+    uint16_t        tag;                 /* message field tag */
+    uint16_t        type;                /* message field type */
+    MRP_MSG_VALUE_UNION;                 /* message field value */
+    uint32_t        size[0];             /* size, if an array or a blob */
 } mrp_msg_field_t;
 
 
 /*
- * message - a set of message fields
+ * a message
  */
 
 typedef struct {
     mrp_list_hook_t fields;              /* list of message fields */
-    size_t          nfield;              /* unencoded size of tags + data */
-    int             refcnt;              /* reference count */
+    size_t          nfield;              /* number of fields */
+    mrp_refcnt_t    refcnt;              /* reference count */
 } mrp_msg_t;
 
 
+/*
+ * a message buffer to help encoding / decoding
+ */
+
+typedef struct {
+    void   *buf;                         /* message buffer */
+    size_t  size;                        /* allocated size */
+    void   *p;                           /* fill pointer */
+    size_t  l;                           /* space left in buffer */
+} mrp_msgbuf_t;
+
 /** Create a new message. */
-mrp_msg_t *mrp_msg_create(const char *tag, ...);
+mrp_msg_t *mrp_msg_create(uint16_t tag, ...);
 
 /** Add a new reference to a message (ie. increase refcount). */
 mrp_msg_t *mrp_msg_ref(mrp_msg_t *msg);
@@ -37,13 +114,13 @@ mrp_msg_t *mrp_msg_ref(mrp_msg_t *msg);
 void mrp_msg_unref(mrp_msg_t *msg);
 
 /** Append a field to a message. */
-int mrp_msg_append(mrp_msg_t *msg, char *tag, void *data, size_t size);
+int mrp_msg_append(mrp_msg_t *msg, uint16_t tag, ...);
 
 /** Prepend a field to a message. */
-int mrp_msg_prepend(mrp_msg_t *msg, char *tag, void *data, size_t size);
+int mrp_msg_prepend(mrp_msg_t *msg, uint16_t tag, ...);
 
 /** Find a field in a message. */
-void *mrp_msg_find(mrp_msg_t *msg, char *tag, size_t *size);
+mrp_msg_field_t *mrp_msg_find(mrp_msg_t *msg, uint16_t tag);
 
 /** Dump a message. */
 int mrp_msg_dump(mrp_msg_t *msg, FILE *fp);
@@ -54,4 +131,72 @@ ssize_t mrp_msg_default_encode(mrp_msg_t *msg, void **bufp);
 /** Default message decoding. */
 mrp_msg_t *mrp_msg_default_decode(void *buf, size_t size);
 
+/** Initialize the given message buffer for writing. */
+void *mrp_msgbuf_write(mrp_msgbuf_t *mb, size_t size);
+
+/** Initialize the given message buffer for reading. */
+void mrp_msgbuf_read(mrp_msgbuf_t *mb, void *buf, size_t size);
+
+/** Deinitialize the given message buffer, usually due to some error. */
+void mrp_msgbuf_cancel(mrp_msgbuf_t *mb);
+
+/** Reallocate the buffer if needed to accomodate size bytes of data. */
+void *mrp_msgbuf_ensure(mrp_msgbuf_t *mb, size_t size);
+
+/** Reserve the given amount of space from the buffer. */
+void *mrp_msgbuf_reserve(mrp_msgbuf_t *mb, size_t size, size_t align);
+
+/** Pull the given amount of data from the buffer. */
+void *mrp_msgbuf_pull(mrp_msgbuf_t *mb, size_t size, size_t align);
+
+
+#define MRP_MSGBUF_PUSH(mb, data, align, errlbl) do {          \
+       size_t        _size = sizeof(data);                     \
+       typeof(data) *_ptr;                                     \
+                                                               \
+       _ptr  = mrp_msgbuf_reserve((mb), _size, (align));       \
+                                                               \
+       if (_ptr != NULL)                                       \
+           *_ptr = data;                                       \
+       else                                                    \
+           goto errlbl;                                        \
+    } while (0)
+
+#define MRP_MSGBUF_PUSH_DATA(mb, data, size, align, errlbl) do {       \
+       size_t _size = (size);                                          \
+       void   *_ptr;                                                   \
+                                                                       \
+       _ptr  = mrp_msgbuf_reserve((mb), _size, (align));               \
+                                                                       \
+       if (_ptr != NULL)                                               \
+           memcpy(_ptr, data, _size);                                  \
+       else                                                            \
+           goto errlbl;                                                \
+    } while (0)
+
+#define MRP_MSGBUF_PULL(mb, type, align, errlbl) ({                    \
+           size_t  _size = sizeof(type);                               \
+           type   *_ptr;                                               \
+                                                                       \
+           _ptr = mrp_msgbuf_pull((mb), _size, (align));               \
+                                                                       \
+           if (_ptr == NULL)                                           \
+               goto errlbl;                                            \
+                                                                       \
+           *_ptr;                                                      \
+       })
+
+
+#define MRP_MSGBUF_PULL_DATA(mb, size, align, errlbl) ({       \
+           size_t  _size = size;                               \
+           void   *_ptr;                                       \
+                                                               \
+           _ptr = mrp_msgbuf_pull((mb), _size, (align));       \
+                                                               \
+           if (_ptr == NULL)                                   \
+               goto errlbl;                                    \
+                                                               \
+           _ptr;                                               \
+       })
+
 #endif /* __MURPHY_MSG_H__ */
index c05dbaf..f14820d 100644 (file)
@@ -8,6 +8,13 @@
 
 #include <murphy/common.h>
 
+#define TAG_END     ((uint16_t)0x0)
+#define TAG_SEQ     ((uint16_t)0x1)
+#define TAG_FOO     ((uint16_t)0x2)
+#define TAG_BAR     ((uint16_t)0x3)
+#define TAG_MSG     ((uint16_t)0x4)
+#define TAG_RPL     ((uint16_t)0x5)
+
 
 typedef struct {
     mrp_mainloop_t  *ml;
@@ -23,19 +30,35 @@ typedef struct {
 
 void recv_evt(mrp_transport_t *t, mrp_msg_t *msg, void *user_data)
 {
-    context_t *c = (context_t *)user_data;
-
-    MRP_UNUSED(t);    
-    MRP_UNUSED(c);
+    context_t       *c = (context_t *)user_data;
+    mrp_msg_field_t *f;
+    uint32_t         seq;
+    char             buf[256];
     
-    mrp_log_info("Received a message.");
+    mrp_log_info("received a message");
     mrp_msg_dump(msg, stdout);
     
     if (c->server) {
+       seq = 0;
+       if ((f = mrp_msg_find(msg, TAG_SEQ)) != NULL) {
+           if (f->type == MRP_MSG_FIELD_UINT32)
+               seq = f->u32;
+       }
+           
+       snprintf(buf, sizeof(buf), "reply to message #%u", seq);
+       
+       if (!mrp_msg_append(msg, TAG_RPL, MRP_MSG_FIELD_STRING, buf,
+                           TAG_END)) {
+           mrp_log_info("failed to append to received message");
+           exit(1);
+       }
+                          
        if (mrp_transport_send(t, msg))
-           mrp_log_info("Reply successfully sent.");
+           mrp_log_info("reply successfully sent");
        else
-           mrp_log_error("Failed to send reply.");
+           mrp_log_error("failed to send reply");
+
+       /* message unreffed by transport layer */
     }
 }
 
@@ -43,20 +66,35 @@ void recv_evt(mrp_transport_t *t, mrp_msg_t *msg, void *user_data)
 void recvfrom_evt(mrp_transport_t *t, mrp_msg_t *msg, mrp_sockaddr_t *addr,
                  socklen_t addrlen, void *user_data)
 {
-    context_t *c = (context_t *)user_data;
-
-    MRP_UNUSED(t);    
-    MRP_UNUSED(c);
+    context_t       *c = (context_t *)user_data;
+    mrp_msg_field_t *f;
+    uint32_t         seq;
+    char             buf[256];
     
-    mrp_log_info("Received a message.");
+    mrp_log_info("received a message");
     mrp_msg_dump(msg, stdout);
     
     if (c->server) {
-       mrp_msg_append(msg, "type", "reply", strlen("reply")+1);
+       seq = 0;
+       if ((f = mrp_msg_find(msg, TAG_SEQ)) != NULL) {
+           if (f->type == MRP_MSG_FIELD_UINT32)
+               seq = f->u32;
+       }
+           
+       snprintf(buf, sizeof(buf), "reply to message #%u", seq);
+       
+       if (!mrp_msg_append(msg, TAG_RPL, MRP_MSG_FIELD_STRING, buf,
+                           TAG_END)) {
+           mrp_log_info("failed to append to received message");
+           exit(1);
+       }
+                          
        if (mrp_transport_sendto(t, msg, addr, addrlen))
-           mrp_log_info("Reply successfully sent(to).");
+           mrp_log_info("reply successfully sent");
        else
-           mrp_log_error("Failed to send(to) reply.");
+           mrp_log_error("failed to send reply");
+
+       /* message unreffed by transport layer */
     }
 }
 
@@ -118,39 +156,37 @@ void server_init(context_t *c)
 
 void send_cb(mrp_mainloop_t *ml, mrp_timer_t *t, void *user_data)
 {
-    static int seqno = 1;
-
+    static uint32_t seqno = 1;
+    
     context_t *c = (context_t *)user_data;
     mrp_msg_t *msg;
-    char       seq[32];
-    int        len;
+    uint32_t   seq;
+    char       buf[256];
+    uint32_t   len;
 
     MRP_UNUSED(ml);
     MRP_UNUSED(t);
 
-    if ((msg = mrp_msg_create(NULL)) == NULL) {
-       mrp_log_error("Failed to create new message.");
-       exit(1);
-    }
-
-    len = snprintf(seq, sizeof(seq), "%d", seqno);
     
-    if (!mrp_msg_append(msg, "seq", seq, len + 1) ||
-       !mrp_msg_append(msg, "foo", "bar", sizeof("bar")) ||
-       !mrp_msg_append(msg, "bar", "foo", sizeof("foo")) ||
-       !mrp_msg_append(msg, "foobar", "barfoo", sizeof("barfoo")) ||
-       !mrp_msg_append(msg, "barfoo", "foobar", sizeof("foobar"))) {
-       mrp_log_error("Failed to construct message #%d.", seqno);
+    seq = seqno++;
+    len = snprintf(buf, sizeof(buf), "This is message %u.", (unsigned int)seq);
+    if ((msg = mrp_msg_create(TAG_SEQ, MRP_MSG_FIELD_UINT32, seq,
+                             TAG_FOO, MRP_MSG_FIELD_STRING, "foo",
+                             TAG_BAR, MRP_MSG_FIELD_STRING, "bar",
+                             TAG_MSG, MRP_MSG_FIELD_BLOB  , len, buf,
+                             TAG_END)) == NULL) {
+       mrp_log_error("Failed to create new message.");
        exit(1);
     }
 
     if (!mrp_transport_send(c->t, msg)) {
-       mrp_log_error("Failed to send message #%d.", seqno);
+       mrp_log_error("Failed to send message #%d.", seq);
        exit(1);
     }
-    else {
-       mrp_log_info("Message #%d succesfully sent.", seqno++);
-    }
+    else
+       mrp_log_info("Message #%d succesfully sent.", seq);
+    
+    mrp_msg_unref(msg);
 }
 
 
index e480e63..9dc2a42 100644 (file)
 #include <murphy/common.h>
 
+#include <murphy/common/msg.h>
+#include <murphy/common/msg.c>
+
+#define TYPE(type, name) [MRP_MSG_FIELD_##type] = name
+const char *types[] = {
+    TYPE(INVALID, "invalid"),
+    TYPE(STRING , "string" ),
+    TYPE(BOOL   , "bool"   ),
+    TYPE(SINT8  , "sint8"  ),
+    TYPE(UINT8  , "uint8"  ),
+    TYPE(SINT16 , "sint16" ),
+    TYPE(UINT16 , "uint16" ),
+    TYPE(SINT32 , "sint32" ),
+    TYPE(UINT32 , "uint32" ),
+    TYPE(SINT64 , "sint64" ),
+    TYPE(UINT64 , "uint64" ),
+    TYPE(DOUBLE , "double" ),
+    TYPE(BLOB   , "blob"   ),
+    NULL,
+};
+#undef TYPE
+
+
+uint16_t get_type(const char **types, const char *name)
+{
+    const char **t;
+    
+    for (t = types; *t != NULL; t++) {
+       if (!strcmp(*t, name))
+           return (uint16_t)(t - types);
+    }
+    
+    return MRP_MSG_FIELD_INVALID;
+}
+
+
 int main(int argc, char *argv[])
 {
     mrp_msg_t *msg, *decoded;
-    char       buf[1024], *tag;
-    void      *data, *encoded;
+    void      *encoded;
     ssize_t    size;
-    int        i, len;
-    char       default_argv1[] = \
-       "output=one_cb(): #0: 'one'\none_cb(): #0: 'one'\n";
-    char      *default_argv[] = { default_argv1, default_argv1 };
-    int        default_argc = 2;
-    
+    uint16_t   tag, type, prev_tag;
+    uint8_t    u8;
+    int8_t     s8;
+    uint16_t   u16;
+    int16_t    s16;
+    uint32_t   u32;
+    int32_t    s32;
+    uint64_t   u64;
+    int64_t    s64;
+    double     dbl;
+    bool       bln;
+    char      *val, *end;
+    int        i, ok;
 
-    if (argc < 2) {
-       argc = default_argc;
-       argv = default_argv;
+    mrp_log_set_mask(MRP_LOG_UPTO(MRP_LOG_DEBUG));
+    mrp_log_set_target(MRP_LOG_TO_STDOUT);
+        
+    if ((msg = mrp_msg_create(MRP_MSG_FIELD_INVALID)) == NULL) {
+       mrp_log_error("Failed to create new message.");
+       exit(1);
     }
+    
+    prev_tag = 0;
+    i        = 1;
+    while (i < argc) {
+       
+       if ('0' <= *argv[i] && *argv[i] <= '9') {
+           if (argc <= i + 2) {
+               mrp_log_error("Missing field type or value.");
+               exit(1);
+           }
 
-    if ((msg = mrp_msg_create(NULL)) != NULL) {
-       for (i = 1; i < argc; i++) {
-           tag  = argv[i];
-           data = strchr(tag, '=');
-           
-           if (data != NULL) {
-               len = (ptrdiff_t )(data - (void *)tag);
-               if (len > (int)sizeof(buf) - 1)
-                   len = sizeof(buf) - 1;
-               strncpy(buf, tag, len);
-               buf[len] = '\0';
-               tag = buf;
-               data++;
-               size = strlen((char *)data) + 1;
+           tag = prev_tag = (uint16_t)strtoul(argv[i++], &end, 0);
+           if (end && *end) {
+               mrp_log_error("Invalid field tag '%s'.", argv[i]);
+               exit(1);
+           }
+       }
+       else {
+           if (argc <= i + 1) {
+               mrp_log_error("Missing field type or value.");
+               exit(1);
            }
+
+           tag = ++prev_tag;
+       }
+
+       type = get_type(types, argv[i++]);
+       val  = argv[i++];
+       
+       if (type == MRP_MSG_FIELD_INVALID) {
+           mrp_log_error("Invalid field type '%s'.", argv[i + 1]);
+           exit(1);
+       }
+
+       switch (type) {
+       case MRP_MSG_FIELD_STRING:
+           ok = mrp_msg_append(msg, tag, type, val);
+           break;
+
+       case MRP_MSG_FIELD_BOOL:
+           if (!strcasecmp(val, "true"))
+               bln = TRUE;
+           else if (!strcasecmp(val, "false"))
+               bln = FALSE;
            else {
-               data = NULL;
-               size = 0;
+               mrp_log_error("Invalid boolean value '%s'.", val);
+               exit(1);
            }
+           ok = mrp_msg_append(msg, tag, type, bln);
+           break;
 
-           if (!mrp_msg_append(msg, tag, data, size)) {
-               mrp_log_error("Failed to add field %s='%s' to message.",
-                             tag, (char *)data);
+#define HANDLE_INT(_bits, _uget, _sget)                                        \
+       case MRP_MSG_FIELD_UINT##_bits:                                 \
+           u##_bits = (uint##_bits##_t)strtoul(val, &end, 0);          \
+           if (end && *end) {                                          \
+               mrp_log_error("Invalid uint%d value '%s'.", _bits, val); \
+               exit(1);                                                \
+           }                                                           \
+           ok = mrp_msg_append(msg, tag, type, u##_bits);              \
+           break;                                                      \
+       case MRP_MSG_FIELD_SINT##_bits:                                 \
+           s##_bits = (int##_bits##_t)strtol(val, &end, 0);            \
+           if (end && *end) {                                          \
+               mrp_log_error("Invalid sint%d value '%s'.", _bits, val); \
+               exit(1);                                                \
+           }                                                           \
+           ok = mrp_msg_append(msg, tag, type, s##_bits);              \
+           break
+           
+           HANDLE_INT(8 , strtol , strtoul);
+           HANDLE_INT(16, strtol , strtoul);
+           HANDLE_INT(32, strtol , strtoul);
+           HANDLE_INT(64, strtoll, strtoull);
+           
+       case MRP_MSG_FIELD_DOUBLE:
+           dbl = strtod(val, &end);
+           if (end && *end) {
+               mrp_log_error("Invalid double value '%s'.", val);
                exit(1);
            }
+           ok = mrp_msg_append(msg, tag, type, dbl);
+           break;
+           
+       default:
+           mrp_log_error("Invalid (or unimplemented) type 0x%x (%s).",
+                         type, argv[i + 1]);
+           ok = FALSE;
+       }
+
+       if (!ok) {
+           mrp_log_error("Failed to add field to message.");
+           exit(1);
        }
     }
 
     mrp_msg_dump(msg, stdout);
 
     size = mrp_msg_default_encode(msg, &encoded);
-    
-    if (size < 0) {
-       mrp_log_error("Failed to encode message.");
+    if (size <= 0) {
+       mrp_log_error("Failed to encode message with default encoder.");
        exit(1);
     }
-    
-    decoded = mrp_msg_default_decode(encoded, size);
 
+    mrp_log_info("encoded message size: %d", (int)size);
+
+    decoded = mrp_msg_default_decode(encoded, size);
     if (decoded == NULL) {
-       mrp_log_error("Failed to decode message.");
+       mrp_log_error("Failed to decode message with default decoder.");
        exit(1);
     }
 
     mrp_msg_dump(decoded, stdout);
 
+    mrp_msg_unref(msg);
+    mrp_msg_unref(decoded);
+    
     return 0;
 }
index 8d4776b..ccc7321 100644 (file)
@@ -8,6 +8,13 @@
 
 #include <murphy/common.h>
 
+#define TAG_END     ((uint16_t)0x0)
+#define TAG_SEQ     ((uint16_t)0x1)
+#define TAG_FOO     ((uint16_t)0x2)
+#define TAG_BAR     ((uint16_t)0x3)
+#define TAG_MSG     ((uint16_t)0x4)
+#define TAG_RPL     ((uint16_t)0x5)
+
 
 typedef struct {
     mrp_mainloop_t  *ml;
@@ -41,25 +48,35 @@ void closed_evt(mrp_transport_t *t, int error, void *user_data)
 
 void recv_evt(mrp_transport_t *t, mrp_msg_t *msg, void *user_data)
 {
-    context_t *c = (context_t *)user_data;
-
-    MRP_UNUSED(t);    
-    MRP_UNUSED(c);
+    context_t       *c = (context_t *)user_data;
+    mrp_msg_field_t *f;
+    uint32_t         seq;
+    char             buf[256];
     
-    mrp_log_info("Received a message.");
+    mrp_log_info("received a message");
     mrp_msg_dump(msg, stdout);
     
     if (c->server) {
-
-#define REPLY_KEY "this_is_a_rather_long_reply_field_name_that_I_hope_will_cause_reallocation_of_the_message_receiving_buffer_on_the_server_side_and_we_will_see_if_it_can_automatically_readjust_its_buffers"
-#define REPLY_VAL "and_this_is_the_rather_long_value_of_the_rather_long_field_name_that_we_hope_might_break_something_if_the_allocation_algorithm_has_horrible_easy_to_exploit_holes"
-
-       mrp_msg_append(msg, REPLY_KEY, REPLY_VAL, strlen(REPLY_VAL) + 1);
-
+       seq = 0;
+       if ((f = mrp_msg_find(msg, TAG_SEQ)) != NULL) {
+           if (f->type == MRP_MSG_FIELD_UINT32)
+               seq = f->u32;
+       }
+           
+       snprintf(buf, sizeof(buf), "reply to message #%u", seq);
+       
+       if (!mrp_msg_append(msg, TAG_RPL, MRP_MSG_FIELD_STRING, buf,
+                           TAG_END)) {
+           mrp_log_info("failed to append to received message");
+           exit(1);
+       }
+                          
        if (mrp_transport_send(t, msg))
-           mrp_log_info("Reply successfully sent.");
+           mrp_log_info("reply successfully sent");
        else
-           mrp_log_error("Failed to send reply.");
+           mrp_log_error("failed to send reply");
+
+       /* message unreffed by transport layer */
     }
 }
 
@@ -127,43 +144,37 @@ void server_init(context_t *c)
 
 void send_cb(mrp_mainloop_t *ml, mrp_timer_t *t, void *user_data)
 {
-    static int seqno = 1;
-
+    static uint32_t seqno = 1;
+    
     context_t *c = (context_t *)user_data;
     mrp_msg_t *msg;
-    char       seq[32];
-    int        len;
+    uint32_t   seq;
+    char       buf[256];
+    uint32_t   len;
 
     MRP_UNUSED(ml);
     MRP_UNUSED(t);
 
-    if ((msg = mrp_msg_create(NULL)) == NULL) {
-       mrp_log_error("Failed to create new message.");
-       exit(1);
-    }
-
-    len = snprintf(seq, sizeof(seq), "%d", seqno);
     
-#define LONG_KEY "aaaaaaaaaaaallllllllllllloooooooooooonnnnnnnnnnngggggggggffffffffffffiiiiiiiiiiiiieeeeeeeeeeeelllllllllllllddddddddddddnnnnnnnnnnnnnnnaaaaaaaaaaaaaaaammmmmmmmmmmmmmmeeeeeeeeeeeeeeeeeeeeee"
-#define LONG_VAL "aaaaaaaaaaallllllllllllllllloooooooooooonnnnnnnnngggggggggggvvvvvvvvvvvvaaaaaaaaaaaaaalllllllllluuuuuuuuuuuuuueeeeee"
-
-    if (!mrp_msg_append(msg, "seq", seq, len + 1) ||
-       !mrp_msg_append(msg, "foo", "bar", sizeof("bar")) ||
-       !mrp_msg_append(msg, "bar", "foo", sizeof("foo")) ||
-       !mrp_msg_append(msg, "foobar", "barfoo", sizeof("barfoo")) ||
-       !mrp_msg_append(msg, "barfoo", "foobar", sizeof("foobar")) ||
-       !mrp_msg_append(msg, LONG_KEY, LONG_VAL, strlen(LONG_VAL) + 1)) {
-       mrp_log_error("Failed to construct message #%d.", seqno);
+    seq = seqno++;
+    len = snprintf(buf, sizeof(buf), "This is message %u.", (unsigned int)seq);
+    if ((msg = mrp_msg_create(TAG_SEQ, MRP_MSG_FIELD_UINT32, seq,
+                             TAG_FOO, MRP_MSG_FIELD_STRING, "foo",
+                             TAG_BAR, MRP_MSG_FIELD_STRING, "bar",
+                             TAG_MSG, MRP_MSG_FIELD_BLOB  , len, buf,
+                             TAG_END)) == NULL) {
+       mrp_log_error("Failed to create new message.");
        exit(1);
     }
 
     if (!mrp_transport_send(c->t, msg)) {
-       mrp_log_error("Failed to send message #%d.", seqno);
+       mrp_log_error("Failed to send message #%d.", seq);
        exit(1);
     }
-    else {
-       mrp_log_info("Message #%d succesfully sent.", seqno++);
-    }
+    else
+       mrp_log_info("Message #%d succesfully sent.", seq);
+    
+    mrp_msg_unref(msg);
 }
 
 
index 63425e4..7201038 100644 (file)
@@ -8,6 +8,13 @@
 
 #include <murphy/common.h>
 
+#define TAG_END     ((uint16_t)0x0)
+#define TAG_SEQ     ((uint16_t)0x1)
+#define TAG_FOO     ((uint16_t)0x2)
+#define TAG_BAR     ((uint16_t)0x3)
+#define TAG_MSG     ((uint16_t)0x4)
+#define TAG_RPL     ((uint16_t)0x5)
+
 
 typedef struct {
     mrp_mainloop_t  *ml;
@@ -41,25 +48,35 @@ void closed_evt(mrp_transport_t *t, int error, void *user_data)
 
 void recv_evt(mrp_transport_t *t, mrp_msg_t *msg, void *user_data)
 {
-    context_t *c = (context_t *)user_data;
-
-    MRP_UNUSED(t);    
-    MRP_UNUSED(c);
+    context_t       *c = (context_t *)user_data;
+    mrp_msg_field_t *f;
+    uint32_t         seq;
+    char             buf[256];
     
-    mrp_log_info("Received a message.");
+    mrp_log_info("received a message");
     mrp_msg_dump(msg, stdout);
     
     if (c->server) {
-
-#define REPLY_KEY "this_is_a_rather_long_reply_field_name_that_I_hope_will_cause_reallocation_of_the_message_receiving_buffer_on_the_server_side_and_we_will_see_if_it_can_automatically_readjust_its_buffers"
-#define REPLY_VAL "and_this_is_the_rather_long_value_of_the_rather_long_field_name_that_we_hope_might_break_something_if_the_allocation_algorithm_has_horrible_easy_to_exploit_holes"
-
-       mrp_msg_append(msg, REPLY_KEY, REPLY_VAL, strlen(REPLY_VAL) + 1);
-
+       seq = 0;
+       if ((f = mrp_msg_find(msg, TAG_SEQ)) != NULL) {
+           if (f->type == MRP_MSG_FIELD_UINT32)
+               seq = f->u32;
+       }
+           
+       snprintf(buf, sizeof(buf), "reply to message #%u", seq);
+       
+       if (!mrp_msg_append(msg, TAG_RPL, MRP_MSG_FIELD_STRING, buf,
+                           TAG_END)) {
+           mrp_log_info("failed to append to received message");
+           exit(1);
+       }
+                          
        if (mrp_transport_send(t, msg))
-           mrp_log_info("Reply successfully sent.");
+           mrp_log_info("reply successfully sent");
        else
-           mrp_log_error("Failed to send reply.");
+           mrp_log_error("failed to send reply");
+
+       /* message unreffed by transport layer */
     }
 }
 
@@ -127,43 +144,37 @@ void server_init(context_t *c)
 
 void send_cb(mrp_mainloop_t *ml, mrp_timer_t *t, void *user_data)
 {
-    static int seqno = 1;
-
+    static uint32_t seqno = 1;
+    
     context_t *c = (context_t *)user_data;
     mrp_msg_t *msg;
-    char       seq[32];
-    int        len;
+    uint32_t   seq;
+    char       buf[256];
+    uint32_t   len;
 
     MRP_UNUSED(ml);
     MRP_UNUSED(t);
 
-    if ((msg = mrp_msg_create(NULL)) == NULL) {
-       mrp_log_error("Failed to create new message.");
-       exit(1);
-    }
-
-    len = snprintf(seq, sizeof(seq), "%d", seqno);
     
-#define LONG_KEY "aaaaaaaaaaaallllllllllllloooooooooooonnnnnnnnnnngggggggggffffffffffffiiiiiiiiiiiiieeeeeeeeeeeelllllllllllllddddddddddddnnnnnnnnnnnnnnnaaaaaaaaaaaaaaaammmmmmmmmmmmmmmeeeeeeeeeeeeeeeeeeeeee"
-#define LONG_VAL "aaaaaaaaaaallllllllllllllllloooooooooooonnnnnnnnngggggggggggvvvvvvvvvvvvaaaaaaaaaaaaaalllllllllluuuuuuuuuuuuuueeeeee"
-
-    if (!mrp_msg_append(msg, "seq", seq, len + 1) ||
-       !mrp_msg_append(msg, "foo", "bar", sizeof("bar")) ||
-       !mrp_msg_append(msg, "bar", "foo", sizeof("foo")) ||
-       !mrp_msg_append(msg, "foobar", "barfoo", sizeof("barfoo")) ||
-       !mrp_msg_append(msg, "barfoo", "foobar", sizeof("foobar")) ||
-       !mrp_msg_append(msg, LONG_KEY, LONG_VAL, strlen(LONG_VAL) + 1)) {
-       mrp_log_error("Failed to construct message #%d.", seqno);
+    seq = seqno++;
+    len = snprintf(buf, sizeof(buf), "This is message %u.", (unsigned int)seq);
+    if ((msg = mrp_msg_create(TAG_SEQ, MRP_MSG_FIELD_UINT32, seq,
+                             TAG_FOO, MRP_MSG_FIELD_STRING, "foo",
+                             TAG_BAR, MRP_MSG_FIELD_STRING, "bar",
+                             TAG_MSG, MRP_MSG_FIELD_BLOB  , len, buf,
+                             TAG_END)) == NULL) {
+       mrp_log_error("Failed to create new message.");
        exit(1);
     }
 
     if (!mrp_transport_send(c->t, msg)) {
-       mrp_log_error("Failed to send message #%d.", seqno);
+       mrp_log_error("Failed to send message #%d.", seq);
        exit(1);
     }
-    else {
-       mrp_log_info("Message #%d succesfully sent.", seqno++);
-    }
+    else
+       mrp_log_info("Message #%d succesfully sent.", seq);
+    
+    mrp_msg_unref(msg);
 }
 
 
index 7d1a96a..662b5e5 100644 (file)
@@ -8,6 +8,13 @@
 
 #include <murphy/common.h>
 
+#define TAG_END     ((uint16_t)0x0)
+#define TAG_SEQ     ((uint16_t)0x1)
+#define TAG_FOO     ((uint16_t)0x2)
+#define TAG_BAR     ((uint16_t)0x3)
+#define TAG_MSG     ((uint16_t)0x4)
+#define TAG_RPL     ((uint16_t)0x5)
+
 
 typedef struct {
     mrp_mainloop_t  *ml;
@@ -22,49 +29,72 @@ typedef struct {
 
 void recv_evt(mrp_transport_t *t, mrp_msg_t *msg, void *user_data)
 {
-    context_t *c = (context_t *)user_data;
-
-    MRP_UNUSED(t);    
-    MRP_UNUSED(c);
+    context_t       *c = (context_t *)user_data;
+    mrp_msg_field_t *f;
+    uint32_t         seq;
+    char             buf[256];
     
-    mrp_log_info("Received a message.");
+    mrp_log_info("received a message");
     mrp_msg_dump(msg, stdout);
     
     if (c->server) {
+       seq = 0;
+       if ((f = mrp_msg_find(msg, TAG_SEQ)) != NULL) {
+           if (f->type == MRP_MSG_FIELD_UINT32)
+               seq = f->u32;
+       }
+           
+       snprintf(buf, sizeof(buf), "reply to message #%u", seq);
+       
+       if (!mrp_msg_append(msg, TAG_RPL, MRP_MSG_FIELD_STRING, buf,
+                           TAG_END)) {
+           mrp_log_info("failed to append to received message");
+           exit(1);
+       }
+                          
        if (mrp_transport_send(t, msg))
-           mrp_log_info("Reply successfully sent.");
+           mrp_log_info("reply successfully sent");
        else
-           mrp_log_error("Failed to send reply.");
-    }
+           mrp_log_error("failed to send reply");
 
-#if 0 /* done by the tranport layer... */
-    mrp_msg_destroy(msg);
-#endif
+       /* message unreffed by transport layer */
+    }
 }
 
 
 void recvfrom_evt(mrp_transport_t *t, mrp_msg_t *msg, mrp_sockaddr_t *addr,
                  socklen_t addrlen, void *user_data)
 {
-    context_t *c = (context_t *)user_data;
-
-    MRP_UNUSED(t);    
-    MRP_UNUSED(c);
+    context_t       *c = (context_t *)user_data;
+    mrp_msg_field_t *f;
+    uint32_t         seq;
+    char             buf[256];
     
-    mrp_log_info("Received a message.");
+    mrp_log_info("received a message");
     mrp_msg_dump(msg, stdout);
     
     if (c->server) {
-       mrp_msg_append(msg, "type", "reply", strlen("reply")+1);
+       seq = 0;
+       if ((f = mrp_msg_find(msg, TAG_SEQ)) != NULL) {
+           if (f->type == MRP_MSG_FIELD_UINT32)
+               seq = f->u32;
+       }
+           
+       snprintf(buf, sizeof(buf), "reply to message #%u", seq);
+       
+       if (!mrp_msg_append(msg, TAG_RPL, MRP_MSG_FIELD_STRING, buf,
+                           TAG_END)) {
+           mrp_log_info("failed to append to received message");
+           exit(1);
+       }
+                          
        if (mrp_transport_sendto(t, msg, addr, addrlen))
-           mrp_log_info("Reply successfully sent.");
+           mrp_log_info("reply successfully sent");
        else
-           mrp_log_error("Failed to send reply.");
-    }
+           mrp_log_error("failed to send reply");
 
-#if 0 /* done by the tranport layer... */
-    mrp_msg_destroy(msg);
-#endif
+       /* message unreffed by transport layer */
+    }
 }
 
 
@@ -97,7 +127,7 @@ void server_init(context_t *c)
     mrp_sockaddr_t addr;
     socklen_t      addrlen;
 
-    c->t = mrp_transport_create(c->ml, "udp", &evt, c, 0);
+    c->t = mrp_transport_create(c->ml, "udp4", &evt, c, 0);
     
     if (c->t == NULL) {
        mrp_log_error("Failed to create new transport.");
@@ -122,39 +152,37 @@ void server_init(context_t *c)
 
 void send_cb(mrp_mainloop_t *ml, mrp_timer_t *t, void *user_data)
 {
-    static int seqno = 1;
-
+    static uint32_t seqno = 1;
+    
     context_t *c = (context_t *)user_data;
     mrp_msg_t *msg;
-    char       seq[32];
-    int        len;
+    uint32_t   seq;
+    char       buf[256];
+    uint32_t   len;
 
     MRP_UNUSED(ml);
     MRP_UNUSED(t);
 
-    if ((msg = mrp_msg_create(NULL)) == NULL) {
-       mrp_log_error("Failed to create new message.");
-       exit(1);
-    }
-
-    len = snprintf(seq, sizeof(seq), "%d", seqno);
     
-    if (!mrp_msg_append(msg, "seq", seq, len + 1) ||
-       !mrp_msg_append(msg, "foo", "bar", sizeof("bar")) ||
-       !mrp_msg_append(msg, "bar", "foo", sizeof("foo")) ||
-       !mrp_msg_append(msg, "foobar", "barfoo", sizeof("barfoo")) ||
-       !mrp_msg_append(msg, "barfoo", "foobar", sizeof("foobar"))) {
-       mrp_log_error("Failed to construct message #%d.", seqno);
+    seq = seqno++;
+    len = snprintf(buf, sizeof(buf), "This is message %u.", (unsigned int)seq);
+    if ((msg = mrp_msg_create(TAG_SEQ, MRP_MSG_FIELD_UINT32, seq,
+                             TAG_FOO, MRP_MSG_FIELD_STRING, "foo",
+                             TAG_BAR, MRP_MSG_FIELD_STRING, "bar",
+                             TAG_MSG, MRP_MSG_FIELD_BLOB  , len, buf,
+                             TAG_END)) == NULL) {
+       mrp_log_error("Failed to create new message.");
        exit(1);
     }
 
     if (!mrp_transport_send(c->t, msg)) {
-       mrp_log_error("Failed to send message #%d.", seqno);
+       mrp_log_error("Failed to send message #%d.", seq);
        exit(1);
     }
-    else {
-       mrp_log_info("Message #%d succesfully sent.", seqno++);
-    }
+    else
+       mrp_log_info("Message #%d succesfully sent.", seq);
+    
+    mrp_msg_unref(msg);
 }
 
 
@@ -177,7 +205,7 @@ void client_init(context_t *c)
        exit(1);
     }
 
-    c->t = mrp_transport_create(c->ml, type, &evt, c, 0);
+    c->t = mrp_transport_create(c->ml, "udp4", &evt, c, 0);
     
     if (c->t == NULL) {
        mrp_log_error("Failed to create new transport.");
index dfd6a5c..1455755 100644 (file)
@@ -118,7 +118,6 @@ socklen_t mrp_transport_resolve(mrp_transport_t *t, const char *str,
                                mrp_sockaddr_t *addr, socklen_t size,
                                const char **type)
 {
-#if 1
     mrp_transport_descr_t *d;
     mrp_list_hook_t       *p, *n;
     socklen_t              l;
@@ -139,25 +138,6 @@ socklen_t mrp_transport_resolve(mrp_transport_t *t, const char *str,
     }
     
     return 0;
-#else
-    mrp_transport_descr_t *d;
-    char                  *p, type[32];
-    int                    n;
-    
-    if ((p = strchr(str, ':')) != NULL && (n = p - str) < (int)sizeof(type)) {
-       strncpy(type, str, n);
-       type[n] = '\0';
-
-       if (t != NULL)
-           return t->descr->resolve(p + 1, addr, size);
-       else {
-           if ((d = find_transport(type)) != NULL)
-               return d->resolve(p + 1, addr, size);
-       }
-    }
-
-    return 0;
-#endif
 }
 
 
index 669df6d..1ea23eb 100644 (file)
@@ -11,6 +11,7 @@
 #include <readline/history.h>
 
 #include <murphy/common.h>
+#include <murphy/plugins/console-protocol.h>
 
 #define client_info  mrp_log_info
 #define client_warn  mrp_log_warning
@@ -106,22 +107,29 @@ void prompt_display(client_t *c)
 void prompt_process_input(client_t *c, char *input)
 {
     mrp_msg_t *msg;
-    int        len;
-    
+    uint16_t   tag, type;
+    uint32_t   len;
+
     len = input ? strlen(input) + 1: 0;
 
     if (len > 1) {
        add_history(input);
        prompt_erase(c);
        
-       if ((msg = mrp_msg_create("input", input, len, NULL)) != NULL) {
+       tag  = MRP_CONSOLE_INPUT;
+       type = MRP_MSG_FIELD_BLOB;
+       msg  = mrp_msg_create(tag, type, len, input, MRP_MSG_FIELD_INVALID);
+
+       if (msg != NULL) {
            mrp_transport_send(c->t, msg);
            mrp_msg_unref(msg);
-           prompt_display(c);
        }
-       else
-           client_error("failed to send request to server.");
+       
+       prompt_display(c);
+       return;
     }
+
+    client_error("failed to send request to server.");
 }
 
 
@@ -177,22 +185,26 @@ static void input_cleanup(client_t *c)
 
 void recv_evt(mrp_transport_t *t, mrp_msg_t *msg, void *user_data)
 {
-    client_t *c = (client_t *)user_data;
-    char     *prompt, *output, buf[128], *dummy;
-    size_t    size;
+    client_t        *c = (client_t *)user_data;
+    mrp_msg_field_t *f;
+    char           *prompt, *output, buf[128];
+    size_t          size;
     
     MRP_UNUSED(t);
     
     prompt_erase(c);
 
-    if ((output = mrp_msg_find(msg, "output", &size)) != NULL) {
+    if ((f = mrp_msg_find(msg, MRP_CONSOLE_OUTPUT)) != NULL) {
+       output = f->str;
+       size   = f->size[0];
        printf("%.*s", (int)size, output);
     }
-    else if ((prompt = mrp_msg_find(msg, "prompt", &size)) != NULL) {
-       snprintf(buf, sizeof(buf), "%.*s> ", (int)size, prompt);
+    else if ((f = mrp_msg_find(msg, MRP_CONSOLE_PROMPT)) != NULL) {
+       prompt = f->str;
+       snprintf(buf, sizeof(buf), "%s> ", prompt);
        prompt_set(c, buf);
     }
-    else if ((dummy = mrp_msg_find(msg, "bye", &size)) != NULL) {
+    else if ((f = mrp_msg_find(msg, MRP_CONSOLE_BYE)) != NULL) {
        mrp_mainloop_quit(c->ml, 0);
        return;
     }
diff --git a/src/plugins/console-protocol.h b/src/plugins/console-protocol.h
new file mode 100644 (file)
index 0000000..48b41b2
--- /dev/null
@@ -0,0 +1,10 @@
+#ifndef __MURPHY_CONSOLE_PROTOCOL_H__
+#define __MURPHY_CONSOLE_PROTOCOL_H__
+
+#define MRP_CONSOLE_INPUT  0x1
+#define MRP_CONSOLE_OUTPUT 0x2
+#define MRP_CONSOLE_PROMPT 0x3
+#define MRP_CONSOLE_BYE    0x4
+
+
+#endif /* __MURPHY_CONSOLE_PROTOCOL_H__ */
index 005629b..c815a3b 100644 (file)
@@ -9,6 +9,7 @@
 
 #include <murphy/common.h>
 #include <murphy/core.h>
+#include <murphy/plugins/console-protocol.h>
 
 #define console_info(fmt, args...)  mrp_log_info("console: "fmt , ## args)
 #define console_warn(fmt, args...)  mrp_log_warning("console: "fmt , ## args)
@@ -52,8 +53,15 @@ static ssize_t write_req(mrp_console_t *mc, void *buf, size_t size)
 {
     console_t *c = (console_t *)mc->backend_data;
     mrp_msg_t *msg;
+    uint16_t   tag, type;
+    uint32_t   len;
 
-    if ((msg = mrp_msg_create("output", buf, size, NULL)) != NULL) {
+    tag  = MRP_CONSOLE_OUTPUT;
+    type = MRP_MSG_FIELD_BLOB;
+    len  = size;
+    msg  = mrp_msg_create(tag, type, len, buf, MRP_MSG_FIELD_INVALID);
+
+    if (msg != NULL) {
        mrp_transport_send(c->t, msg);
        mrp_msg_unref(msg);
        
@@ -67,7 +75,7 @@ static ssize_t write_req(mrp_console_t *mc, void *buf, size_t size)
 static void tcp_close_req(mrp_console_t *mc)
 {
     console_t *c = (console_t *)mc->backend_data;
-    
+
     mrp_transport_disconnect(c->t);
     mrp_transport_destroy(c->t);
     c->t = NULL;
@@ -78,10 +86,16 @@ static void udp_close_req(mrp_console_t *mc)
 {
     console_t *c = (console_t *)mc->backend_data;
     mrp_msg_t *msg;
-    int        dummy = TRUE;
+    uint16_t   tag, type;
 
-    if ((msg = mrp_msg_create("bye", &dummy, sizeof(dummy), NULL)) != NULL)
+    tag  = MRP_CONSOLE_BYE;
+    type = MRP_MSG_FIELD_BOOL;
+    msg  = mrp_msg_create(tag, type, TRUE, MRP_MSG_FIELD_INVALID);
+
+    if (msg != NULL) {
        mrp_transport_send(c->t, msg);
+       mrp_msg_unref(msg);
+    }
     
     mrp_transport_disconnect(c->t);
 }
@@ -91,9 +105,12 @@ static void set_prompt_req(mrp_console_t *mc, const char *prompt)
 {
     console_t *c = (console_t *)mc->backend_data;
     mrp_msg_t *msg;
+    uint16_t   tag, type;
+
+    tag  = MRP_CONSOLE_PROMPT;
+    type = MRP_MSG_FIELD_STRING;
+    msg  = mrp_msg_create(tag, type, prompt, MRP_MSG_FIELD_INVALID);
 
-    msg = mrp_msg_create("prompt", prompt, strlen(prompt) + 1, NULL);
-    
     if (msg != NULL) {
        mrp_transport_send(c->t, msg);
        mrp_msg_unref(msg);
@@ -109,23 +126,27 @@ static void free_req(void *backend_data)
 
 static void recv_evt(mrp_transport_t *t, mrp_msg_t *msg, void *user_data)
 {
-    console_t *c = (console_t *)user_data;
-    char      *input;
-    size_t     size;
+    console_t       *c = (console_t *)user_data;
+    mrp_msg_field_t *f;
+    char            *input;
+    size_t           size;
 
     MRP_UNUSED(t);
     
-    input = mrp_msg_find(msg, "input", &size);
-
-    if (input != NULL) {
-       MRP_CONSOLE_BUSY(c->mc, {
-               c->mc->evt.input(c->mc, input, size);
-           });
+    if ((f = mrp_msg_find(msg, MRP_CONSOLE_INPUT)) != NULL) {
+       if (f->type == MRP_MSG_FIELD_BLOB) {
+           input = f->str;
+           size  = f->size[0];
+           MRP_CONSOLE_BUSY(c->mc, {
+                   c->mc->evt.input(c->mc, input, size);
+               });
        
-       c->mc->check_destroy(c->mc);
+           c->mc->check_destroy(c->mc);
+           return;
+       }
     }
-    else
-       mrp_log_error("Received malformed console message.");
+
+    mrp_log_error("Received malformed console message.");
 }
 
 
@@ -133,39 +154,48 @@ static void recvfrom_evt(mrp_transport_t *t, mrp_msg_t *msg,
                         mrp_sockaddr_t *addr, socklen_t addrlen,
                         void *user_data)
 {
-    console_t *c = (console_t *)user_data;
-    char      *input;
-    size_t     size;
+    console_t       *c = (console_t *)user_data;
+    mrp_msg_field_t *f;
+    char            *input;
+    size_t           size;
 
     MRP_UNUSED(t);
     
     mrp_debug("got new message...");
 
-    input = mrp_msg_find(msg, "input", &size);
-
-    if (input != NULL) {
-       mrp_sockaddr_t   a;
-       socklen_t        l;
+    if ((f = mrp_msg_find(msg, MRP_CONSOLE_INPUT)) != NULL) {
+       if (f->type == MRP_MSG_FIELD_STRING) {
+           input = f->str;
+           size  = f->size[0];
+           
+           if (input != NULL) {
+               mrp_sockaddr_t   a;
+               socklen_t        l;
 
-       mrp_sockaddr_cpy(&a, &c->addr, l=c->addrlen);
-       mrp_sockaddr_cpy(&c->addr, addr, c->addrlen=addrlen);
+               mrp_sockaddr_cpy(&a, &c->addr, l=c->addrlen);
+               mrp_sockaddr_cpy(&c->addr, addr, c->addrlen=addrlen);
        
-       mrp_transport_connect(t, addr, addrlen);
-       MRP_CONSOLE_BUSY(c->mc, {
-               c->mc->evt.input(c->mc, input, size);
-           });
+               mrp_transport_connect(t, addr, addrlen);
+               MRP_CONSOLE_BUSY(c->mc, {
+                       c->mc->evt.input(c->mc, input, size);
+                   });
 
-       c->mc->check_destroy(c->mc);
+               c->mc->check_destroy(c->mc);
 
-       mrp_transport_disconnect(t);
+               mrp_transport_disconnect(t);
+
+               mrp_sockaddr_cpy(&c->addr, &a, c->addrlen=l);
 
-       mrp_sockaddr_cpy(&c->addr, &a, c->addrlen=l);
 
-       if (l)
-           mrp_transport_connect(t, &a, l);
+               if (l)
+                   mrp_transport_connect(t, &a, l);
+
+               return;
+           }
+       }
     }
-    else
-       mrp_log_error("Received malformed console message.");
+
+    mrp_log_error("Received malformed console message.");
 }