common: add an explicit transport mode for JSON-based encoding.
authorKrisztian Litkey <krisztian.litkey@intel.com>
Thu, 2 Oct 2014 08:07:27 +0000 (11:07 +0300)
committerKrisztian Litkey <krisztian.litkey@intel.com>
Fri, 10 Oct 2014 13:52:59 +0000 (16:52 +0300)
Promoted JSON-based encoding to an explicit transport mode of its
own. Taught the stream- and datagram-transports to talk JSON. The
datagram-transport JSON code is untested. The stream-transport has
been minimally exercised.

The websocket transport has not been flipped over to the dedicated
mode yet. It still runs the transport in custom mode for JSON-
based messaging.

src/common/dbus-libdbus-transport.c
src/common/dbus-transport.c
src/common/dgram-transport.c
src/common/internal-transport.c
src/common/stream-transport.c
src/common/transport.c
src/common/transport.h
src/common/wsck-transport.c

index 0efb855..af8d9a8 100644 (file)
@@ -1741,4 +1741,5 @@ MRP_REGISTER_TRANSPORT(dbus, DBUS, dbus_t, dbus_resolve,
                        dbus_sendraw, dbus_sendrawto,
                        dbus_senddata, dbus_senddatato,
                        NULL, NULL,
+                       NULL, NULL,
                        NULL, NULL);
index 59e0fcb..8dfc3c5 100644 (file)
@@ -1716,5 +1716,6 @@ MRP_REGISTER_TRANSPORT(dbus, DBUS, dbus_t, dbus_resolve,
                        dbus_sendraw, dbus_sendrawto,
                        dbus_senddata, dbus_senddatato,
                        NULL, NULL,
+                       NULL, NULL,
                        NULL, NULL);
 
index 0bd94bb..b54ed62 100644 (file)
@@ -668,7 +668,8 @@ static int senddatato(mrp_transport_t *mu, void *data, uint16_t tag,
             if (u->connected)
                 n = send(u->sock, buf, len + sizeof(*lenp), 0);
             else
-                n = sendto(u->sock, buf, len + sizeof(*lenp), 0, &addr->any, addrlen);
+                n = sendto(u->sock, buf, len + sizeof(*lenp), 0, &addr->any,
+                           addrlen);
 
             mrp_free(buf);
 
@@ -761,6 +762,76 @@ static int dgrm_sendnativeto(mrp_transport_t *mu, void *data, uint32_t type_id,
 }
 
 
+static int sendjsonto(mrp_transport_t *mu, mrp_json_t *msg,
+                      mrp_sockaddr_t *addr, socklen_t addrlen)
+{
+    dgrm_t       *u = (dgrm_t *)mu;
+    struct iovec  iov[2];
+    const char   *s;
+    ssize_t       size, n;
+    uint32_t      len;
+
+    if (MRP_UNLIKELY(u->sock == -1)) {
+        if (!open_socket(u, ((struct sockaddr *)addr)->sa_family))
+            return FALSE;
+    }
+
+    if (u->connected && (s = mrp_json_object_to_string(msg)) != NULL) {
+        size = strlen(s);
+        len  = htobe32(size);
+
+        iov[0].iov_base = &len;
+        iov[0].iov_len  = sizeof(len);
+        iov[1].iov_base = (char *)s;
+        iov[1].iov_len  = size;
+
+        if (u->connected)
+            n = writev(u->sock, iov, 2);
+        else {
+            struct msghdr mh;
+
+            mh.msg_name       = &addr->any;
+            mh.msg_namelen    = addrlen;
+            mh.msg_iov        = iov;
+            mh.msg_iovlen     = MRP_ARRAY_SIZE(iov);
+            mh.msg_control    = NULL;
+            mh.msg_controllen = 0;
+            mh.msg_flags      = 0;
+
+            n = sendmsg(u->sock, &mh, 0);
+        }
+
+        if (n == (ssize_t)(size + sizeof(len)))
+            return TRUE;
+        else {
+            if (n == -1 && errno == EAGAIN) {
+                mrp_log_error("%s(): XXX TODO: this sucks, need to add "
+                              "output queuing for dgrm-transport.",
+                              __FUNCTION__);
+            }
+        }
+    }
+
+    return FALSE;
+}
+
+
+static int dgrm_sendjson(mrp_transport_t *mu, mrp_json_t *msg)
+{
+    if (mu->connected)
+        return sendjsonto(mu, msg, NULL, 0);
+    else
+        return FALSE;
+}
+
+
+static int dgrm_sendjsonto(mrp_transport_t *mu, mrp_json_t *msg,
+                           mrp_sockaddr_t *addr, socklen_t addrlen)
+{
+    return sendjsonto(mu, msg, addr, addrlen);
+}
+
+
 MRP_REGISTER_TRANSPORT(udp4, UDP4, dgrm_t, dgrm_resolve,
                        dgrm_open, dgrm_createfrom, dgrm_close, NULL,
                        dgrm_bind, dgrm_listen, NULL,
@@ -769,7 +840,8 @@ MRP_REGISTER_TRANSPORT(udp4, UDP4, dgrm_t, dgrm_resolve,
                        dgrm_sendraw, dgrm_sendrawto,
                        dgrm_senddata, dgrm_senddatato,
                        NULL, NULL,
-                       dgrm_sendnative, dgrm_sendnativeto);
+                       dgrm_sendnative, dgrm_sendnativeto,
+                       dgrm_sendjson, dgrm_sendjsonto);
 
 MRP_REGISTER_TRANSPORT(udp6, UDP6, dgrm_t, dgrm_resolve,
                        dgrm_open, dgrm_createfrom, dgrm_close, NULL,
@@ -779,7 +851,8 @@ MRP_REGISTER_TRANSPORT(udp6, UDP6, dgrm_t, dgrm_resolve,
                        dgrm_sendraw, dgrm_sendrawto,
                        dgrm_senddata, dgrm_senddatato,
                        NULL, NULL,
-                       dgrm_sendnative, dgrm_sendnativeto);
+                       dgrm_sendnative, dgrm_sendnativeto,
+                       dgrm_sendjson, dgrm_sendjsonto);
 
 MRP_REGISTER_TRANSPORT(unxdgrm, UNXD, dgrm_t, dgrm_resolve,
                        dgrm_open, dgrm_createfrom, dgrm_close, NULL,
@@ -789,4 +862,5 @@ MRP_REGISTER_TRANSPORT(unxdgrm, UNXD, dgrm_t, dgrm_resolve,
                        dgrm_sendraw, dgrm_sendrawto,
                        dgrm_senddata, dgrm_senddatato,
                        NULL, NULL,
-                       dgrm_sendnative, dgrm_sendnativeto);
+                       dgrm_sendnative, dgrm_sendnativeto,
+                       dgrm_sendjson, dgrm_sendjsonto);
index 74144ce..7ffae37 100644 (file)
@@ -581,4 +581,5 @@ MRP_REGISTER_TRANSPORT(internal, INTERNAL, internal_t, internal_resolve,
                        internal_sendraw, internal_sendrawto,
                        internal_senddata, internal_senddatato,
                        NULL, NULL,
+                       NULL, NULL,
                        NULL, NULL);
index 93423de..99a4893 100644 (file)
@@ -459,7 +459,16 @@ static void strm_recv_cb(mrp_io_watch_t *w, int fd, mrp_io_event_t events,
         data = NULL;
         size = 0;
         while (mrp_fragbuf_pull(t->buf, &data, &size)) {
-            error = t->recv_data(mt, data, size, NULL, 0);
+            if (t->mode != MRP_TRANSPORT_MODE_JSON)
+                error = t->recv_data(mt, data, size, NULL, 0);
+            else {
+                mrp_json_t *msg = mrp_json_string_to_object(data, size);
+
+                if (msg != NULL) {
+                    error = t->recv_data((mrp_transport_t *)t, msg, 0, NULL, 0);
+                    mrp_json_unref(msg);
+                }
+            }
 
             if (error)
                 goto fatal_error;
@@ -725,6 +734,39 @@ static int strm_sendnative(mrp_transport_t *mt, void *data, uint32_t type_id)
 }
 
 
+static int strm_sendjson(mrp_transport_t *mt, mrp_json_t *msg)
+{
+    strm_t       *t = (strm_t *)mt;
+    struct iovec  iov[2];
+    const char   *s;
+    ssize_t       size, n;
+    uint32_t      len;
+
+    if (t->connected && (s = mrp_json_object_to_string(msg)) != NULL) {
+        size = strlen(s);
+        len  = htobe32(size);
+        iov[0].iov_base = &len;
+        iov[0].iov_len  = sizeof(len);
+        iov[1].iov_base = (void *)s;
+        iov[1].iov_len  = size;
+
+        n = writev(t->sock, iov, 2);
+
+        if (n == (ssize_t)(size + sizeof(len)))
+            return TRUE;
+        else {
+            if (n == -1 && errno == EAGAIN) {
+                mrp_log_error("%s(): XXX TODO: this sucks, need to add "
+                              "output queuing for strm-transport.",
+                              __FUNCTION__);
+            }
+        }
+    }
+
+    return FALSE;
+}
+
+
 MRP_REGISTER_TRANSPORT(tcp4, TCP4, strm_t, strm_resolve,
                        strm_open, strm_createfrom, strm_close, NULL,
                        strm_bind, strm_listen, strm_accept,
@@ -733,7 +775,8 @@ MRP_REGISTER_TRANSPORT(tcp4, TCP4, strm_t, strm_resolve,
                        strm_sendraw, NULL,
                        strm_senddata, NULL,
                        NULL, NULL,
-                       strm_sendnative, NULL);
+                       strm_sendnative, NULL,
+                       strm_sendjson, NULL);
 
 MRP_REGISTER_TRANSPORT(tcp6, TCP6, strm_t, strm_resolve,
                        strm_open, strm_createfrom, strm_close, NULL,
@@ -743,7 +786,8 @@ MRP_REGISTER_TRANSPORT(tcp6, TCP6, strm_t, strm_resolve,
                        strm_sendraw, NULL,
                        strm_senddata, NULL,
                        NULL, NULL,
-                       strm_sendnative, NULL);
+                       strm_sendnative, NULL,
+                       strm_sendjson, NULL);
 
 MRP_REGISTER_TRANSPORT(unxstrm, UNXS, strm_t, strm_resolve,
                        strm_open, strm_createfrom, strm_close, NULL,
@@ -753,4 +797,5 @@ MRP_REGISTER_TRANSPORT(unxstrm, UNXS, strm_t, strm_resolve,
                        strm_sendraw, NULL,
                        strm_senddata, NULL,
                        NULL, NULL,
-                       strm_sendnative, NULL);
+                       strm_sendnative, NULL,
+                       strm_sendjson, NULL);
index 7c0d55f..d00c588 100644 (file)
@@ -631,6 +631,43 @@ int mrp_transport_sendnativeto(mrp_transport_t *t, void *data, uint32_t type_id,
 }
 
 
+int mrp_transport_sendjson(mrp_transport_t *t, mrp_json_t *msg)
+{
+    int result;
+
+    if (t->mode == MRP_TRANSPORT_MODE_JSON && t->descr->req.sendjson) {
+        MRP_TRANSPORT_BUSY(t, {
+                result = t->descr->req.sendjson(t, msg);
+            });
+
+        purge_destroyed(t);
+    }
+    else
+        result = FALSE;
+
+    return result;
+}
+
+
+int mrp_transport_sendjsonto(mrp_transport_t *t, mrp_json_t *msg,
+                             mrp_sockaddr_t *addr, socklen_t addrlen)
+{
+    int result;
+
+    if (t->mode == MRP_TRANSPORT_MODE_JSON && t->descr->req.sendjsonto) {
+        MRP_TRANSPORT_BUSY(t, {
+                result = t->descr->req.sendjsonto(t, msg, addr, addrlen);
+            });
+
+        purge_destroyed(t);
+    }
+    else
+        result = FALSE;
+
+    return result;
+}
+
+
 static int recv_data(mrp_transport_t *t, void *data, size_t size,
                      mrp_sockaddr_t *addr, socklen_t addrlen)
 {
@@ -767,6 +804,24 @@ static int recv_data(mrp_transport_t *t, void *data, size_t size,
         }
         return 0;
 
+    case MRP_TRANSPORT_MODE_JSON:
+        if (t->connected) {
+            if (t->evt.recvjson) {
+                MRP_TRANSPORT_BUSY(t, {
+                        t->evt.recvjson(t, data, t->user_data);
+                    });
+            }
+        }
+        else {
+            if (t->evt.recvjsonfrom) {
+                MRP_TRANSPORT_BUSY(t, {
+                        t->evt.recvjsonfrom(t, data, addr, addrlen,
+                                            t->user_data);
+                    });
+            }
+        }
+        return 0;
+
     default:
         return -EPROTOTYPE;
     }
index 1801c31..5a1d06f 100644 (file)
@@ -39,6 +39,7 @@
 #include <murphy/common/mainloop.h>
 #include <murphy/common/msg.h>
 #include <murphy/common/native-types.h>
+#include <murphy/common/json.h>
 
 MRP_CDECL_BEGIN
 
@@ -79,6 +80,7 @@ typedef enum {
     MRP_TRANSPORT_MODE_DATA   = 0x02,    /* uses registered data types */
     MRP_TRANSPORT_MODE_CUSTOM = 0x03,    /* custom message encoding */
     MRP_TRANSPORT_MODE_NATIVE = 0x04,    /* uses registered native-types */
+    MRP_TRANSPORT_MODE_JSON   = 0x05,    /* uses JSON messages */
 } mrp_transport_mode_t;
 
 typedef enum {
@@ -135,8 +137,10 @@ typedef struct {
     int (*senddata)(mrp_transport_t *t, void *data, uint16_t tag);
     /** Send data with a custom encoder over a transport. */
     int (*sendcustom)(mrp_transport_t *t, void *data);
-    /* Send a native type over a (connected) transport. */
+    /** Send a native type over a (connected) transport. */
     int (*sendnative)(mrp_transport_t *t, void *data, uint32_t type_id);
+    /** Send a JSON message over a (connected) transport. */
+    int (*sendjson)(mrp_transport_t *t, mrp_json_t *msg);
 
     /** Send a message over a(n unconnected) transport. */
     int (*sendmsgto)(mrp_transport_t *t, mrp_msg_t *msg, mrp_sockaddr_t *addr,
@@ -153,6 +157,9 @@ typedef struct {
     /** Send a native type over a transport. */
     int (*sendnativeto)(mrp_transport_t *t, void *data, uint32_t type_id,
                         mrp_sockaddr_t *addr, socklen_t addrlen);
+    /** Send a JSON messgae over a(n unconnected) transport. */
+    int (*sendjsonto)(mrp_transport_t *t, mrp_json_t *msg, mrp_sockaddr_t *addr,
+                      socklen_t addrlen);
 } mrp_transport_req_t;
 
 
@@ -183,6 +190,8 @@ typedef struct {
         /** Native type callback for connected transports. */
         void (*recvnative)(mrp_transport_t *t, void *data, uint32_t type_id,
                            void *user_data);
+        /** JSON type callback for connected transports. */
+        void (*recvjson)(mrp_transport_t *t, mrp_json_t *msg, void *user_data);
     };
 
     /** Message received on an unconnected transport. */
@@ -207,6 +216,10 @@ typedef struct {
         void (*recvnativefrom)(mrp_transport_t *t, void *data, uint32_t type_id,
                                mrp_sockaddr_t *addr, socklen_t addrlen,
                                void *user_data);
+        /** JSON type callback for unconnected transports. */
+        void (*recvjsonfrom)(mrp_transport_t *t, mrp_json_t *msg,
+                             mrp_sockaddr_t *addr, socklen_t addrlen,
+                             void *user_data);
     };
     /** Connection closed by peer. */
     void (*closed)(mrp_transport_t *t, int error, void *user_data);
@@ -385,7 +398,8 @@ struct mrp_transport_s {
                                _sendraw, _sendrawto,                      \
                                _senddata, _senddatato,                    \
                                _sendcustom, _sendcustomto,                \
-                               _sendnative, _sendnativeto)                \
+                               _sendnative, _sendnativeto,                \
+                               _sendjson, _sendjsonto)                    \
     static void _prfx##_register_transport(void)                          \
          __attribute__((constructor));                                    \
                                                                           \
@@ -414,6 +428,8 @@ struct mrp_transport_s {
                 .sendcustomto = _sendcustomto,                            \
                 .sendnative   = _sendnative,                              \
                 .sendnativeto = _sendnativeto,                            \
+                .sendjson     = _sendjson,                                \
+                .sendjsonto   = _sendjsonto,                              \
             },                                                            \
         };                                                                \
                                                                           \
@@ -508,6 +524,12 @@ int mrp_transport_sendnative(mrp_transport_t *t, void *data, uint32_t type_id);
 int mrp_transport_sendnativeto(mrp_transport_t *t, void *data, uint32_t type_id,
                                mrp_sockaddr_t *addr, socklen_t addrlen);
 
+/** Send a JSON message through the given (connected) transport. */
+int mrp_transport_sendjson(mrp_transport_t *t, mrp_json_t *msg);
+
+/** Send a JSON message through the given transport to the remote address. */
+int mrp_transport_sendjsonto(mrp_transport_t *t, mrp_json_t *msg,
+                             mrp_sockaddr_t *addr, socklen_t addrlen);
 MRP_CDECL_END
 
 #endif /* __MURPHY_TRANSPORT_H__ */
index 77432dd..7f4a67b 100644 (file)
@@ -991,4 +991,5 @@ MRP_REGISTER_TRANSPORT(wsck, WSCKP, wsck_t, wsck_resolve,
                        wsck_sendraw, NULL,
                        wsck_senddata, NULL,
                        wsck_sendcustom, NULL,
+                       NULL, NULL,
                        NULL, NULL);