dbus_sendraw, dbus_sendrawto,
dbus_senddata, dbus_senddatato,
NULL, NULL,
+ NULL, NULL,
NULL, NULL);
dbus_sendraw, dbus_sendrawto,
dbus_senddata, dbus_senddatato,
NULL, NULL,
+ NULL, NULL,
NULL, NULL);
if (u->connected)
n = send(u->sock, buf, len + sizeof(*lenp), 0);
else
- n = sendto(u->sock, buf, len + sizeof(*lenp), 0, &addr->any, addrlen);
+ n = sendto(u->sock, buf, len + sizeof(*lenp), 0, &addr->any,
+ addrlen);
mrp_free(buf);
}
+static int sendjsonto(mrp_transport_t *mu, mrp_json_t *msg,
+ mrp_sockaddr_t *addr, socklen_t addrlen)
+{
+ dgrm_t *u = (dgrm_t *)mu;
+ struct iovec iov[2];
+ const char *s;
+ ssize_t size, n;
+ uint32_t len;
+
+ if (MRP_UNLIKELY(u->sock == -1)) {
+ if (!open_socket(u, ((struct sockaddr *)addr)->sa_family))
+ return FALSE;
+ }
+
+ if (u->connected && (s = mrp_json_object_to_string(msg)) != NULL) {
+ size = strlen(s);
+ len = htobe32(size);
+
+ iov[0].iov_base = &len;
+ iov[0].iov_len = sizeof(len);
+ iov[1].iov_base = (char *)s;
+ iov[1].iov_len = size;
+
+ if (u->connected)
+ n = writev(u->sock, iov, 2);
+ else {
+ struct msghdr mh;
+
+ mh.msg_name = &addr->any;
+ mh.msg_namelen = addrlen;
+ mh.msg_iov = iov;
+ mh.msg_iovlen = MRP_ARRAY_SIZE(iov);
+ mh.msg_control = NULL;
+ mh.msg_controllen = 0;
+ mh.msg_flags = 0;
+
+ n = sendmsg(u->sock, &mh, 0);
+ }
+
+ if (n == (ssize_t)(size + sizeof(len)))
+ return TRUE;
+ else {
+ if (n == -1 && errno == EAGAIN) {
+ mrp_log_error("%s(): XXX TODO: this sucks, need to add "
+ "output queuing for dgrm-transport.",
+ __FUNCTION__);
+ }
+ }
+ }
+
+ return FALSE;
+}
+
+
+static int dgrm_sendjson(mrp_transport_t *mu, mrp_json_t *msg)
+{
+ if (mu->connected)
+ return sendjsonto(mu, msg, NULL, 0);
+ else
+ return FALSE;
+}
+
+
+static int dgrm_sendjsonto(mrp_transport_t *mu, mrp_json_t *msg,
+ mrp_sockaddr_t *addr, socklen_t addrlen)
+{
+ return sendjsonto(mu, msg, addr, addrlen);
+}
+
+
MRP_REGISTER_TRANSPORT(udp4, UDP4, dgrm_t, dgrm_resolve,
dgrm_open, dgrm_createfrom, dgrm_close, NULL,
dgrm_bind, dgrm_listen, NULL,
dgrm_sendraw, dgrm_sendrawto,
dgrm_senddata, dgrm_senddatato,
NULL, NULL,
- dgrm_sendnative, dgrm_sendnativeto);
+ dgrm_sendnative, dgrm_sendnativeto,
+ dgrm_sendjson, dgrm_sendjsonto);
MRP_REGISTER_TRANSPORT(udp6, UDP6, dgrm_t, dgrm_resolve,
dgrm_open, dgrm_createfrom, dgrm_close, NULL,
dgrm_sendraw, dgrm_sendrawto,
dgrm_senddata, dgrm_senddatato,
NULL, NULL,
- dgrm_sendnative, dgrm_sendnativeto);
+ dgrm_sendnative, dgrm_sendnativeto,
+ dgrm_sendjson, dgrm_sendjsonto);
MRP_REGISTER_TRANSPORT(unxdgrm, UNXD, dgrm_t, dgrm_resolve,
dgrm_open, dgrm_createfrom, dgrm_close, NULL,
dgrm_sendraw, dgrm_sendrawto,
dgrm_senddata, dgrm_senddatato,
NULL, NULL,
- dgrm_sendnative, dgrm_sendnativeto);
+ dgrm_sendnative, dgrm_sendnativeto,
+ dgrm_sendjson, dgrm_sendjsonto);
internal_sendraw, internal_sendrawto,
internal_senddata, internal_senddatato,
NULL, NULL,
+ NULL, NULL,
NULL, NULL);
data = NULL;
size = 0;
while (mrp_fragbuf_pull(t->buf, &data, &size)) {
- error = t->recv_data(mt, data, size, NULL, 0);
+ if (t->mode != MRP_TRANSPORT_MODE_JSON)
+ error = t->recv_data(mt, data, size, NULL, 0);
+ else {
+ mrp_json_t *msg = mrp_json_string_to_object(data, size);
+
+ if (msg != NULL) {
+ error = t->recv_data((mrp_transport_t *)t, msg, 0, NULL, 0);
+ mrp_json_unref(msg);
+ }
+ }
if (error)
goto fatal_error;
}
+static int strm_sendjson(mrp_transport_t *mt, mrp_json_t *msg)
+{
+ strm_t *t = (strm_t *)mt;
+ struct iovec iov[2];
+ const char *s;
+ ssize_t size, n;
+ uint32_t len;
+
+ if (t->connected && (s = mrp_json_object_to_string(msg)) != NULL) {
+ size = strlen(s);
+ len = htobe32(size);
+ iov[0].iov_base = &len;
+ iov[0].iov_len = sizeof(len);
+ iov[1].iov_base = (void *)s;
+ iov[1].iov_len = size;
+
+ n = writev(t->sock, iov, 2);
+
+ if (n == (ssize_t)(size + sizeof(len)))
+ return TRUE;
+ else {
+ if (n == -1 && errno == EAGAIN) {
+ mrp_log_error("%s(): XXX TODO: this sucks, need to add "
+ "output queuing for strm-transport.",
+ __FUNCTION__);
+ }
+ }
+ }
+
+ return FALSE;
+}
+
+
MRP_REGISTER_TRANSPORT(tcp4, TCP4, strm_t, strm_resolve,
strm_open, strm_createfrom, strm_close, NULL,
strm_bind, strm_listen, strm_accept,
strm_sendraw, NULL,
strm_senddata, NULL,
NULL, NULL,
- strm_sendnative, NULL);
+ strm_sendnative, NULL,
+ strm_sendjson, NULL);
MRP_REGISTER_TRANSPORT(tcp6, TCP6, strm_t, strm_resolve,
strm_open, strm_createfrom, strm_close, NULL,
strm_sendraw, NULL,
strm_senddata, NULL,
NULL, NULL,
- strm_sendnative, NULL);
+ strm_sendnative, NULL,
+ strm_sendjson, NULL);
MRP_REGISTER_TRANSPORT(unxstrm, UNXS, strm_t, strm_resolve,
strm_open, strm_createfrom, strm_close, NULL,
strm_sendraw, NULL,
strm_senddata, NULL,
NULL, NULL,
- strm_sendnative, NULL);
+ strm_sendnative, NULL,
+ strm_sendjson, NULL);
}
+int mrp_transport_sendjson(mrp_transport_t *t, mrp_json_t *msg)
+{
+ int result;
+
+ if (t->mode == MRP_TRANSPORT_MODE_JSON && t->descr->req.sendjson) {
+ MRP_TRANSPORT_BUSY(t, {
+ result = t->descr->req.sendjson(t, msg);
+ });
+
+ purge_destroyed(t);
+ }
+ else
+ result = FALSE;
+
+ return result;
+}
+
+
+int mrp_transport_sendjsonto(mrp_transport_t *t, mrp_json_t *msg,
+ mrp_sockaddr_t *addr, socklen_t addrlen)
+{
+ int result;
+
+ if (t->mode == MRP_TRANSPORT_MODE_JSON && t->descr->req.sendjsonto) {
+ MRP_TRANSPORT_BUSY(t, {
+ result = t->descr->req.sendjsonto(t, msg, addr, addrlen);
+ });
+
+ purge_destroyed(t);
+ }
+ else
+ result = FALSE;
+
+ return result;
+}
+
+
static int recv_data(mrp_transport_t *t, void *data, size_t size,
mrp_sockaddr_t *addr, socklen_t addrlen)
{
}
return 0;
+ case MRP_TRANSPORT_MODE_JSON:
+ if (t->connected) {
+ if (t->evt.recvjson) {
+ MRP_TRANSPORT_BUSY(t, {
+ t->evt.recvjson(t, data, t->user_data);
+ });
+ }
+ }
+ else {
+ if (t->evt.recvjsonfrom) {
+ MRP_TRANSPORT_BUSY(t, {
+ t->evt.recvjsonfrom(t, data, addr, addrlen,
+ t->user_data);
+ });
+ }
+ }
+ return 0;
+
default:
return -EPROTOTYPE;
}
#include <murphy/common/mainloop.h>
#include <murphy/common/msg.h>
#include <murphy/common/native-types.h>
+#include <murphy/common/json.h>
MRP_CDECL_BEGIN
MRP_TRANSPORT_MODE_DATA = 0x02, /* uses registered data types */
MRP_TRANSPORT_MODE_CUSTOM = 0x03, /* custom message encoding */
MRP_TRANSPORT_MODE_NATIVE = 0x04, /* uses registered native-types */
+ MRP_TRANSPORT_MODE_JSON = 0x05, /* uses JSON messages */
} mrp_transport_mode_t;
typedef enum {
int (*senddata)(mrp_transport_t *t, void *data, uint16_t tag);
/** Send data with a custom encoder over a transport. */
int (*sendcustom)(mrp_transport_t *t, void *data);
- /* Send a native type over a (connected) transport. */
+ /** Send a native type over a (connected) transport. */
int (*sendnative)(mrp_transport_t *t, void *data, uint32_t type_id);
+ /** Send a JSON message over a (connected) transport. */
+ int (*sendjson)(mrp_transport_t *t, mrp_json_t *msg);
/** Send a message over a(n unconnected) transport. */
int (*sendmsgto)(mrp_transport_t *t, mrp_msg_t *msg, mrp_sockaddr_t *addr,
/** Send a native type over a transport. */
int (*sendnativeto)(mrp_transport_t *t, void *data, uint32_t type_id,
mrp_sockaddr_t *addr, socklen_t addrlen);
+ /** Send a JSON messgae over a(n unconnected) transport. */
+ int (*sendjsonto)(mrp_transport_t *t, mrp_json_t *msg, mrp_sockaddr_t *addr,
+ socklen_t addrlen);
} mrp_transport_req_t;
/** Native type callback for connected transports. */
void (*recvnative)(mrp_transport_t *t, void *data, uint32_t type_id,
void *user_data);
+ /** JSON type callback for connected transports. */
+ void (*recvjson)(mrp_transport_t *t, mrp_json_t *msg, void *user_data);
};
/** Message received on an unconnected transport. */
void (*recvnativefrom)(mrp_transport_t *t, void *data, uint32_t type_id,
mrp_sockaddr_t *addr, socklen_t addrlen,
void *user_data);
+ /** JSON type callback for unconnected transports. */
+ void (*recvjsonfrom)(mrp_transport_t *t, mrp_json_t *msg,
+ mrp_sockaddr_t *addr, socklen_t addrlen,
+ void *user_data);
};
/** Connection closed by peer. */
void (*closed)(mrp_transport_t *t, int error, void *user_data);
_sendraw, _sendrawto, \
_senddata, _senddatato, \
_sendcustom, _sendcustomto, \
- _sendnative, _sendnativeto) \
+ _sendnative, _sendnativeto, \
+ _sendjson, _sendjsonto) \
static void _prfx##_register_transport(void) \
__attribute__((constructor)); \
\
.sendcustomto = _sendcustomto, \
.sendnative = _sendnative, \
.sendnativeto = _sendnativeto, \
+ .sendjson = _sendjson, \
+ .sendjsonto = _sendjsonto, \
}, \
}; \
\
int mrp_transport_sendnativeto(mrp_transport_t *t, void *data, uint32_t type_id,
mrp_sockaddr_t *addr, socklen_t addrlen);
+/** Send a JSON message through the given (connected) transport. */
+int mrp_transport_sendjson(mrp_transport_t *t, mrp_json_t *msg);
+
+/** Send a JSON message through the given transport to the remote address. */
+int mrp_transport_sendjsonto(mrp_transport_t *t, mrp_json_t *msg,
+ mrp_sockaddr_t *addr, socklen_t addrlen);
MRP_CDECL_END
#endif /* __MURPHY_TRANSPORT_H__ */
wsck_sendraw, NULL,
wsck_senddata, NULL,
wsck_sendcustom, NULL,
+ NULL, NULL,
NULL, NULL);