From 1ae6dee29118725b077e74fdd32869fc9fb7499b Mon Sep 17 00:00:00 2001 From: Krisztian Litkey Date: Thu, 2 Oct 2014 11:07:27 +0300 Subject: [PATCH] common: add an explicit transport mode for JSON-based encoding. Promoted JSON-based encoding to an explicit transport mode of its own. Taught the stream- and datagram-transports to talk JSON. The datagram-transport JSON code is untested. The stream-transport has been minimally exercised. The websocket transport has not been flipped over to the dedicated mode yet. It still runs the transport in custom mode for JSON- based messaging. --- src/common/dbus-libdbus-transport.c | 1 + src/common/dbus-transport.c | 1 + src/common/dgram-transport.c | 82 +++++++++++++++++++++++++++++++++++-- src/common/internal-transport.c | 1 + src/common/stream-transport.c | 53 ++++++++++++++++++++++-- src/common/transport.c | 55 +++++++++++++++++++++++++ src/common/transport.h | 26 +++++++++++- src/common/wsck-transport.c | 1 + 8 files changed, 210 insertions(+), 10 deletions(-) diff --git a/src/common/dbus-libdbus-transport.c b/src/common/dbus-libdbus-transport.c index 0efb855..af8d9a8 100644 --- a/src/common/dbus-libdbus-transport.c +++ b/src/common/dbus-libdbus-transport.c @@ -1741,4 +1741,5 @@ MRP_REGISTER_TRANSPORT(dbus, DBUS, dbus_t, dbus_resolve, dbus_sendraw, dbus_sendrawto, dbus_senddata, dbus_senddatato, NULL, NULL, + NULL, NULL, NULL, NULL); diff --git a/src/common/dbus-transport.c b/src/common/dbus-transport.c index 59e0fcb..8dfc3c5 100644 --- a/src/common/dbus-transport.c +++ b/src/common/dbus-transport.c @@ -1716,5 +1716,6 @@ MRP_REGISTER_TRANSPORT(dbus, DBUS, dbus_t, dbus_resolve, dbus_sendraw, dbus_sendrawto, dbus_senddata, dbus_senddatato, NULL, NULL, + NULL, NULL, NULL, NULL); diff --git a/src/common/dgram-transport.c b/src/common/dgram-transport.c index 0bd94bb..b54ed62 100644 --- a/src/common/dgram-transport.c +++ b/src/common/dgram-transport.c @@ -668,7 +668,8 @@ static int senddatato(mrp_transport_t *mu, void *data, uint16_t tag, if (u->connected) n = send(u->sock, buf, len + sizeof(*lenp), 0); else - n = sendto(u->sock, buf, len + sizeof(*lenp), 0, &addr->any, addrlen); + n = sendto(u->sock, buf, len + sizeof(*lenp), 0, &addr->any, + addrlen); mrp_free(buf); @@ -761,6 +762,76 @@ static int dgrm_sendnativeto(mrp_transport_t *mu, void *data, uint32_t type_id, } +static int sendjsonto(mrp_transport_t *mu, mrp_json_t *msg, + mrp_sockaddr_t *addr, socklen_t addrlen) +{ + dgrm_t *u = (dgrm_t *)mu; + struct iovec iov[2]; + const char *s; + ssize_t size, n; + uint32_t len; + + if (MRP_UNLIKELY(u->sock == -1)) { + if (!open_socket(u, ((struct sockaddr *)addr)->sa_family)) + return FALSE; + } + + if (u->connected && (s = mrp_json_object_to_string(msg)) != NULL) { + size = strlen(s); + len = htobe32(size); + + iov[0].iov_base = &len; + iov[0].iov_len = sizeof(len); + iov[1].iov_base = (char *)s; + iov[1].iov_len = size; + + if (u->connected) + n = writev(u->sock, iov, 2); + else { + struct msghdr mh; + + mh.msg_name = &addr->any; + mh.msg_namelen = addrlen; + mh.msg_iov = iov; + mh.msg_iovlen = MRP_ARRAY_SIZE(iov); + mh.msg_control = NULL; + mh.msg_controllen = 0; + mh.msg_flags = 0; + + n = sendmsg(u->sock, &mh, 0); + } + + if (n == (ssize_t)(size + sizeof(len))) + return TRUE; + else { + if (n == -1 && errno == EAGAIN) { + mrp_log_error("%s(): XXX TODO: this sucks, need to add " + "output queuing for dgrm-transport.", + __FUNCTION__); + } + } + } + + return FALSE; +} + + +static int dgrm_sendjson(mrp_transport_t *mu, mrp_json_t *msg) +{ + if (mu->connected) + return sendjsonto(mu, msg, NULL, 0); + else + return FALSE; +} + + +static int dgrm_sendjsonto(mrp_transport_t *mu, mrp_json_t *msg, + mrp_sockaddr_t *addr, socklen_t addrlen) +{ + return sendjsonto(mu, msg, addr, addrlen); +} + + MRP_REGISTER_TRANSPORT(udp4, UDP4, dgrm_t, dgrm_resolve, dgrm_open, dgrm_createfrom, dgrm_close, NULL, dgrm_bind, dgrm_listen, NULL, @@ -769,7 +840,8 @@ MRP_REGISTER_TRANSPORT(udp4, UDP4, dgrm_t, dgrm_resolve, dgrm_sendraw, dgrm_sendrawto, dgrm_senddata, dgrm_senddatato, NULL, NULL, - dgrm_sendnative, dgrm_sendnativeto); + dgrm_sendnative, dgrm_sendnativeto, + dgrm_sendjson, dgrm_sendjsonto); MRP_REGISTER_TRANSPORT(udp6, UDP6, dgrm_t, dgrm_resolve, dgrm_open, dgrm_createfrom, dgrm_close, NULL, @@ -779,7 +851,8 @@ MRP_REGISTER_TRANSPORT(udp6, UDP6, dgrm_t, dgrm_resolve, dgrm_sendraw, dgrm_sendrawto, dgrm_senddata, dgrm_senddatato, NULL, NULL, - dgrm_sendnative, dgrm_sendnativeto); + dgrm_sendnative, dgrm_sendnativeto, + dgrm_sendjson, dgrm_sendjsonto); MRP_REGISTER_TRANSPORT(unxdgrm, UNXD, dgrm_t, dgrm_resolve, dgrm_open, dgrm_createfrom, dgrm_close, NULL, @@ -789,4 +862,5 @@ MRP_REGISTER_TRANSPORT(unxdgrm, UNXD, dgrm_t, dgrm_resolve, dgrm_sendraw, dgrm_sendrawto, dgrm_senddata, dgrm_senddatato, NULL, NULL, - dgrm_sendnative, dgrm_sendnativeto); + dgrm_sendnative, dgrm_sendnativeto, + dgrm_sendjson, dgrm_sendjsonto); diff --git a/src/common/internal-transport.c b/src/common/internal-transport.c index 74144ce..7ffae37 100644 --- a/src/common/internal-transport.c +++ b/src/common/internal-transport.c @@ -581,4 +581,5 @@ MRP_REGISTER_TRANSPORT(internal, INTERNAL, internal_t, internal_resolve, internal_sendraw, internal_sendrawto, internal_senddata, internal_senddatato, NULL, NULL, + NULL, NULL, NULL, NULL); diff --git a/src/common/stream-transport.c b/src/common/stream-transport.c index 93423de..99a4893 100644 --- a/src/common/stream-transport.c +++ b/src/common/stream-transport.c @@ -459,7 +459,16 @@ static void strm_recv_cb(mrp_io_watch_t *w, int fd, mrp_io_event_t events, data = NULL; size = 0; while (mrp_fragbuf_pull(t->buf, &data, &size)) { - error = t->recv_data(mt, data, size, NULL, 0); + if (t->mode != MRP_TRANSPORT_MODE_JSON) + error = t->recv_data(mt, data, size, NULL, 0); + else { + mrp_json_t *msg = mrp_json_string_to_object(data, size); + + if (msg != NULL) { + error = t->recv_data((mrp_transport_t *)t, msg, 0, NULL, 0); + mrp_json_unref(msg); + } + } if (error) goto fatal_error; @@ -725,6 +734,39 @@ static int strm_sendnative(mrp_transport_t *mt, void *data, uint32_t type_id) } +static int strm_sendjson(mrp_transport_t *mt, mrp_json_t *msg) +{ + strm_t *t = (strm_t *)mt; + struct iovec iov[2]; + const char *s; + ssize_t size, n; + uint32_t len; + + if (t->connected && (s = mrp_json_object_to_string(msg)) != NULL) { + size = strlen(s); + len = htobe32(size); + iov[0].iov_base = &len; + iov[0].iov_len = sizeof(len); + iov[1].iov_base = (void *)s; + iov[1].iov_len = size; + + n = writev(t->sock, iov, 2); + + if (n == (ssize_t)(size + sizeof(len))) + return TRUE; + else { + if (n == -1 && errno == EAGAIN) { + mrp_log_error("%s(): XXX TODO: this sucks, need to add " + "output queuing for strm-transport.", + __FUNCTION__); + } + } + } + + return FALSE; +} + + MRP_REGISTER_TRANSPORT(tcp4, TCP4, strm_t, strm_resolve, strm_open, strm_createfrom, strm_close, NULL, strm_bind, strm_listen, strm_accept, @@ -733,7 +775,8 @@ MRP_REGISTER_TRANSPORT(tcp4, TCP4, strm_t, strm_resolve, strm_sendraw, NULL, strm_senddata, NULL, NULL, NULL, - strm_sendnative, NULL); + strm_sendnative, NULL, + strm_sendjson, NULL); MRP_REGISTER_TRANSPORT(tcp6, TCP6, strm_t, strm_resolve, strm_open, strm_createfrom, strm_close, NULL, @@ -743,7 +786,8 @@ MRP_REGISTER_TRANSPORT(tcp6, TCP6, strm_t, strm_resolve, strm_sendraw, NULL, strm_senddata, NULL, NULL, NULL, - strm_sendnative, NULL); + strm_sendnative, NULL, + strm_sendjson, NULL); MRP_REGISTER_TRANSPORT(unxstrm, UNXS, strm_t, strm_resolve, strm_open, strm_createfrom, strm_close, NULL, @@ -753,4 +797,5 @@ MRP_REGISTER_TRANSPORT(unxstrm, UNXS, strm_t, strm_resolve, strm_sendraw, NULL, strm_senddata, NULL, NULL, NULL, - strm_sendnative, NULL); + strm_sendnative, NULL, + strm_sendjson, NULL); diff --git a/src/common/transport.c b/src/common/transport.c index 7c0d55f..d00c588 100644 --- a/src/common/transport.c +++ b/src/common/transport.c @@ -631,6 +631,43 @@ int mrp_transport_sendnativeto(mrp_transport_t *t, void *data, uint32_t type_id, } +int mrp_transport_sendjson(mrp_transport_t *t, mrp_json_t *msg) +{ + int result; + + if (t->mode == MRP_TRANSPORT_MODE_JSON && t->descr->req.sendjson) { + MRP_TRANSPORT_BUSY(t, { + result = t->descr->req.sendjson(t, msg); + }); + + purge_destroyed(t); + } + else + result = FALSE; + + return result; +} + + +int mrp_transport_sendjsonto(mrp_transport_t *t, mrp_json_t *msg, + mrp_sockaddr_t *addr, socklen_t addrlen) +{ + int result; + + if (t->mode == MRP_TRANSPORT_MODE_JSON && t->descr->req.sendjsonto) { + MRP_TRANSPORT_BUSY(t, { + result = t->descr->req.sendjsonto(t, msg, addr, addrlen); + }); + + purge_destroyed(t); + } + else + result = FALSE; + + return result; +} + + static int recv_data(mrp_transport_t *t, void *data, size_t size, mrp_sockaddr_t *addr, socklen_t addrlen) { @@ -767,6 +804,24 @@ static int recv_data(mrp_transport_t *t, void *data, size_t size, } return 0; + case MRP_TRANSPORT_MODE_JSON: + if (t->connected) { + if (t->evt.recvjson) { + MRP_TRANSPORT_BUSY(t, { + t->evt.recvjson(t, data, t->user_data); + }); + } + } + else { + if (t->evt.recvjsonfrom) { + MRP_TRANSPORT_BUSY(t, { + t->evt.recvjsonfrom(t, data, addr, addrlen, + t->user_data); + }); + } + } + return 0; + default: return -EPROTOTYPE; } diff --git a/src/common/transport.h b/src/common/transport.h index 1801c31..5a1d06f 100644 --- a/src/common/transport.h +++ b/src/common/transport.h @@ -39,6 +39,7 @@ #include #include #include +#include MRP_CDECL_BEGIN @@ -79,6 +80,7 @@ typedef enum { MRP_TRANSPORT_MODE_DATA = 0x02, /* uses registered data types */ MRP_TRANSPORT_MODE_CUSTOM = 0x03, /* custom message encoding */ MRP_TRANSPORT_MODE_NATIVE = 0x04, /* uses registered native-types */ + MRP_TRANSPORT_MODE_JSON = 0x05, /* uses JSON messages */ } mrp_transport_mode_t; typedef enum { @@ -135,8 +137,10 @@ typedef struct { int (*senddata)(mrp_transport_t *t, void *data, uint16_t tag); /** Send data with a custom encoder over a transport. */ int (*sendcustom)(mrp_transport_t *t, void *data); - /* Send a native type over a (connected) transport. */ + /** Send a native type over a (connected) transport. */ int (*sendnative)(mrp_transport_t *t, void *data, uint32_t type_id); + /** Send a JSON message over a (connected) transport. */ + int (*sendjson)(mrp_transport_t *t, mrp_json_t *msg); /** Send a message over a(n unconnected) transport. */ int (*sendmsgto)(mrp_transport_t *t, mrp_msg_t *msg, mrp_sockaddr_t *addr, @@ -153,6 +157,9 @@ typedef struct { /** Send a native type over a transport. */ int (*sendnativeto)(mrp_transport_t *t, void *data, uint32_t type_id, mrp_sockaddr_t *addr, socklen_t addrlen); + /** Send a JSON messgae over a(n unconnected) transport. */ + int (*sendjsonto)(mrp_transport_t *t, mrp_json_t *msg, mrp_sockaddr_t *addr, + socklen_t addrlen); } mrp_transport_req_t; @@ -183,6 +190,8 @@ typedef struct { /** Native type callback for connected transports. */ void (*recvnative)(mrp_transport_t *t, void *data, uint32_t type_id, void *user_data); + /** JSON type callback for connected transports. */ + void (*recvjson)(mrp_transport_t *t, mrp_json_t *msg, void *user_data); }; /** Message received on an unconnected transport. */ @@ -207,6 +216,10 @@ typedef struct { void (*recvnativefrom)(mrp_transport_t *t, void *data, uint32_t type_id, mrp_sockaddr_t *addr, socklen_t addrlen, void *user_data); + /** JSON type callback for unconnected transports. */ + void (*recvjsonfrom)(mrp_transport_t *t, mrp_json_t *msg, + mrp_sockaddr_t *addr, socklen_t addrlen, + void *user_data); }; /** Connection closed by peer. */ void (*closed)(mrp_transport_t *t, int error, void *user_data); @@ -385,7 +398,8 @@ struct mrp_transport_s { _sendraw, _sendrawto, \ _senddata, _senddatato, \ _sendcustom, _sendcustomto, \ - _sendnative, _sendnativeto) \ + _sendnative, _sendnativeto, \ + _sendjson, _sendjsonto) \ static void _prfx##_register_transport(void) \ __attribute__((constructor)); \ \ @@ -414,6 +428,8 @@ struct mrp_transport_s { .sendcustomto = _sendcustomto, \ .sendnative = _sendnative, \ .sendnativeto = _sendnativeto, \ + .sendjson = _sendjson, \ + .sendjsonto = _sendjsonto, \ }, \ }; \ \ @@ -508,6 +524,12 @@ int mrp_transport_sendnative(mrp_transport_t *t, void *data, uint32_t type_id); int mrp_transport_sendnativeto(mrp_transport_t *t, void *data, uint32_t type_id, mrp_sockaddr_t *addr, socklen_t addrlen); +/** Send a JSON message through the given (connected) transport. */ +int mrp_transport_sendjson(mrp_transport_t *t, mrp_json_t *msg); + +/** Send a JSON message through the given transport to the remote address. */ +int mrp_transport_sendjsonto(mrp_transport_t *t, mrp_json_t *msg, + mrp_sockaddr_t *addr, socklen_t addrlen); MRP_CDECL_END #endif /* __MURPHY_TRANSPORT_H__ */ diff --git a/src/common/wsck-transport.c b/src/common/wsck-transport.c index 77432dd..7f4a67b 100644 --- a/src/common/wsck-transport.c +++ b/src/common/wsck-transport.c @@ -991,4 +991,5 @@ MRP_REGISTER_TRANSPORT(wsck, WSCKP, wsck_t, wsck_resolve, wsck_sendraw, NULL, wsck_senddata, NULL, wsck_sendcustom, NULL, + NULL, NULL, NULL, NULL); -- 2.7.4