Added a mechanism for exporting and invoking 'domain' functions.
Domain controllers can register functions which then can be invoked
by the core. The core uses the domain-control plugin to proxy the
call to and the return from the invocation. These functions currently
cannot be queried by the core. Trying to invoke a non-existing function
will fail asynchronously. Trying to invoke a function of a non-existing
domain controller will fail synchronously.
Similarly, the core can export functions which then can be invoked
by domain controllers. The domain-control plugin proxies the call to
and the return from the invocation. These functions currently cannot
be queried by the domain-controllers. Trying to invoke a non-existing
function will fail asynchronously.
We shouldn't need to have a domain-control specific export mechanism
at all. We need to replace all the various import/export/invocation
mechanism (there is one at least in the plugin infra, in the resolver
and now this) with a single one at its core and only adapt it as
necessary for these various components. Eventually...
core/scripting.h \
core/method.h \
core/event.h \
- core/auth.h
+ core/auth.h \
+ core/domain.h \
+ core/domain-types.h
libmurphy_core_la_REGULAR_SOURCES = \
core/context.c \
core/event.c \
core/auth.c \
core/auth-deny.c \
+ core/domain.c \
$(LUA_BINDINGS_SOURCES)
if SMACK_ENABLED
#include <murphy/common/mm.h>
#include <murphy/core/context.h>
#include <murphy/core/console-priv.h>
+#include <murphy/core/domain.h>
mrp_context_t *mrp_context_create(void)
{
if ((c = mrp_allocz(sizeof(*c))) != NULL) {
mrp_list_init(&c->plugins);
console_setup(c);
+ domain_setup(c);
mrp_list_init(&c->auth);
+
if ((c->ml = mrp_mainloop_create()) == NULL) {
mrp_log_error("Failed to create mainloop.");
mrp_free(c);
mrp_resolver_t *r; /* resolver context */
void *lua_state; /* state for Lua bindings */
mrp_list_hook_t auth; /* authenticator backends */
+
+ /*
+ * Hmm, this is not very nice. Most of the domain handling code (in
+ * practice all) used to live in the domain-control plugin. To avoid
+ * loading order dependencies on plugin-domain-control we now started
+ * collecting registered handlers of proxied functions here. Calls by
+ * the core to proxied functions of domain controllers and by domain-
+ * controllers to the core are still handled in the domain-control
+ * plugin (and in the domain-controller client library).
+ *
+ * It would be perhaps the cleanest not to have a domain-controller
+ * specific function export mechanism at all. Instead the various
+ * import/export mechanisms (at least plugins, resolver, and this) should
+ * be replaced by / built on a single core implementation that is flexible
+ * enough to handle all the needs of all these.
+ */
+
+ mrp_list_hook_t domain_methods; /* functions for domain controllers */
+ void *domain_invoke; /* domain invoke handler */
+ void *domain_data; /* domain invoke handler data */
};
/** Create a new murphy context. */
--- /dev/null
+/*
+ * Copyright (c) 2012-2014, Intel Corporation
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * * Neither the name of Intel Corporation nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef __MURPHY_CORE_DOMAIN_TYPES_H__
+#define __MURPHY_CORE_DOMAIN_TYPES_H__
+
+#include <murphy/common/macros.h>
+#include <murphy/common/msg.h>
+
+MRP_CDECL_BEGIN
+
+
+/*
+ * passable data types to/from domain controllers
+ */
+typedef enum {
+ MRP_DOMCTL_END = MRP_MSG_FIELD_INVALID,
+ MRP_DOMCTL_STRING = MRP_MSG_FIELD_STRING,
+ MRP_DOMCTL_INTEGER = MRP_MSG_FIELD_INT32,
+ MRP_DOMCTL_UNSIGNED = MRP_MSG_FIELD_UINT32,
+ MRP_DOMCTL_DOUBLE = MRP_MSG_FIELD_DOUBLE,
+ MRP_DOMCTL_BOOL = MRP_MSG_FIELD_BOOL,
+ MRP_DOMCTL_UINT8 = MRP_MSG_FIELD_UINT8,
+ MRP_DOMCTL_INT8 = MRP_MSG_FIELD_INT8,
+ MRP_DOMCTL_UINT16 = MRP_MSG_FIELD_UINT16,
+ MRP_DOMCTL_INT16 = MRP_MSG_FIELD_INT16,
+ MRP_DOMCTL_UINT32 = MRP_MSG_FIELD_UINT32,
+ MRP_DOMCTL_INT32 = MRP_MSG_FIELD_INT32,
+ MRP_DOMCTL_UINT64 = MRP_MSG_FIELD_UINT64,
+ MRP_DOMCTL_INT64 = MRP_MSG_FIELD_INT64,
+
+#define MRP_DOMCTL_ARRAY(_type) MRP_MSG_FIELD_ARRAY_OF(_type)
+#define MRP_DOMCTL_IS_ARRAY(_type) MRP_MSG_FIELD_IS_ARRAY(_type)
+#define MRP_DOMCTL_ARRAY_TYPE(_type) MRP_MSG_FIELD_ARRAY_TYPE(_type)
+} mrp_domctl_type_t;
+
+
+/*
+ * a single data value passed to/from a domain controller
+ */
+
+typedef struct {
+ mrp_domctl_type_t type; /* data type */
+ union {
+ /* these are usable both in DB operations and proxied invocations */
+ const char *str; /* MRP_DOMCTL_STRING */
+ uint32_t u32; /* MRP_DOMCTL_{UNSIGNED,UINT32} */
+ int32_t s32; /* MRP_DOMCTL_{INTEGER,INT32} */
+ double dbl; /* MRP_DOMCTL_DOUBLE */
+ /* these are only usable in proxied invocations */
+ int bln; /* MRP_DOMCTL_BOOL */
+ uint8_t u8; /* MRP_DOMCTL_UINT8 */
+ int8_t s8; /* MRP_DOMCTL_INT8 */
+ uint16_t u16; /* MRP_DOMCTL_UINT16 */
+ int16_t s16; /* MRP_DOMCTL_INT16 */
+ uint64_t u64; /* MRP_DOMCTL_UINT64 */
+ int64_t s64; /* MRP_DOMCTL_INT64 */
+ void *arr; /* MRP_DOMCTL_ARRAY(*) */
+ };
+ uint32_t size; /* size for arrays */
+} mrp_domctl_value_t;
+
+
+/*
+ * proxied invokation errors
+ */
+
+typedef enum {
+ MRP_DOMAIN_OK = 0, /* no errors */
+ MRP_DOMAIN_NOTFOUND, /* domain not found */
+ MRP_DOMAIN_NOMETHOD, /* call domain method not found */
+ MRP_DOMAIN_FAILED, /* called method remotely failed */
+} mrp_domain_error_t;
+
+/* Type for a proxied invocation argument. */
+typedef mrp_domctl_value_t mrp_domctl_arg_t;
+
+MRP_CDECL_END
+
+#endif /* __MURPHY_CORE_DOMAIN_TYPES_H__ */
--- /dev/null
+/*
+ * Copyright (c) 2012-2014, Intel Corporation
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * * Neither the name of Intel Corporation nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <murphy/common/macros.h>
+#include <murphy/common/debug.h>
+#include <murphy/common/log.h>
+#include <murphy/common/mm.h>
+#include <murphy/common/list.h>
+
+#include <murphy/core/context.h>
+#include <murphy/core/domain.h>
+
+typedef struct {
+ mrp_list_hook_t hook; /* to list of registered methods */
+ char *name; /* method name */
+ int max_out; /* max returned arguments */
+ mrp_domain_invoke_cb_t cb; /* actual callback */
+ void *user_data; /* callback user data */
+} method_t;
+
+
+void domain_setup(mrp_context_t *ctx)
+{
+ mrp_list_init(&ctx->domain_methods);
+}
+
+
+int mrp_set_domain_invoke_handler(mrp_context_t *ctx,
+ mrp_domain_invoke_handler_t handler,
+ void *handler_data)
+{
+ if (ctx->domain_invoke != NULL)
+ return FALSE;
+
+ ctx->domain_invoke = handler;
+ ctx->domain_data = handler_data;
+
+ return TRUE;
+}
+
+
+int mrp_register_domain_methods(mrp_context_t *ctx,
+ mrp_domain_method_def_t *defs, size_t ndef)
+{
+ mrp_domain_method_def_t *def;
+ method_t *m;
+ size_t i;
+
+ for (i = 0, def = defs; i < ndef; i++, def++) {
+ m = mrp_allocz(sizeof(*m));
+
+ if (m == NULL)
+ return FALSE;
+
+ mrp_list_init(&m->hook);
+
+ m->name = mrp_strdup(def->name);
+ m->max_out = def->max_out;
+ m->cb = def->cb;
+ m->user_data = def->user_data;
+
+ if (m->name == NULL) {
+ mrp_free(m);
+ return FALSE;
+ }
+
+ mrp_list_append(&ctx->domain_methods, &m->hook);
+ }
+
+ return TRUE;
+}
+
+
+static method_t *find_method(mrp_context_t *ctx, const char *name)
+{
+ mrp_list_hook_t *p, *n;
+ method_t *m;
+
+ mrp_list_foreach(&ctx->domain_methods, p, n) {
+ m = mrp_list_entry(p, typeof(*m), hook);
+
+ if (!strcmp(m->name, name))
+ return m;
+ }
+
+ return NULL;
+}
+
+
+int mrp_lookup_domain_method(mrp_context_t *ctx, const char *name,
+ mrp_domain_invoke_cb_t *cb, int *max_out,
+ void **user_data)
+{
+ method_t *m;
+
+ m = find_method(ctx, name);
+
+ if (m == NULL)
+ return FALSE;
+
+ *cb = m->cb;
+ *max_out = m->max_out;
+ *user_data = m->user_data;
+
+ return TRUE;
+}
+
+
+int mrp_invoke_domain(mrp_context_t *ctx, const char *domain,
+ const char *method, int narg, mrp_domctl_arg_t *args,
+ mrp_domain_return_cb_t return_cb, void *user_data)
+{
+ mrp_domain_invoke_handler_t handler = ctx->domain_invoke;
+ void *handler_data = ctx->domain_data;
+
+ if (handler == NULL)
+ return FALSE;
+
+ return handler(handler_data, domain,
+ method, narg, args, return_cb, user_data);
+}
--- /dev/null
+/*
+ * Copyright (c) 2012-2014, Intel Corporation
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * * Neither the name of Intel Corporation nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef __MURPHY_CORE_DOMAIN__
+#define __MURPHY_CORE_DOMAIN__
+
+#include <murphy/common/macros.h>
+#include <murphy/common/msg.h>
+
+#include <murphy/core/context.h>
+#include <murphy/core/domain-types.h>
+
+MRP_CDECL_BEGIN
+
+/* Type for a proxied invocation handler. */
+typedef int (*mrp_domain_invoke_cb_t)(int narg, mrp_domctl_arg_t *args,
+ uint32_t *nout, mrp_domctl_arg_t *outs,
+ void *user_data);
+
+/* Type for a proxied invocation return/reply handler. */
+typedef void (*mrp_domain_return_cb_t)(int error, int retval, int narg,
+ mrp_domctl_arg_t *args, void *user_data);
+
+typedef struct {
+ char *name; /* method name */
+ int max_out; /* max. number of return arguments */
+ mrp_domain_invoke_cb_t cb; /* handler callback */
+ void *user_data; /* opaque callback user data */
+} mrp_domain_method_def_t;
+
+
+
+/* Type for handling proxied invocation to domain controllers. */
+typedef int (*mrp_domain_invoke_handler_t)(void *handler_data, const char *id,
+ const char *method, int narg,
+ mrp_domctl_arg_t *args,
+ mrp_domain_return_cb_t return_cb,
+ void *user_data);
+
+/* Initialize domain-specific context parts. */
+void domain_setup(mrp_context_t *ctx);
+
+/* Register a domain method. */
+int mrp_register_domain_methods(mrp_context_t *ctx,
+ mrp_domain_method_def_t *defs, size_t ndef);
+
+/* Find a registered domain method. */
+int mrp_lookup_domain_method(mrp_context_t *ctx, const char *method,
+ mrp_domain_invoke_cb_t *cb, int *max_out,
+ void **user_data);
+
+/* Invoke the named method of the specified domain. */
+int mrp_invoke_domain(mrp_context_t *ctx, const char *domain, const char *method,
+ int narg, mrp_domctl_arg_t *args,
+ mrp_domain_return_cb_t return_cb, void *user_data);
+
+/* Set the domain invoke handler. */
+int mrp_set_domain_invoke_handler(mrp_context_t *ctx,
+ mrp_domain_invoke_handler_t handler,
+ void *handler_data);
+
+MRP_CDECL_END
+
+#endif /* __MURPHY_CORE_DOMAIN__ */
typedef struct {
mrp_list_hook_t hook; /* hook to pending request queue */
uint32_t seqno; /* sequence number/request id */
- mrp_domctl_status_cb_t cb; /* callback to call upon completion */
+ int invoke : 1; /* whether a pending invocation */
+ union {
+ mrp_domctl_status_cb_t status; /* request completion callback */
+ mrp_domctl_return_cb_t ret; /* invocation return cb */
+ } cb;
void *user_data; /* opaque callback data */
} pending_request_t;
+/*
+ * a registered proxied method
+ */
+
+typedef struct {
+ mrp_list_hook_t hook; /* to list of methods */
+ char *name; /* method name */
+ size_t max_out; /* max return arguments */
+ mrp_domctl_invoke_cb_t cb; /* handler callback */
+ void *user_data; /* opaque handler data */
+} method_t;
+
static void recv_cb(mrp_transport_t *t, mrp_msg_t *msg, void *user_data);
static void recvfrom_cb(mrp_transport_t *t, mrp_msg_t *msg,
mrp_sockaddr_t *addr, socklen_t addrlen,
static int queue_pending(mrp_domctl_t *dc, uint32_t seq,
mrp_domctl_status_cb_t cb, void *user_data);
-static int notify_pending(mrp_domctl_t *dc, uint32_t seq, int error,
- const char *msg);
+static int notify_pending(mrp_domctl_t *dc, msg_t *msg);
+static int queue_invoke(mrp_domctl_t *dc, uint32_t seq,
+ mrp_domctl_return_cb_t cb, void *user_data);
+static int notify_invoke(mrp_domctl_t *dc, uint32_t seq, int error,
+ int status, int narg, mrp_domctl_arg_t *args);
static void purge_pending(mrp_domctl_t *dc);
dc->user_data = user_data;
dc->seqno = 1;
+ mrp_list_init(&dc->methods);
+
return dc;
}
}
+int mrp_domctl_invoke(mrp_domctl_t *dc, const char *name, int narg,
+ mrp_domctl_arg_t *args, mrp_domctl_return_cb_t reply_cb,
+ void *user_data)
+{
+ invoke_msg_t invoke;
+ mrp_msg_t *msg;
+ uint32_t seq = dc->seqno++;
+ int success;
+
+ if (!dc->connected)
+ return FALSE;
+
+ if (reply_cb == NULL && user_data != NULL)
+ return FALSE;
+
+ mrp_clear(&invoke);
+ invoke.type = MSG_TYPE_INVOKE;
+ invoke.seq = seq;
+ invoke.name = name;
+ invoke.noret = reply_cb ? TRUE : FALSE;
+ invoke.narg = narg;
+ invoke.args = args;
+
+ msg = msg_encode_message((msg_t *)&invoke);
+
+ if (msg != NULL) {
+ success = mrp_transport_send(dc->t, msg);
+ mrp_msg_unref(msg);
+
+ if (success)
+ queue_invoke(dc, seq, reply_cb, user_data);
+
+ return success;
+ }
+ else
+ return FALSE;
+}
+
+
+int mrp_domctl_register_methods(mrp_domctl_t *dc, mrp_domctl_method_def_t *defs,
+ size_t ndef)
+{
+ mrp_domctl_method_def_t *def;
+ method_t *m;
+ size_t i;
+
+ for (i = 0, def = defs; i < ndef; i++, def++) {
+ m = mrp_allocz(sizeof(*m));
+
+ if (m == NULL)
+ return FALSE;
+
+ mrp_list_init(&m->hook);
+
+ m->name = mrp_strdup(def->name);
+ m->max_out = def->max_out;
+ m->cb = def->cb;
+ m->user_data = def->user_data;
+
+ if (m->name == NULL) {
+ mrp_free(m);
+ return FALSE;
+ }
+
+ mrp_list_append(&dc->methods, &m->hook);
+ }
+
+ return TRUE;
+}
+
+
+static method_t *find_method(mrp_domctl_t *dc, const char *name)
+{
+ mrp_list_hook_t *p, *n;
+ method_t *m;
+
+ mrp_list_foreach(&dc->methods, p, n) {
+ m = mrp_list_entry(p, typeof(*m), hook);
+
+ if (!strcmp(m->name, name))
+ return m;
+ }
+
+ return NULL;
+}
+
+
static void process_ack(mrp_domctl_t *dc, ack_msg_t *ack)
{
if (ack->seq != 0)
- notify_pending(dc, ack->seq, 0, NULL);
+ notify_pending(dc, (msg_t *)ack);
else
notify_connect(dc);
}
static void process_nak(mrp_domctl_t *dc, nak_msg_t *nak)
{
if (nak->seq != 0)
- notify_pending(dc, nak->seq, nak->error, nak->msg);
+ notify_pending(dc, (msg_t *)nak);
else
notify_disconnect(dc, nak->error, nak->msg);
}
}
+static void process_invoke(mrp_domctl_t *dc, invoke_msg_t *invoke)
+{
+ method_t *m;
+ mrp_domctl_arg_t *args, error;
+ int narg;
+ return_msg_t ret;
+ mrp_msg_t *msg;
+ int i;
+
+ mrp_clear(&ret);
+
+ m = find_method(dc, invoke->name);
+
+ ret.type = MSG_TYPE_RETURN;
+ ret.seq = invoke->seq;
+
+ if (m == NULL) {
+ ret.error = MRP_DOMCTL_NOTFOUND;
+ }
+ else {
+ ret.error = MRP_DOMCTL_OK;
+
+ narg = ret.narg = m->max_out;
+
+ if (narg > 0) {
+ args = ret.args = alloca(narg * sizeof(args[0]));
+ memset(args, 0, narg * sizeof(args[0]));
+ }
+
+ ret.retval = m->cb(dc, invoke->narg, invoke->args,
+ &ret.narg, ret.args, m->user_data);
+ }
+
+ msg = msg_encode_message((msg_t *)&ret);
+
+ if (msg == NULL) {
+ error.type = MRP_DOMCTL_STRING;
+ error.str = "failed to encode return message (arguments)";
+ ret.error = MRP_DOMAIN_FAILED;
+ ret.narg = 1;
+ ret.args = &error;
+
+ msg = msg_encode_message((msg_t *)&ret);
+
+ ret.narg = 0;
+ ret.args = NULL;
+ }
+
+ if (msg != NULL) {
+ mrp_transport_send(dc->t, msg);
+ mrp_msg_unref(msg);
+ }
+
+ narg = ret.narg;
+ for (i = 0; i < narg; i++) {
+ if (args[i].type == MRP_DOMCTL_STRING)
+ mrp_free((char *)args[i].str);
+ else if (MRP_DOMCTL_IS_ARRAY(args[i].type)) {
+ uint32_t j;
+
+ for (j = 0; j < args[i].size; j++)
+ if (MRP_DOMCTL_ARRAY_TYPE(args[i].type) == MRP_DOMCTL_STRING)
+ mrp_free(((char **)args[i].arr)[j]);
+
+ mrp_free(args[i].arr);
+ }
+ }
+}
+
+
+static void process_return(mrp_domctl_t *dc, return_msg_t *ret)
+{
+ notify_pending(dc, (msg_t *)ret);
+}
+
+
static void recv_cb(mrp_transport_t *t, mrp_msg_t *tmsg, void *user_data)
{
mrp_domctl_t *dc = (mrp_domctl_t *)user_data;
case MSG_TYPE_NAK:
process_nak(dc, &msg->nak);
break;
+ case MSG_TYPE_INVOKE:
+ process_invoke(dc, &msg->invoke);
+ break;
+ case MSG_TYPE_RETURN:
+ process_return(dc, &msg->ret);
+ break;
default:
mrp_domctl_disconnect(dc);
notify_disconnect(dc, EINVAL, "unexpected message from server");
if (pending != NULL) {
mrp_list_init(&pending->hook);
+ pending->invoke = false;
pending->seqno = seq;
- pending->cb = cb;
+ pending->cb.status = cb;
pending->user_data = user_data;
mrp_list_append(&dc->pending, &pending->hook);
}
-static int notify_pending(mrp_domctl_t *dc, uint32_t seq, int error,
- const char *msg)
+static int notify_pending(mrp_domctl_t *dc, msg_t *msg)
{
mrp_list_hook_t *p, *n;
pending_request_t *pending;
+ uint32_t seq;
+ int error, status;
+ const char *message;
+ int narg;
+ mrp_domctl_arg_t *args;
+ int success;
+
+ seq = msg->any.seq;
mrp_list_foreach(&dc->pending, p, n) {
pending = mrp_list_entry(p, typeof(*pending), hook);
- if (pending->seqno == seq) {
- DOMCTL_MARK_BUSY(dc, {
- pending->cb(dc, error, msg, pending->user_data);
- mrp_list_delete(&pending->hook);
- mrp_free(pending);
- });
-
- return TRUE;
+ if (pending->seqno != seq)
+ continue;
+
+ if (!pending->invoke) {
+ switch (msg->any.type) {
+ case MSG_TYPE_ACK:
+ error = 0;
+ message = NULL;
+ goto notify;
+ case MSG_TYPE_NAK:
+ error = msg->nak.error;
+ message = msg->nak.msg;
+ notify:
+ DOMCTL_MARK_BUSY(dc, {
+ pending->cb.status(dc, error, message,
+ pending->user_data);
+ });
+ success = TRUE;
+ break;
+ default:
+ success = FALSE;
+ break;
+ }
+ }
+ else {
+ if (msg->any.type == MSG_TYPE_RETURN) {
+ error = msg->ret.error;
+ status = msg->ret.retval;
+ narg = msg->ret.narg;
+ args = msg->ret.args;
+
+ DOMCTL_MARK_BUSY(dc, {
+ pending->cb.ret(dc, error, status, narg, args,
+ pending->user_data);
+ });
+ success = TRUE;
+ }
+ else
+ success = FALSE;
}
+
+ mrp_list_delete(&pending->hook);
+ mrp_free(pending);
+
+ return success;
}
return FALSE;
}
+static int queue_invoke(mrp_domctl_t *dc, uint32_t seq,
+ mrp_domctl_return_cb_t cb, void *user_data)
+{
+ pending_request_t *pending;
+
+ if (cb == NULL)
+ return TRUE;
+
+ pending = mrp_allocz(sizeof(*pending));
+
+ if (pending != NULL) {
+ mrp_list_init(&pending->hook);
+
+ pending->invoke = true;
+ pending->seqno = seq;
+ pending->cb.ret = cb;
+ pending->user_data = user_data;
+
+ mrp_list_append(&dc->pending, &pending->hook);
+
+ return TRUE;
+ }
+ else
+ return FALSE;
+}
+
+
static void purge_pending(mrp_domctl_t *dc)
{
mrp_list_hook_t *p, *n;
#include <murphy/common/macros.h>
#include <murphy/common/mainloop.h>
#include <murphy-db/mqi.h>
+#include <murphy/core/domain-types.h>
MRP_CDECL_BEGIN
/*
- * table column types and values
- */
-
-typedef enum {
- MRP_DOMCTL_STRING = mqi_varchar,
- MRP_DOMCTL_INTEGER = mqi_integer,
- MRP_DOMCTL_UNSIGNED = mqi_unsignd,
- MRP_DOMCTL_DOUBLE = mqi_floating
-} mrp_domctl_type_t;
-
-typedef struct {
- mrp_domctl_type_t type; /* data type */
- union {
- const char *str; /* MRP_DOMCTL_STRING */
- uint32_t u32; /* MRP_DOMCTL_UNSIGNED */
- int32_t s32; /* MRP_DOMCTL_INTEGER */
- double dbl; /* MRP_DOMCTL_DOUBLE */
- };
-} mrp_domctl_value_t;
-
-
-/*
* table data
*/
} mrp_domctl_data_t;
-
/** Opaque policy domain controller type. */
typedef struct mrp_domctl_s mrp_domctl_t;
mrp_domctl_data_t *tables, int ntable,
void *user_data);
+/** Callback type for return of/reply to a proxied method invocation. */
+typedef void (*mrp_domctl_return_cb_t)(mrp_domctl_t *dc, int error, int retval,
+ int narg, mrp_domctl_arg_t *args,
+ void *user_data);
+
+/** Callback type for a proxied method invocation. */
+typedef int (*mrp_domctl_invoke_cb_t)(mrp_domctl_t *dc, int narg,
+ mrp_domctl_arg_t *args,
+ int *nout, mrp_domctl_arg_t *outs,
+ void *user_data);
+
+/*
+ * proxied invocation errors
+ */
+
+typedef enum {
+ MRP_DOMCTL_OK = MRP_DOMAIN_OK,
+ MRP_DOMCTL_NOTFOUND = MRP_DOMAIN_NOTFOUND,
+ MRP_DOMCTL_NOMETHOD = MRP_DOMAIN_NOMETHOD,
+} mrp_domctl_error_t;
+
+/*
+ * a domain controller method definition
+ */
+
+typedef struct {
+ const char *name;
+ size_t max_out;
+ mrp_domctl_invoke_cb_t cb;
+ void *user_data;
+} mrp_domctl_method_def_t;
+
+
/** Create a new policy domain controller. */
mrp_domctl_t *mrp_domctl_create(const char *name, mrp_mainloop_t *ml,
mrp_domctl_table_t *tables, int ntable,
int mrp_domctl_set_data(mrp_domctl_t *dc, mrp_domctl_data_t *tables, int ntable,
mrp_domctl_status_cb_t status_cb, void *user_data);
+/** Invoke a proxied method. */
+int mrp_domctl_invoke(mrp_domctl_t *dc, const char *method, int narg,
+ mrp_domctl_arg_t *args, mrp_domctl_return_cb_t return_cb,
+ void *user_data);
+
+/** Register a proxied method handler. */
+int mrp_domctl_register_methods(mrp_domctl_t *dc, mrp_domctl_method_def_t *defs,
+ size_t ndef);
+
MRP_CDECL_END
#endif /* __MURPHY_DOMAIN_CONTROL_CLIENT_H__ */
int destroyed:1;/* non-zero if destroy pending */
uint32_t seqno; /* request sequence number */
mrp_list_hook_t pending; /* queue of outstanding requests */
+ mrp_list_hook_t methods; /* registered proxied methods */
};
int ntable; /* number of tables */
mrp_list_hook_t watches; /* tables watched by this */
proxy_ops_t *ops; /* transport/messaging operations */
+ uint32_t seqno; /* request sequence number */
+ mrp_list_hook_t pending; /* pending method invocations */
int notify_update; /* whether needs notification */
void *notify_msg; /* notification being built */
int notify_ntable; /* number of changed tables */
#include <murphy/common/log.h>
#include <murphy/common/wsck-transport.h>
+#include <murphy/core/domain.h>
+
#include "proxy.h"
#include "message.h"
#include "table.h"
static mrp_transport_t *create_transport(pdp_t *pdp, const char *address);
static void destroy_transport(mrp_transport_t *t);
+static int invoke_handler(void *handler_data, const char *id,
+ const char *method, int narg,
+ mrp_domctl_arg_t *args,
+ mrp_domain_return_cb_t return_cb,
+ void *user_data);
+
+
pdp_t *create_domain_control(mrp_context_t *ctx,
const char *extaddr, const char *intaddr,
const char *wrtaddr, const char *httpdir)
if ((!extaddr || !*extaddr || pdp->extt != NULL) &&
(!intaddr || !*intaddr || pdp->intt != NULL) &&
- (!wrtaddr || !*wrtaddr || pdp->wrtt != NULL))
+ (!wrtaddr || !*wrtaddr || pdp->wrtt != NULL)) {
+ mrp_set_domain_invoke_handler(ctx, invoke_handler, pdp);
return pdp;
+ }
}
destroy_domain_control(pdp);
}
+static void process_invoke(pep_proxy_t *proxy, invoke_msg_t *invoke)
+{
+ mrp_context_t *ctx = proxy->pdp->ctx;
+ mrp_domain_invoke_cb_t cb;
+ void *user_data;
+ int max_out;
+ mrp_domctl_arg_t *args;
+ int narg;
+ return_msg_t ret;
+ mrp_msg_t *msg;
+ int i;
+
+ mrp_clear(&ret);
+
+ ret.type = MSG_TYPE_RETURN;
+ ret.seq = invoke->seq;
+ args = NULL;
+ narg = 0;
+
+ if (!mrp_lookup_domain_method(ctx, invoke->name, &cb, &max_out,&user_data)) {
+ ret.error = MRP_DOMCTL_NOTFOUND;
+ }
+ else {
+ ret.error = MRP_DOMCTL_OK;
+
+ narg = ret.narg = max_out;
+
+ if (narg > 0) {
+ args = ret.args = alloca(narg * sizeof(args[0]));
+ memset(args, 0, narg * sizeof(args[0]));
+ }
+
+ ret.retval = cb(invoke->narg, invoke->args,
+ &ret.narg, ret.args, user_data);
+ }
+
+ msg = msg_encode_message((msg_t *)&ret);
+
+ if (msg != NULL) {
+ mrp_transport_send(proxy->t, msg);
+ mrp_msg_unref(msg);
+ }
+
+ narg = ret.narg;
+ for (i = 0; i < narg; i++) {
+ if (args[i].type == MRP_DOMCTL_STRING)
+ mrp_free((char *)args[i].str);
+ else if (MRP_DOMCTL_IS_ARRAY(args[i].type)) {
+ uint32_t j;
+
+ for (j = 0; j < args[i].size; j++)
+ if (MRP_DOMCTL_ARRAY_TYPE(args[i].type) == MRP_DOMCTL_STRING)
+ mrp_free(((char **)args[i].arr)[j]);
+
+ mrp_free(args[i].arr);
+ }
+ }
+}
+
+
+static void process_return(pep_proxy_t *proxy, return_msg_t *ret)
+{
+ uint32_t id = ret->seq;
+ mrp_domain_return_cb_t cb;
+ void *user_data;
+
+ if (!proxy_dequeue_pending(proxy, id, &cb, &user_data))
+ return;
+
+ cb(ret->error, ret->retval, ret->narg, ret->args, user_data);
+}
+
+
static void process_message(pep_proxy_t *proxy, msg_t *msg)
{
char *name = proxy->name ? proxy->name : "<unknown>";
case MSG_TYPE_SET:
process_set(proxy, &msg->set);
break;
+ case MSG_TYPE_INVOKE:
+ process_invoke(proxy, &msg->invoke);
+ break;
+ case MSG_TYPE_RETURN:
+ process_return(proxy, &msg->ret);
+ break;
default:
mrp_log_error("Unexpected message 0x%x from client %s.",
msg->any.type, name);
}
+static int invoke_handler(void *handler_data, const char *domain,
+ const char *method, int narg,
+ mrp_domctl_arg_t *args,
+ mrp_domain_return_cb_t return_cb,
+ void *user_data)
+{
+ pdp_t *pdp = (pdp_t *)handler_data;
+ pep_proxy_t *proxy = find_proxy(pdp, domain);
+ uint32_t id;
+ invoke_msg_t invoke;
+
+ if (proxy == NULL)
+ return FALSE;
+
+ id = proxy_queue_pending(proxy, return_cb, user_data);
+
+ if (!id)
+ return FALSE;
+
+ mrp_clear(&invoke);
+
+ invoke.type = MSG_TYPE_INVOKE;
+ invoke.seq = id;
+ invoke.name = method;
+ invoke.noret = (return_cb == NULL);
+ invoke.narg = narg;
+ invoke.args = args;
+
+ return msg_send_message(proxy, (msg_t *)&invoke);
+}
+
+
static int msg_op_send_msg(pep_proxy_t *proxy, msg_t *msg)
{
return msg_send_message(proxy, msg);
}
+void msg_free_invoke(msg_t *msg)
+{
+ if (msg != NULL) {
+ mrp_free(msg->invoke.args);
+ mrp_free(msg);
+ }
+}
+
+
+mrp_msg_t *msg_encode_invoke(invoke_msg_t *invoke)
+{
+ mrp_msg_t *msg;
+ mrp_domctl_arg_t *arg;
+ uint32_t i;
+
+ msg = mrp_msg_create(MSG_UINT16(MSGTYPE, MSG_TYPE_INVOKE),
+ MSG_UINT32(MSGSEQ , invoke->seq),
+ MSG_STRING(METHOD , invoke->name),
+ MSG_BOOL (NORET , invoke->noret),
+ MSG_UINT32(NARG , invoke->narg),
+ MSG_END);
+
+ for (i = 0, arg = invoke->args; i < invoke->narg; i++, arg++) {
+ switch (arg->type) {
+ case MRP_DOMCTL_STRING:
+ if (!mrp_msg_append(msg, MSG_STRING(ARG, arg->str)))
+ goto fail;
+ break;
+ case MRP_DOMCTL_DOUBLE:
+ if (!mrp_msg_append(msg, MSG_DOUBLE(ARG, arg->dbl)))
+ goto fail;
+ break;
+ case MRP_DOMCTL_BOOL:
+ if (!mrp_msg_append(msg, MSG_BOOL(ARG, arg->bln)))
+ goto fail;
+ break;
+ case MRP_DOMCTL_UINT8:
+ if (!mrp_msg_append(msg, MSG_UINT8(ARG, arg->s8)))
+ goto fail;
+ break;
+ case MRP_DOMCTL_INT8:
+ if (!mrp_msg_append(msg, MSG_SINT8(ARG, arg->u8)))
+ goto fail;
+ break;
+ case MRP_DOMCTL_UINT16:
+ if (!mrp_msg_append(msg, MSG_UINT16(ARG, arg->s16)))
+ goto fail;
+ break;
+ case MRP_DOMCTL_INT16:
+ if (!mrp_msg_append(msg, MSG_SINT16(ARG, arg->u16)))
+ goto fail;
+ break;
+ case MRP_DOMCTL_UINT32:
+ if (!mrp_msg_append(msg, MSG_UINT32(ARG, arg->s32)))
+ goto fail;
+ break;
+ case MRP_DOMCTL_INT32:
+ if (!mrp_msg_append(msg, MSG_SINT32(ARG, arg->u32)))
+ goto fail;
+ break;
+ case MRP_DOMCTL_UINT64:
+ if (!mrp_msg_append(msg, MSG_UINT64(ARG, arg->s64)))
+ goto fail;
+ break;
+ case MRP_DOMCTL_INT64:
+ if (!mrp_msg_append(msg, MSG_SINT64(ARG, arg->u64)))
+ goto fail;
+ break;
+ default:
+ if (MRP_DOMCTL_IS_ARRAY(arg->type)) {
+ if (!mrp_msg_append(msg, MSG_ARRAY(ARG, arg->type,
+ arg->size, arg->arr)))
+ goto fail;
+ }
+ else
+ goto fail;
+ break;
+ }
+ }
+
+ return msg;
+
+ fail:
+ mrp_msg_unref(msg);
+ return NULL;
+}
+
+
+msg_t *msg_decode_invoke(mrp_msg_t *msg)
+{
+ invoke_msg_t *invoke;
+ void *it;
+ uint16_t tag, type;
+ mrp_msg_value_t val;
+ mrp_domctl_arg_t *arg;
+ uint32_t i;
+ size_t size;
+
+ mrp_debug_code({
+ mrp_debug("got domain invoke request:");
+ mrp_msg_dump(msg, stdout); });
+
+ it = NULL;
+ invoke = mrp_allocz(sizeof(*invoke));
+
+ if (invoke == NULL)
+ goto fail;
+
+ invoke->type = MSG_TYPE_INVOKE;
+
+ if (!mrp_msg_iterate_get(msg, &it,
+ MSG_UINT32(MSGSEQ, &invoke->seq),
+ MSG_STRING(METHOD, &invoke->name),
+ MSG_BOOL (NORET , &invoke->noret),
+ MSG_UINT32(NARG , &invoke->narg),
+ MSG_END))
+ goto fail;
+
+
+
+ if (invoke->narg > 0)
+ invoke->args = mrp_allocz(invoke->narg * sizeof(invoke->args[0]));
+
+ if (invoke->args == NULL && invoke->narg > 0)
+ goto fail;
+
+ for (i = 0, arg = invoke->args; i < invoke->narg; i++, arg++) {
+ if (!mrp_msg_iterate(msg, &it, &tag, &type, &val, &size))
+ goto fail;
+
+ arg->type = type;
+
+ switch (type) {
+ case MRP_DOMCTL_STRING: arg->str = val.str; break;
+ case MRP_DOMCTL_BOOL: arg->bln = val.bln; break;
+ case MRP_DOMCTL_UINT8: arg->u8 = val.u8; break;
+ case MRP_DOMCTL_INT8: arg->s8 = val.s8; break;
+ case MRP_DOMCTL_UINT16: arg->u16 = val.u16; break;
+ case MRP_DOMCTL_INT16: arg->s16 = val.s16; break;
+ case MRP_DOMCTL_UINT32: arg->u32 = val.u32; break;
+ case MRP_DOMCTL_INT32: arg->s32 = val.s32; break;
+ case MRP_DOMCTL_UINT64: arg->u64 = val.u64; break;
+ case MRP_DOMCTL_INT64: arg->s64 = val.s64; break;
+ case MRP_DOMCTL_DOUBLE: arg->dbl = val.dbl; break;
+ default:
+ if (MRP_DOMCTL_IS_ARRAY(type)) {
+ arg->arr = val.aany;
+ arg->size = size;
+ }
+ else
+ goto fail;
+ }
+ }
+
+ invoke->wire = mrp_msg_ref(msg);
+ invoke->unref_wire = msg_unref_wire;
+
+ return (msg_t *)invoke;
+
+ fail:
+ msg_free_invoke((msg_t *)invoke);
+ return NULL;
+}
+
+
+void msg_free_return(msg_t *msg)
+{
+ if (msg != NULL) {
+ mrp_free(msg->ret.args);
+ mrp_free(msg);
+ }
+}
+
+
+mrp_msg_t *msg_encode_return(return_msg_t *ret)
+{
+ mrp_msg_t *msg;
+ mrp_domctl_arg_t *arg;
+ uint32_t i;
+
+ msg = mrp_msg_create(MSG_UINT16(MSGTYPE, MSG_TYPE_RETURN),
+ MSG_UINT32(MSGSEQ , ret->seq),
+ MSG_UINT32(ERROR , ret->error),
+ MSG_SINT32(RETVAL , ret->retval),
+ MSG_UINT32(NARG , ret->narg),
+ MSG_END);
+
+ for (i = 0, arg = ret->args; i < ret->narg; i++, arg++) {
+ switch (arg->type) {
+ case MRP_DOMCTL_STRING:
+ if (!mrp_msg_append(msg, MSG_STRING(ARG, arg->str)))
+ goto fail;
+ break;
+ case MRP_DOMCTL_DOUBLE:
+ if (!mrp_msg_append(msg, MSG_DOUBLE(ARG, arg->dbl)))
+ goto fail;
+ break;
+ case MRP_DOMCTL_BOOL:
+ if (!mrp_msg_append(msg, MSG_BOOL(ARG, arg->bln)))
+ goto fail;
+ break;
+ case MRP_DOMCTL_UINT8:
+ if (!mrp_msg_append(msg, MSG_UINT8(ARG, arg->s8)))
+ goto fail;
+ break;
+ case MRP_DOMCTL_INT8:
+ if (!mrp_msg_append(msg, MSG_SINT8(ARG, arg->u8)))
+ goto fail;
+ break;
+ case MRP_DOMCTL_UINT16:
+ if (!mrp_msg_append(msg, MSG_UINT16(ARG, arg->s16)))
+ goto fail;
+ break;
+ case MRP_DOMCTL_INT16:
+ if (!mrp_msg_append(msg, MSG_SINT16(ARG, arg->u16)))
+ goto fail;
+ break;
+ case MRP_DOMCTL_UINT32:
+ if (!mrp_msg_append(msg, MSG_UINT32(ARG, arg->s32)))
+ goto fail;
+ break;
+ case MRP_DOMCTL_INT32:
+ if (!mrp_msg_append(msg, MSG_SINT32(ARG, arg->u32)))
+ goto fail;
+ break;
+ case MRP_DOMCTL_UINT64:
+ if (!mrp_msg_append(msg, MSG_UINT64(ARG, arg->s64)))
+ goto fail;
+ break;
+ case MRP_DOMCTL_INT64:
+ if (!mrp_msg_append(msg, MSG_SINT64(ARG, arg->u64)))
+ goto fail;
+ break;
+ default:
+ if (MRP_DOMCTL_IS_ARRAY(arg->type)) {
+ if (!mrp_msg_append(msg, MSG_ARRAY(ARG, arg->type,
+ arg->size, arg->arr)))
+ goto fail;
+ }
+ else
+ goto fail;
+ break;
+ }
+ }
+
+ return msg;
+
+ fail:
+ mrp_msg_unref(msg);
+ return NULL;
+}
+
+
+msg_t *msg_decode_return(mrp_msg_t *msg)
+{
+ return_msg_t *ret;
+ void *it;
+ uint16_t tag, type;
+ mrp_msg_value_t val;
+ mrp_domctl_arg_t *arg;
+ uint32_t i;
+ size_t size;
+
+ mrp_debug_code({
+ mrp_debug("got domain return (invoke reply):");
+ mrp_msg_dump(msg, stdout); });
+
+ it = NULL;
+ ret = mrp_allocz(sizeof(*ret));
+
+ if (ret == NULL)
+ goto fail;
+
+ ret->type = MSG_TYPE_RETURN;
+
+ if (!mrp_msg_iterate_get(msg, &it,
+ MSG_UINT32(MSGSEQ, &ret->seq),
+ MSG_UINT32(ERROR , &ret->error),
+ MSG_SINT32(RETVAL, &ret->retval),
+ MSG_UINT32(NARG , &ret->narg),
+ MSG_END))
+ goto fail;
+
+ if (ret->narg > 0)
+ ret->args = mrp_allocz(ret->narg * sizeof(ret->args[0]));
+
+ if (ret->args == NULL && ret->narg > 0)
+ goto fail;
+
+ for (i = 0, arg = ret->args; i < ret->narg; i++, arg++) {
+ if (!mrp_msg_iterate(msg, &it, &tag, &type, &val, &size))
+ goto fail;
+
+ arg->type = type;
+
+ switch (type) {
+ case MRP_DOMCTL_STRING: arg->str = val.str; break;
+ case MRP_DOMCTL_BOOL: arg->bln = val.bln; break;
+ case MRP_DOMCTL_UINT8: arg->u8 = val.u8; break;
+ case MRP_DOMCTL_INT8: arg->s8 = val.s8; break;
+ case MRP_DOMCTL_UINT16: arg->u16 = val.u16; break;
+ case MRP_DOMCTL_INT16: arg->s16 = val.s16; break;
+ case MRP_DOMCTL_UINT32: arg->u32 = val.u32; break;
+ case MRP_DOMCTL_INT32: arg->s32 = val.s32; break;
+ case MRP_DOMCTL_UINT64: arg->u64 = val.u64; break;
+ case MRP_DOMCTL_INT64: arg->s64 = val.s64; break;
+ case MRP_DOMCTL_DOUBLE: arg->dbl = val.dbl; break;
+ default:
+ if (MRP_DOMCTL_IS_ARRAY(type)) {
+ arg->arr = val.aany;
+ arg->size = size;
+ }
+ else
+ goto fail;
+ }
+ }
+
+ ret->wire = mrp_msg_ref(msg);
+ ret->unref_wire = msg_unref_wire;
+
+ return (msg_t *)ret;
+
+ fail:
+ msg_free_return((msg_t *)ret);
+ return NULL;
+}
+
+
msg_t *msg_decode_message(mrp_msg_t *msg)
{
uint16_t type;
case MSG_TYPE_NOTIFY: return msg_decode_notify(msg);
case MSG_TYPE_ACK: return msg_decode_ack(msg);
case MSG_TYPE_NAK: return msg_decode_nak(msg);
+ case MSG_TYPE_INVOKE: return msg_decode_invoke(msg);
+ case MSG_TYPE_RETURN: return msg_decode_return(msg);
default: break;
}
}
case MSG_TYPE_NOTIFY: return msg_encode_notify(&msg->notify);
case MSG_TYPE_ACK: return msg_encode_ack(&msg->ack);
case MSG_TYPE_NAK: return msg_encode_nak(&msg->nak);
+ case MSG_TYPE_INVOKE: return msg_encode_invoke(&msg->invoke);
+ case MSG_TYPE_RETURN: return msg_encode_return(&msg->ret);
default: return NULL;
}
}
case MSG_TYPE_NOTIFY: msg_free_notify(msg); break;
case MSG_TYPE_ACK: msg_free_ack(msg); break;
case MSG_TYPE_NAK: msg_free_nak(msg); break;
+ case MSG_TYPE_INVOKE: msg_free_invoke(msg); break;
+ case MSG_TYPE_RETURN: msg_free_return(msg); break;
default: break;
}
}
MSG_TYPE_NOTIFY,
MSG_TYPE_ACK,
MSG_TYPE_NAK,
+ MSG_TYPE_INVOKE,
+ MSG_TYPE_RETURN,
} msg_type_t;
typedef enum {
MSGTAG_NROW = 0x6, /* number of table rows */
MSGTAG_NCOL = 0x7, /* number of columns in a row */
MSGTAG_DATA = 0x8, /* a data column */
+
+ /* fixed tags in invoke and return messages */
+ MSGTAG_METHOD = 0x3, /* method name */
+ MSGTAG_NORET = 0x4, /* whether return values ignored */
+ MSGTAG_NARG = 0x5, /* number of arguments */
+ MSGTAG_ARG = 0x6, /* argument */
+ MSGTAG_ERROR = 0x7, /* invocation error */
+ MSGTAG_RETVAL = 0x8, /* invocation return value */
} msgtag_t;
+#define MSG_UINT8(tag, val) MRP_MSG_TAG_UINT8(MSGTAG_##tag, val)
+#define MSG_SINT8(tag, val) MRP_MSG_TAG_SINT8(MSGTAG_##tag, val)
#define MSG_UINT16(tag, val) MRP_MSG_TAG_UINT16(MSGTAG_##tag, val)
#define MSG_SINT16(tag, val) MRP_MSG_TAG_SINT16(MSGTAG_##tag, val)
#define MSG_UINT32(tag, val) MRP_MSG_TAG_UINT32(MSGTAG_##tag, val)
#define MSG_SINT32(tag, val) MRP_MSG_TAG_SINT32(MSGTAG_##tag, val)
+#define MSG_UINT64(tag, val) MRP_MSG_TAG_UINT64(MSGTAG_##tag, val)
+#define MSG_SINT64(tag, val) MRP_MSG_TAG_SINT64(MSGTAG_##tag, val)
#define MSG_DOUBLE(tag, val) MRP_MSG_TAG_DOUBLE(MSGTAG_##tag, val)
#define MSG_STRING(tag, val) MRP_MSG_TAG_STRING(MSGTAG_##tag, val)
+#define MSG_BOOL(tag, val) MRP_MSG_TAG_BOOL(MSGTAG_##tag, val)
#define MSG_ANY(tag, typep, valp) MRP_MSG_TAG_ANY(MSGTAG_##tag, typep, valp)
+#define MSG_ARRAY(tag, type, size, arr) \
+ MRP_MSG_TAGGED(MSGTAG_##tag, type, size, arr)
+
#define MSG_END MRP_MSG_END
#define COMMON_MSG_FIELDS /* common message fields */ \
const char *msg;
} nak_msg_t;
+
+typedef struct {
+ COMMON_MSG_FIELDS;
+ const char *name;
+ int noret;
+ uint32_t narg;
+ mrp_domctl_arg_t *args;
+} invoke_msg_t;
+
+
+typedef struct {
+ COMMON_MSG_FIELDS;
+ uint32_t error;
+ int32_t retval;
+ uint32_t narg;
+ mrp_domctl_arg_t *args;
+} return_msg_t;
+
+
typedef struct {
COMMON_MSG_FIELDS;
} any_msg_t;
notify_msg_t notify;
ack_msg_t ack;
nak_msg_t nak;
+ invoke_msg_t invoke;
+ return_msg_t ret;
};
#include "proxy.h"
+/*
+ * a pending proxied invocation
+ */
+
+typedef struct {
+ mrp_list_hook_t hook; /* to pending list */
+ uint32_t id; /* request id */
+ mrp_domain_return_cb_t cb; /* return callback */
+ void *user_data; /* opaque callback data */
+} pending_t;
+
+static void purge_pending(pep_proxy_t *proxy);
+
+
int init_proxies(pdp_t *pdp)
{
mrp_list_init(&pdp->proxies);
if (proxy != NULL) {
mrp_list_init(&proxy->hook);
mrp_list_init(&proxy->watches);
+ mrp_list_init(&proxy->pending);
- proxy->pdp = pdp;
+ proxy->pdp = pdp;
+ proxy->seqno = 1;
mrp_list_append(&pdp->proxies, &proxy->hook);
}
destroy_proxy_watches(proxy);
+ purge_pending(proxy);
+
mrp_free(proxy);
}
}
return TRUE;
}
+
+
+pep_proxy_t *find_proxy(pdp_t *pdp, const char *name)
+{
+ mrp_list_hook_t *p, *n;
+ pep_proxy_t *proxy;
+
+ mrp_list_foreach(&pdp->proxies, p, n) {
+ proxy = mrp_list_entry(p, typeof(*proxy), hook);
+
+ if (!strcmp(proxy->name, name))
+ return proxy;
+ }
+
+ return NULL;
+}
+
+
+uint32_t proxy_queue_pending(pep_proxy_t *proxy,
+ mrp_domain_return_cb_t return_cb, void *user_data)
+{
+ pending_t *pending;
+
+ if (return_cb == NULL)
+ return proxy->seqno++;
+
+ pending = mrp_allocz(sizeof(*pending));
+
+ if (pending == NULL)
+ return 0;
+
+ mrp_list_init(&pending->hook);
+
+ pending->id = proxy->seqno++;
+ pending->cb = return_cb;
+ pending->user_data = user_data;
+
+ mrp_list_append(&proxy->pending, &pending->hook);
+
+ return pending->id;
+}
+
+
+int proxy_dequeue_pending(pep_proxy_t *proxy, uint32_t id,
+ mrp_domain_return_cb_t *cbp, void **user_datap)
+{
+ mrp_list_hook_t *p, *n;
+ pending_t *pending;
+
+ mrp_list_foreach(&proxy->pending, p, n) {
+ pending = mrp_list_entry(p, typeof(*pending), hook);
+
+ if (pending->id == id) {
+ mrp_list_delete(&pending->hook);
+ *cbp = pending->cb;
+ *user_datap = pending->user_data;
+
+ mrp_free(pending);
+
+ return TRUE;
+ }
+ }
+
+ return FALSE;
+}
+
+
+static void purge_pending(pep_proxy_t *proxy)
+{
+ mrp_list_hook_t *p, *n;
+ pending_t *pending;
+
+ mrp_list_foreach(&proxy->pending, p, n) {
+ pending = mrp_list_entry(p, typeof(*pending), hook);
+
+ mrp_list_delete(&pending->hook);
+ mrp_free(pending);
+ }
+}
#ifndef __MURPHY_DOMAIN_CONTROL_PROXY_H__
#define __MURPHY_DOMAIN_CONTROL_PROXY_H__
+#include <murphy/core/domain.h>
+
#include "domain-control-types.h"
int init_proxies(pdp_t *pdp);
int *error, const char **errmsg);
int unregister_proxy(pep_proxy_t *proxy);
+pep_proxy_t *find_proxy(pdp_t *pdp, const char *name);
+
+uint32_t proxy_queue_pending(pep_proxy_t *proxy,
+ mrp_domain_return_cb_t return_cb, void *user_data);
+int proxy_dequeue_pending(pep_proxy_t *proxy, uint32_t id,
+ mrp_domain_return_cb_t *cb, void **user_datap);
+
#endif /* __MURPHY_DOMAIN_CONTROL_PROXY_H__ */