core, domain-control: added proxied domain function calls.
authorKrisztian Litkey <krisztian.litkey@intel.com>
Fri, 2 May 2014 10:48:53 +0000 (13:48 +0300)
committerKrisztian Litkey <krisztian.litkey@intel.com>
Wed, 20 Aug 2014 08:31:39 +0000 (11:31 +0300)
Added a mechanism for exporting and invoking 'domain' functions.
Domain controllers can register functions which then can be invoked
by the core. The core uses the domain-control plugin to proxy the
call to and the return from the invocation. These functions currently
cannot be queried by the core. Trying to invoke a non-existing function
will fail asynchronously. Trying to invoke a function of a non-existing
domain controller will fail synchronously.

Similarly, the core can export functions which then can be invoked
by domain controllers. The domain-control plugin proxies the call to
and the return from the invocation. These functions currently cannot
be queried by the domain-controllers. Trying to invoke a non-existing
function will fail asynchronously.

We shouldn't need to have a domain-control specific export mechanism
at all. We need to replace all the various import/export/invocation
mechanism (there is one at least in the plugin infra, in the resolver
and now this) with a single one at its core and only adapt it as
necessary for these various components. Eventually...

14 files changed:
src/Makefile.am
src/core/context.c
src/core/context.h
src/core/domain-types.h [new file with mode: 0644]
src/core/domain.c [new file with mode: 0644]
src/core/domain.h [new file with mode: 0644]
src/plugins/domain-control/client.c
src/plugins/domain-control/client.h
src/plugins/domain-control/domain-control-types.h
src/plugins/domain-control/domain-control.c
src/plugins/domain-control/message.c
src/plugins/domain-control/message.h
src/plugins/domain-control/proxy.c
src/plugins/domain-control/proxy.h

index 1fc0134..7f96669 100644 (file)
@@ -285,7 +285,9 @@ libmurphy_core_la_HEADERS =         \
                core/scripting.h        \
                core/method.h           \
                core/event.h            \
-               core/auth.h
+               core/auth.h             \
+               core/domain.h           \
+               core/domain-types.h
 
 libmurphy_core_la_REGULAR_SOURCES =    \
                core/context.c          \
@@ -296,6 +298,7 @@ libmurphy_core_la_REGULAR_SOURCES = \
                core/event.c            \
                core/auth.c             \
                core/auth-deny.c        \
+               core/domain.c           \
                $(LUA_BINDINGS_SOURCES)
 
 if SMACK_ENABLED
index e3f7906..fd01cce 100644 (file)
@@ -32,6 +32,7 @@
 #include <murphy/common/mm.h>
 #include <murphy/core/context.h>
 #include <murphy/core/console-priv.h>
+#include <murphy/core/domain.h>
 
 mrp_context_t *mrp_context_create(void)
 {
@@ -40,9 +41,11 @@ mrp_context_t *mrp_context_create(void)
     if ((c = mrp_allocz(sizeof(*c))) != NULL) {
         mrp_list_init(&c->plugins);
         console_setup(c);
+        domain_setup(c);
 
         mrp_list_init(&c->auth);
 
+
         if ((c->ml = mrp_mainloop_create()) == NULL) {
             mrp_log_error("Failed to create mainloop.");
             mrp_free(c);
index 357d9f5..e5367d6 100644 (file)
@@ -78,6 +78,26 @@ struct mrp_context_s {
     mrp_resolver_t  *r;                    /* resolver context */
     void            *lua_state;            /* state for Lua bindings */
     mrp_list_hook_t  auth;                 /* authenticator backends */
+
+    /*
+     * Hmm, this is not very nice.  Most of the domain handling code (in
+     * practice all) used to live in the domain-control plugin. To avoid
+     * loading order dependencies on plugin-domain-control we now started
+     * collecting registered handlers of proxied functions here. Calls by
+     * the core to proxied functions of domain controllers and by domain-
+     * controllers to the core are still handled in the domain-control
+     * plugin (and in the domain-controller client library).
+     *
+     * It would be perhaps the cleanest not to have a domain-controller
+     * specific function export mechanism at all. Instead the various
+     * import/export mechanisms (at least plugins, resolver, and this) should
+     * be replaced by / built on a single core implementation that is flexible
+     * enough to handle all the needs of all these.
+     */
+
+    mrp_list_hook_t  domain_methods;       /* functions for domain controllers */
+    void            *domain_invoke;        /* domain invoke handler */
+    void            *domain_data;          /* domain invoke handler data */
 };
 
 /** Create a new murphy context. */
diff --git a/src/core/domain-types.h b/src/core/domain-types.h
new file mode 100644 (file)
index 0000000..5064078
--- /dev/null
@@ -0,0 +1,106 @@
+/*
+ * Copyright (c) 2012-2014, Intel Corporation
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *  * Redistributions of source code must retain the above copyright notice,
+ *    this list of conditions and the following disclaimer.
+ *  * Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ *  * Neither the name of Intel Corporation nor the names of its contributors
+ *    may be used to endorse or promote products derived from this software
+ *    without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef __MURPHY_CORE_DOMAIN_TYPES_H__
+#define __MURPHY_CORE_DOMAIN_TYPES_H__
+
+#include <murphy/common/macros.h>
+#include <murphy/common/msg.h>
+
+MRP_CDECL_BEGIN
+
+
+/*
+ * passable data types to/from domain controllers
+ */
+typedef enum {
+    MRP_DOMCTL_END      = MRP_MSG_FIELD_INVALID,
+    MRP_DOMCTL_STRING   = MRP_MSG_FIELD_STRING,
+    MRP_DOMCTL_INTEGER  = MRP_MSG_FIELD_INT32,
+    MRP_DOMCTL_UNSIGNED = MRP_MSG_FIELD_UINT32,
+    MRP_DOMCTL_DOUBLE   = MRP_MSG_FIELD_DOUBLE,
+    MRP_DOMCTL_BOOL     = MRP_MSG_FIELD_BOOL,
+    MRP_DOMCTL_UINT8    = MRP_MSG_FIELD_UINT8,
+    MRP_DOMCTL_INT8     = MRP_MSG_FIELD_INT8,
+    MRP_DOMCTL_UINT16   = MRP_MSG_FIELD_UINT16,
+    MRP_DOMCTL_INT16    = MRP_MSG_FIELD_INT16,
+    MRP_DOMCTL_UINT32   = MRP_MSG_FIELD_UINT32,
+    MRP_DOMCTL_INT32    = MRP_MSG_FIELD_INT32,
+    MRP_DOMCTL_UINT64   = MRP_MSG_FIELD_UINT64,
+    MRP_DOMCTL_INT64    = MRP_MSG_FIELD_INT64,
+
+#define MRP_DOMCTL_ARRAY(_type)      MRP_MSG_FIELD_ARRAY_OF(_type)
+#define MRP_DOMCTL_IS_ARRAY(_type)   MRP_MSG_FIELD_IS_ARRAY(_type)
+#define MRP_DOMCTL_ARRAY_TYPE(_type) MRP_MSG_FIELD_ARRAY_TYPE(_type)
+} mrp_domctl_type_t;
+
+
+/*
+ * a single data value passed to/from a domain controller
+ */
+
+typedef struct {
+    mrp_domctl_type_t  type;             /* data type */
+    union {
+        /* these are usable both in DB operations and proxied invocations */
+        const char *str;                 /* MRP_DOMCTL_STRING */
+        uint32_t    u32;                 /* MRP_DOMCTL_{UNSIGNED,UINT32} */
+        int32_t     s32;                 /* MRP_DOMCTL_{INTEGER,INT32} */
+        double      dbl;                 /* MRP_DOMCTL_DOUBLE */
+        /* these are only usable in proxied invocations */
+        int         bln;                 /* MRP_DOMCTL_BOOL */
+        uint8_t     u8;                  /* MRP_DOMCTL_UINT8 */
+        int8_t      s8;                  /* MRP_DOMCTL_INT8 */
+        uint16_t    u16;                 /* MRP_DOMCTL_UINT16 */
+        int16_t     s16;                 /* MRP_DOMCTL_INT16 */
+        uint64_t    u64;                 /* MRP_DOMCTL_UINT64 */
+        int64_t     s64;                 /* MRP_DOMCTL_INT64 */
+        void       *arr;                 /* MRP_DOMCTL_ARRAY(*) */
+    };
+    uint32_t           size;             /* size for arrays */
+} mrp_domctl_value_t;
+
+
+/*
+ * proxied invokation errors
+ */
+
+typedef enum {
+    MRP_DOMAIN_OK = 0,                   /* no errors */
+    MRP_DOMAIN_NOTFOUND,                 /* domain not found */
+    MRP_DOMAIN_NOMETHOD,                 /* call domain method not found */
+    MRP_DOMAIN_FAILED,                   /* called method remotely failed */
+} mrp_domain_error_t;
+
+/* Type for a proxied invocation argument. */
+typedef mrp_domctl_value_t mrp_domctl_arg_t;
+
+MRP_CDECL_END
+
+#endif /* __MURPHY_CORE_DOMAIN_TYPES_H__ */
diff --git a/src/core/domain.c b/src/core/domain.c
new file mode 100644 (file)
index 0000000..75df352
--- /dev/null
@@ -0,0 +1,147 @@
+/*
+ * Copyright (c) 2012-2014, Intel Corporation
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *  * Redistributions of source code must retain the above copyright notice,
+ *    this list of conditions and the following disclaimer.
+ *  * Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ *  * Neither the name of Intel Corporation nor the names of its contributors
+ *    may be used to endorse or promote products derived from this software
+ *    without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <murphy/common/macros.h>
+#include <murphy/common/debug.h>
+#include <murphy/common/log.h>
+#include <murphy/common/mm.h>
+#include <murphy/common/list.h>
+
+#include <murphy/core/context.h>
+#include <murphy/core/domain.h>
+
+typedef struct {
+    mrp_list_hook_t         hook;        /* to list of registered methods */
+    char                   *name;        /* method name */
+    int                     max_out;     /* max returned arguments */
+    mrp_domain_invoke_cb_t  cb;          /* actual callback */
+    void                   *user_data;   /* callback user data */
+} method_t;
+
+
+void domain_setup(mrp_context_t *ctx)
+{
+    mrp_list_init(&ctx->domain_methods);
+}
+
+
+int mrp_set_domain_invoke_handler(mrp_context_t *ctx,
+                                  mrp_domain_invoke_handler_t handler,
+                                  void *handler_data)
+{
+    if (ctx->domain_invoke != NULL)
+        return FALSE;
+
+    ctx->domain_invoke = handler;
+    ctx->domain_data   = handler_data;
+
+    return TRUE;
+}
+
+
+int mrp_register_domain_methods(mrp_context_t *ctx,
+                                mrp_domain_method_def_t *defs, size_t ndef)
+{
+    mrp_domain_method_def_t *def;
+    method_t                *m;
+    size_t                   i;
+
+    for (i = 0, def = defs; i < ndef; i++, def++) {
+        m = mrp_allocz(sizeof(*m));
+
+        if (m == NULL)
+            return FALSE;
+
+        mrp_list_init(&m->hook);
+
+        m->name      = mrp_strdup(def->name);
+        m->max_out   = def->max_out;
+        m->cb        = def->cb;
+        m->user_data = def->user_data;
+
+        if (m->name == NULL) {
+            mrp_free(m);
+            return FALSE;
+        }
+
+        mrp_list_append(&ctx->domain_methods, &m->hook);
+    }
+
+    return TRUE;
+}
+
+
+static method_t *find_method(mrp_context_t *ctx, const char *name)
+{
+    mrp_list_hook_t *p, *n;
+    method_t        *m;
+
+    mrp_list_foreach(&ctx->domain_methods, p, n) {
+        m = mrp_list_entry(p, typeof(*m), hook);
+
+        if (!strcmp(m->name, name))
+            return m;
+    }
+
+    return NULL;
+}
+
+
+int mrp_lookup_domain_method(mrp_context_t *ctx, const char *name,
+                             mrp_domain_invoke_cb_t *cb, int *max_out,
+                             void **user_data)
+{
+    method_t *m;
+
+    m = find_method(ctx, name);
+
+    if (m == NULL)
+        return FALSE;
+
+    *cb        = m->cb;
+    *max_out   = m->max_out;
+    *user_data = m->user_data;
+
+    return TRUE;
+}
+
+
+int mrp_invoke_domain(mrp_context_t *ctx, const char *domain,
+                      const char *method, int narg, mrp_domctl_arg_t *args,
+                      mrp_domain_return_cb_t return_cb, void *user_data)
+{
+    mrp_domain_invoke_handler_t  handler      = ctx->domain_invoke;
+    void                        *handler_data = ctx->domain_data;
+
+    if (handler == NULL)
+        return FALSE;
+
+    return handler(handler_data, domain,
+                   method, narg, args, return_cb, user_data);
+}
diff --git a/src/core/domain.h b/src/core/domain.h
new file mode 100644 (file)
index 0000000..59ac523
--- /dev/null
@@ -0,0 +1,90 @@
+/*
+ * Copyright (c) 2012-2014, Intel Corporation
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *  * Redistributions of source code must retain the above copyright notice,
+ *    this list of conditions and the following disclaimer.
+ *  * Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ *  * Neither the name of Intel Corporation nor the names of its contributors
+ *    may be used to endorse or promote products derived from this software
+ *    without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef __MURPHY_CORE_DOMAIN__
+#define __MURPHY_CORE_DOMAIN__
+
+#include <murphy/common/macros.h>
+#include <murphy/common/msg.h>
+
+#include <murphy/core/context.h>
+#include <murphy/core/domain-types.h>
+
+MRP_CDECL_BEGIN
+
+/* Type for a proxied invocation handler. */
+typedef int (*mrp_domain_invoke_cb_t)(int narg, mrp_domctl_arg_t *args,
+                                      uint32_t *nout, mrp_domctl_arg_t *outs,
+                                      void *user_data);
+
+/* Type for a proxied invocation return/reply handler. */
+typedef void (*mrp_domain_return_cb_t)(int error, int retval, int narg,
+                                       mrp_domctl_arg_t *args, void *user_data);
+
+typedef struct {
+    char                   *name;        /* method name */
+    int                     max_out;     /* max. number of return arguments */
+    mrp_domain_invoke_cb_t  cb;          /* handler callback */
+    void                   *user_data;   /* opaque callback user data */
+} mrp_domain_method_def_t;
+
+
+
+/* Type for handling proxied invocation to domain controllers. */
+typedef int (*mrp_domain_invoke_handler_t)(void *handler_data, const char *id,
+                                           const char *method, int narg,
+                                           mrp_domctl_arg_t *args,
+                                           mrp_domain_return_cb_t return_cb,
+                                           void *user_data);
+
+/* Initialize domain-specific context parts. */
+void domain_setup(mrp_context_t *ctx);
+
+/* Register a domain method. */
+int mrp_register_domain_methods(mrp_context_t *ctx,
+                                mrp_domain_method_def_t *defs, size_t ndef);
+
+/* Find a registered domain method. */
+int mrp_lookup_domain_method(mrp_context_t *ctx, const char *method,
+                             mrp_domain_invoke_cb_t *cb, int *max_out,
+                             void **user_data);
+
+/* Invoke the named method of the specified domain. */
+int mrp_invoke_domain(mrp_context_t *ctx, const char *domain, const char *method,
+                      int narg, mrp_domctl_arg_t *args,
+                      mrp_domain_return_cb_t return_cb, void *user_data);
+
+/* Set the domain invoke handler. */
+int mrp_set_domain_invoke_handler(mrp_context_t *ctx,
+                                  mrp_domain_invoke_handler_t handler,
+                                  void *handler_data);
+
+MRP_CDECL_END
+
+#endif /* __MURPHY_CORE_DOMAIN__ */
index c215bbb..31e3cb1 100644 (file)
 typedef struct {
     mrp_list_hook_t         hook;        /* hook to pending request queue */
     uint32_t                seqno;       /* sequence number/request id */
-    mrp_domctl_status_cb_t  cb;          /* callback to call upon completion */
+    int                     invoke : 1;  /* whether a pending invocation */
+    union {
+        mrp_domctl_status_cb_t  status;  /* request completion callback */
+        mrp_domctl_return_cb_t  ret;     /* invocation return cb */
+    } cb;
     void                   *user_data;   /* opaque callback data */
 } pending_request_t;
 
 
+/*
+ * a registered proxied method
+ */
+
+typedef struct {
+    mrp_list_hook_t         hook;        /* to list of methods */
+    char                   *name;        /* method name */
+    size_t                  max_out;     /* max return arguments */
+    mrp_domctl_invoke_cb_t  cb;          /* handler callback */
+    void                   *user_data;   /* opaque handler data */
+} method_t;
+
 static void recv_cb(mrp_transport_t *t, mrp_msg_t *msg, void *user_data);
 static void recvfrom_cb(mrp_transport_t *t, mrp_msg_t *msg,
                         mrp_sockaddr_t *addr, socklen_t addrlen,
@@ -74,8 +90,11 @@ static void closed_cb(mrp_transport_t *t, int error, void *user_data);
 
 static int queue_pending(mrp_domctl_t *dc, uint32_t seq,
                          mrp_domctl_status_cb_t cb, void *user_data);
-static int notify_pending(mrp_domctl_t *dc, uint32_t seq, int error,
-                          const char *msg);
+static int notify_pending(mrp_domctl_t *dc, msg_t *msg);
+static int queue_invoke(mrp_domctl_t *dc, uint32_t seq,
+                        mrp_domctl_return_cb_t cb, void *user_data);
+static int notify_invoke(mrp_domctl_t *dc, uint32_t seq, int error,
+                         int status, int narg, mrp_domctl_arg_t *args);
 static void purge_pending(mrp_domctl_t *dc);
 
 
@@ -139,6 +158,8 @@ mrp_domctl_t *mrp_domctl_create(const char *name, mrp_mainloop_t *ml,
             dc->user_data  = user_data;
             dc->seqno      = 1;
 
+            mrp_list_init(&dc->methods);
+
             return dc;
         }
 
@@ -376,10 +397,97 @@ int mrp_domctl_set_data(mrp_domctl_t *dc, mrp_domctl_data_t *tables, int ntable,
 }
 
 
+int mrp_domctl_invoke(mrp_domctl_t *dc, const char *name, int narg,
+                      mrp_domctl_arg_t *args, mrp_domctl_return_cb_t reply_cb,
+                      void *user_data)
+{
+    invoke_msg_t  invoke;
+    mrp_msg_t    *msg;
+    uint32_t      seq = dc->seqno++;
+    int           success;
+
+    if (!dc->connected)
+        return FALSE;
+
+    if (reply_cb == NULL && user_data != NULL)
+        return FALSE;
+
+    mrp_clear(&invoke);
+    invoke.type  = MSG_TYPE_INVOKE;
+    invoke.seq   = seq;
+    invoke.name  = name;
+    invoke.noret = reply_cb ? TRUE : FALSE;
+    invoke.narg  = narg;
+    invoke.args  = args;
+
+    msg = msg_encode_message((msg_t *)&invoke);
+
+    if (msg != NULL) {
+        success = mrp_transport_send(dc->t, msg);
+        mrp_msg_unref(msg);
+
+        if (success)
+            queue_invoke(dc, seq, reply_cb, user_data);
+
+        return success;
+    }
+    else
+        return FALSE;
+}
+
+
+int mrp_domctl_register_methods(mrp_domctl_t *dc, mrp_domctl_method_def_t *defs,
+                                size_t ndef)
+{
+    mrp_domctl_method_def_t *def;
+    method_t                *m;
+    size_t                   i;
+
+    for (i = 0, def = defs; i < ndef; i++, def++) {
+        m = mrp_allocz(sizeof(*m));
+
+        if (m == NULL)
+            return FALSE;
+
+        mrp_list_init(&m->hook);
+
+        m->name      = mrp_strdup(def->name);
+        m->max_out   = def->max_out;
+        m->cb        = def->cb;
+        m->user_data = def->user_data;
+
+        if (m->name == NULL) {
+            mrp_free(m);
+            return FALSE;
+        }
+
+        mrp_list_append(&dc->methods, &m->hook);
+    }
+
+    return TRUE;
+}
+
+
+static method_t *find_method(mrp_domctl_t *dc, const char *name)
+{
+    mrp_list_hook_t *p, *n;
+    method_t        *m;
+
+    mrp_list_foreach(&dc->methods, p, n) {
+        m = mrp_list_entry(p, typeof(*m), hook);
+
+        if (!strcmp(m->name, name))
+            return m;
+    }
+
+    return NULL;
+}
+
+
 static void process_ack(mrp_domctl_t *dc, ack_msg_t *ack)
 {
     if (ack->seq != 0)
-        notify_pending(dc, ack->seq, 0, NULL);
+        notify_pending(dc, (msg_t *)ack);
     else
         notify_connect(dc);
 }
@@ -388,7 +496,7 @@ static void process_ack(mrp_domctl_t *dc, ack_msg_t *ack)
 static void process_nak(mrp_domctl_t *dc, nak_msg_t *nak)
 {
     if (nak->seq != 0)
-        notify_pending(dc, nak->seq, nak->error, nak->msg);
+        notify_pending(dc, (msg_t *)nak);
     else
         notify_disconnect(dc, nak->error, nak->msg);
 }
@@ -400,6 +508,82 @@ static void process_notify(mrp_domctl_t *dc, notify_msg_t *notify)
 }
 
 
+static void process_invoke(mrp_domctl_t *dc, invoke_msg_t *invoke)
+{
+    method_t         *m;
+    mrp_domctl_arg_t *args, error;
+    int               narg;
+    return_msg_t      ret;
+    mrp_msg_t        *msg;
+    int               i;
+
+    mrp_clear(&ret);
+
+    m = find_method(dc, invoke->name);
+
+    ret.type = MSG_TYPE_RETURN;
+    ret.seq  = invoke->seq;
+
+    if (m == NULL) {
+        ret.error = MRP_DOMCTL_NOTFOUND;
+    }
+    else {
+        ret.error = MRP_DOMCTL_OK;
+
+        narg = ret.narg = m->max_out;
+
+        if (narg > 0) {
+            args = ret.args = alloca(narg * sizeof(args[0]));
+            memset(args, 0, narg * sizeof(args[0]));
+        }
+
+        ret.retval = m->cb(dc, invoke->narg, invoke->args,
+                           &ret.narg, ret.args, m->user_data);
+    }
+
+    msg = msg_encode_message((msg_t *)&ret);
+
+    if (msg == NULL) {
+        error.type = MRP_DOMCTL_STRING;
+        error.str  = "failed to encode return message (arguments)";
+        ret.error  = MRP_DOMAIN_FAILED;
+        ret.narg   = 1;
+        ret.args   = &error;
+
+        msg = msg_encode_message((msg_t *)&ret);
+
+        ret.narg = 0;
+        ret.args = NULL;
+    }
+
+    if (msg != NULL) {
+        mrp_transport_send(dc->t, msg);
+        mrp_msg_unref(msg);
+    }
+
+    narg = ret.narg;
+    for (i = 0; i < narg; i++) {
+        if (args[i].type == MRP_DOMCTL_STRING)
+            mrp_free((char *)args[i].str);
+        else if (MRP_DOMCTL_IS_ARRAY(args[i].type)) {
+            uint32_t j;
+
+            for (j = 0; j < args[i].size; j++)
+                if (MRP_DOMCTL_ARRAY_TYPE(args[i].type) == MRP_DOMCTL_STRING)
+                    mrp_free(((char **)args[i].arr)[j]);
+
+            mrp_free(args[i].arr);
+        }
+    }
+}
+
+
+static void process_return(mrp_domctl_t *dc, return_msg_t *ret)
+{
+    notify_pending(dc, (msg_t *)ret);
+}
+
+
 static void recv_cb(mrp_transport_t *t, mrp_msg_t *tmsg, void *user_data)
 {
     mrp_domctl_t  *dc = (mrp_domctl_t *)user_data;
@@ -425,6 +609,12 @@ static void recv_cb(mrp_transport_t *t, mrp_msg_t *tmsg, void *user_data)
         case MSG_TYPE_NAK:
             process_nak(dc, &msg->nak);
             break;
+        case MSG_TYPE_INVOKE:
+            process_invoke(dc, &msg->invoke);
+            break;
+        case MSG_TYPE_RETURN:
+            process_return(dc, &msg->ret);
+            break;
         default:
             mrp_domctl_disconnect(dc);
             notify_disconnect(dc, EINVAL, "unexpected message from server");
@@ -493,8 +683,9 @@ static int queue_pending(mrp_domctl_t *dc, uint32_t seq,
     if (pending != NULL) {
         mrp_list_init(&pending->hook);
 
+        pending->invoke    = false;
         pending->seqno     = seq;
-        pending->cb        = cb;
+        pending->cb.status = cb;
         pending->user_data = user_data;
 
         mrp_list_append(&dc->pending, &pending->hook);
@@ -506,30 +697,100 @@ static int queue_pending(mrp_domctl_t *dc, uint32_t seq,
 }
 
 
-static int notify_pending(mrp_domctl_t *dc, uint32_t seq, int error,
-                          const char *msg)
+static int notify_pending(mrp_domctl_t *dc, msg_t *msg)
 {
     mrp_list_hook_t   *p, *n;
     pending_request_t *pending;
+    uint32_t           seq;
+    int                error, status;
+    const char        *message;
+    int                narg;
+    mrp_domctl_arg_t  *args;
+    int                success;
+
+    seq = msg->any.seq;
 
     mrp_list_foreach(&dc->pending, p, n) {
         pending = mrp_list_entry(p, typeof(*pending), hook);
 
-        if (pending->seqno == seq) {
-            DOMCTL_MARK_BUSY(dc, {
-                    pending->cb(dc, error, msg, pending->user_data);
-                    mrp_list_delete(&pending->hook);
-                    mrp_free(pending);
-                });
-
-            return TRUE;
+        if (pending->seqno != seq)
+            continue;
+
+        if (!pending->invoke) {
+            switch (msg->any.type) {
+            case MSG_TYPE_ACK:
+                error   = 0;
+                message = NULL;
+                goto notify;
+            case MSG_TYPE_NAK:
+                error   = msg->nak.error;
+                message = msg->nak.msg;
+            notify:
+                DOMCTL_MARK_BUSY(dc, {
+                        pending->cb.status(dc, error, message,
+                                           pending->user_data);
+                    });
+                success = TRUE;
+                break;
+            default:
+                success = FALSE;
+                break;
+            }
+        }
+        else {
+            if (msg->any.type == MSG_TYPE_RETURN) {
+                error  = msg->ret.error;
+                status = msg->ret.retval;
+                narg   = msg->ret.narg;
+                args   = msg->ret.args;
+
+                DOMCTL_MARK_BUSY(dc, {
+                        pending->cb.ret(dc, error, status, narg, args,
+                                        pending->user_data);
+                    });
+                success = TRUE;
+            }
+            else
+                success = FALSE;
         }
+
+        mrp_list_delete(&pending->hook);
+        mrp_free(pending);
+
+        return success;
     }
 
     return FALSE;
 }
 
 
+static int queue_invoke(mrp_domctl_t *dc, uint32_t seq,
+                        mrp_domctl_return_cb_t cb, void *user_data)
+{
+    pending_request_t *pending;
+
+    if (cb == NULL)
+        return TRUE;
+
+    pending = mrp_allocz(sizeof(*pending));
+
+    if (pending != NULL) {
+        mrp_list_init(&pending->hook);
+
+        pending->invoke    = true;
+        pending->seqno     = seq;
+        pending->cb.ret    = cb;
+        pending->user_data = user_data;
+
+        mrp_list_append(&dc->pending, &pending->hook);
+
+        return TRUE;
+    }
+    else
+        return FALSE;
+}
+
+
 static void purge_pending(mrp_domctl_t *dc)
 {
     mrp_list_hook_t   *p, *n;
index 31759d5..913f09a 100644 (file)
@@ -33,6 +33,7 @@
 #include <murphy/common/macros.h>
 #include <murphy/common/mainloop.h>
 #include <murphy-db/mqi.h>
+#include <murphy/core/domain-types.h>
 
 MRP_CDECL_BEGIN
 
@@ -73,28 +74,6 @@ typedef struct {
 
 
 /*
- * table column types and values
- */
-
-typedef enum {
-    MRP_DOMCTL_STRING   = mqi_varchar,
-    MRP_DOMCTL_INTEGER  = mqi_integer,
-    MRP_DOMCTL_UNSIGNED = mqi_unsignd,
-    MRP_DOMCTL_DOUBLE   = mqi_floating
-} mrp_domctl_type_t;
-
-typedef struct {
-    mrp_domctl_type_t  type;             /* data type */
-    union {
-        const char *str;                 /* MRP_DOMCTL_STRING */
-        uint32_t    u32;                 /* MRP_DOMCTL_UNSIGNED */
-        int32_t     s32;                 /* MRP_DOMCTL_INTEGER */
-        double      dbl;                 /* MRP_DOMCTL_DOUBLE */
-    };
-} mrp_domctl_value_t;
-
-
-/*
  * table data
  */
 
@@ -107,7 +86,6 @@ typedef struct {
 } mrp_domctl_data_t;
 
 
-
 /** Opaque policy domain controller type. */
 typedef struct mrp_domctl_s mrp_domctl_t;
 
@@ -125,6 +103,39 @@ typedef void (*mrp_domctl_watch_cb_t)(mrp_domctl_t *dc,
                                       mrp_domctl_data_t *tables, int ntable,
                                       void *user_data);
 
+/** Callback type for return of/reply to a proxied method invocation. */
+typedef void (*mrp_domctl_return_cb_t)(mrp_domctl_t *dc, int error, int retval,
+                                       int narg, mrp_domctl_arg_t *args,
+                                       void *user_data);
+
+/** Callback type for a proxied method invocation. */
+typedef int (*mrp_domctl_invoke_cb_t)(mrp_domctl_t *dc, int narg,
+                                      mrp_domctl_arg_t *args,
+                                      int *nout, mrp_domctl_arg_t *outs,
+                                      void *user_data);
+
+/*
+ * proxied invocation errors
+ */
+
+typedef enum {
+    MRP_DOMCTL_OK       = MRP_DOMAIN_OK,
+    MRP_DOMCTL_NOTFOUND = MRP_DOMAIN_NOTFOUND,
+    MRP_DOMCTL_NOMETHOD = MRP_DOMAIN_NOMETHOD,
+} mrp_domctl_error_t;
+
+/*
+ * a domain controller method definition
+ */
+
+typedef struct {
+    const char             *name;
+    size_t                  max_out;
+    mrp_domctl_invoke_cb_t  cb;
+    void                   *user_data;
+} mrp_domctl_method_def_t;
+
+
 /** Create a new policy domain controller. */
 mrp_domctl_t *mrp_domctl_create(const char *name, mrp_mainloop_t *ml,
                                 mrp_domctl_table_t *tables, int ntable,
@@ -150,6 +161,15 @@ void mrp_domctl_disconnect(mrp_domctl_t *dc);
 int mrp_domctl_set_data(mrp_domctl_t *dc, mrp_domctl_data_t *tables, int ntable,
                         mrp_domctl_status_cb_t status_cb, void *user_data);
 
+/** Invoke a proxied method. */
+int mrp_domctl_invoke(mrp_domctl_t *dc, const char *method, int narg,
+                      mrp_domctl_arg_t *args, mrp_domctl_return_cb_t return_cb,
+                      void *user_data);
+
+/** Register a proxied method handler. */
+int mrp_domctl_register_methods(mrp_domctl_t *dc, mrp_domctl_method_def_t *defs,
+                                size_t ndef);
+
 MRP_CDECL_END
 
 #endif /* __MURPHY_DOMAIN_CONTROL_CLIENT_H__ */
index 838622e..360dce8 100644 (file)
@@ -70,6 +70,7 @@ struct mrp_domctl_s {
     int                      destroyed:1;/* non-zero if destroy pending */
     uint32_t                 seqno;      /* request sequence number */
     mrp_list_hook_t          pending;    /* queue of outstanding requests */
+    mrp_list_hook_t          methods;    /* registered proxied methods */
 };
 
 
@@ -133,6 +134,8 @@ struct pep_proxy_s {
     int                ntable;           /* number of tables */
     mrp_list_hook_t    watches;          /* tables watched by this */
     proxy_ops_t       *ops;              /* transport/messaging operations */
+    uint32_t           seqno;            /* request sequence number */
+    mrp_list_hook_t    pending;          /* pending method invocations */
     int                notify_update;    /* whether needs notification */
     void              *notify_msg;       /* notification being built */
     int                notify_ntable;    /* number of changed tables */
index e25eb9a..82ae528 100644 (file)
@@ -33,6 +33,8 @@
 #include <murphy/common/log.h>
 #include <murphy/common/wsck-transport.h>
 
+#include <murphy/core/domain.h>
+
 #include "proxy.h"
 #include "message.h"
 #include "table.h"
 static mrp_transport_t *create_transport(pdp_t *pdp, const char *address);
 static void destroy_transport(mrp_transport_t *t);
 
+static int invoke_handler(void *handler_data, const char *id,
+                          const char *method, int narg,
+                          mrp_domctl_arg_t *args,
+                          mrp_domain_return_cb_t return_cb,
+                          void *user_data);
+
+
 pdp_t *create_domain_control(mrp_context_t *ctx,
                              const char *extaddr, const char *intaddr,
                              const char *wrtaddr, const char *httpdir)
@@ -79,8 +88,10 @@ pdp_t *create_domain_control(mrp_context_t *ctx,
 
             if ((!extaddr || !*extaddr || pdp->extt != NULL) &&
                 (!intaddr || !*intaddr || pdp->intt != NULL) &&
-                (!wrtaddr || !*wrtaddr || pdp->wrtt != NULL))
+                (!wrtaddr || !*wrtaddr || pdp->wrtt != NULL)) {
+                mrp_set_domain_invoke_handler(ctx, invoke_handler, pdp);
                 return pdp;
+            }
         }
 
         destroy_domain_control(pdp);
@@ -207,6 +218,79 @@ static void process_set(pep_proxy_t *proxy, set_msg_t *set)
 }
 
 
+static void process_invoke(pep_proxy_t *proxy, invoke_msg_t *invoke)
+{
+    mrp_context_t          *ctx = proxy->pdp->ctx;
+    mrp_domain_invoke_cb_t  cb;
+    void                   *user_data;
+    int                     max_out;
+    mrp_domctl_arg_t       *args;
+    int                     narg;
+    return_msg_t            ret;
+    mrp_msg_t              *msg;
+    int                     i;
+
+    mrp_clear(&ret);
+
+    ret.type = MSG_TYPE_RETURN;
+    ret.seq  = invoke->seq;
+    args     = NULL;
+    narg     = 0;
+
+    if (!mrp_lookup_domain_method(ctx, invoke->name, &cb, &max_out,&user_data)) {
+        ret.error = MRP_DOMCTL_NOTFOUND;
+    }
+    else {
+        ret.error = MRP_DOMCTL_OK;
+
+        narg = ret.narg = max_out;
+
+        if (narg > 0) {
+            args = ret.args = alloca(narg * sizeof(args[0]));
+            memset(args, 0, narg * sizeof(args[0]));
+        }
+
+        ret.retval = cb(invoke->narg, invoke->args,
+                        &ret.narg, ret.args, user_data);
+    }
+
+    msg = msg_encode_message((msg_t *)&ret);
+
+    if (msg != NULL) {
+        mrp_transport_send(proxy->t, msg);
+        mrp_msg_unref(msg);
+    }
+
+    narg = ret.narg;
+    for (i = 0; i < narg; i++) {
+        if (args[i].type == MRP_DOMCTL_STRING)
+            mrp_free((char *)args[i].str);
+        else if (MRP_DOMCTL_IS_ARRAY(args[i].type)) {
+            uint32_t j;
+
+            for (j = 0; j < args[i].size; j++)
+                if (MRP_DOMCTL_ARRAY_TYPE(args[i].type) == MRP_DOMCTL_STRING)
+                    mrp_free(((char **)args[i].arr)[j]);
+
+            mrp_free(args[i].arr);
+        }
+    }
+}
+
+
+static void process_return(pep_proxy_t *proxy, return_msg_t *ret)
+{
+    uint32_t                id = ret->seq;
+    mrp_domain_return_cb_t  cb;
+    void                   *user_data;
+
+    if (!proxy_dequeue_pending(proxy, id, &cb, &user_data))
+        return;
+
+    cb(ret->error, ret->retval, ret->narg, ret->args, user_data);
+}
+
+
 static void process_message(pep_proxy_t *proxy, msg_t *msg)
 {
     char *name  = proxy->name ? proxy->name : "<unknown>";
@@ -221,6 +305,12 @@ static void process_message(pep_proxy_t *proxy, msg_t *msg)
     case MSG_TYPE_SET:
         process_set(proxy, &msg->set);
         break;
+    case MSG_TYPE_INVOKE:
+        process_invoke(proxy, &msg->invoke);
+        break;
+    case MSG_TYPE_RETURN:
+        process_return(proxy, &msg->ret);
+        break;
     default:
         mrp_log_error("Unexpected message 0x%x from client %s.",
                       msg->any.type, name);
@@ -229,6 +319,38 @@ static void process_message(pep_proxy_t *proxy, msg_t *msg)
 }
 
 
+static int invoke_handler(void *handler_data, const char *domain,
+                          const char *method, int narg,
+                          mrp_domctl_arg_t *args,
+                          mrp_domain_return_cb_t return_cb,
+                          void *user_data)
+{
+    pdp_t        *pdp   = (pdp_t *)handler_data;
+    pep_proxy_t  *proxy = find_proxy(pdp, domain);
+    uint32_t      id;
+    invoke_msg_t  invoke;
+
+    if (proxy == NULL)
+        return FALSE;
+
+    id = proxy_queue_pending(proxy, return_cb, user_data);
+
+    if (!id)
+        return FALSE;
+
+    mrp_clear(&invoke);
+
+    invoke.type  = MSG_TYPE_INVOKE;
+    invoke.seq   = id;
+    invoke.name  = method;
+    invoke.noret = (return_cb == NULL);
+    invoke.narg  = narg;
+    invoke.args  = args;
+
+    return msg_send_message(proxy, (msg_t *)&invoke);
+}
+
+
 static int msg_op_send_msg(pep_proxy_t *proxy, msg_t *msg)
 {
     return msg_send_message(proxy, msg);
index 48e93b1..33a299c 100644 (file)
@@ -742,6 +742,334 @@ msg_t *msg_decode_notify(mrp_msg_t *msg)
 }
 
 
+void msg_free_invoke(msg_t *msg)
+{
+    if (msg != NULL) {
+        mrp_free(msg->invoke.args);
+        mrp_free(msg);
+    }
+}
+
+
+mrp_msg_t *msg_encode_invoke(invoke_msg_t *invoke)
+{
+    mrp_msg_t        *msg;
+    mrp_domctl_arg_t *arg;
+    uint32_t          i;
+
+    msg = mrp_msg_create(MSG_UINT16(MSGTYPE, MSG_TYPE_INVOKE),
+                         MSG_UINT32(MSGSEQ , invoke->seq),
+                         MSG_STRING(METHOD , invoke->name),
+                         MSG_BOOL  (NORET  , invoke->noret),
+                         MSG_UINT32(NARG   , invoke->narg),
+                         MSG_END);
+
+    for (i = 0, arg = invoke->args; i < invoke->narg; i++, arg++) {
+        switch (arg->type) {
+        case MRP_DOMCTL_STRING:
+            if (!mrp_msg_append(msg, MSG_STRING(ARG, arg->str)))
+                goto fail;
+            break;
+        case MRP_DOMCTL_DOUBLE:
+            if (!mrp_msg_append(msg, MSG_DOUBLE(ARG, arg->dbl)))
+                goto fail;
+            break;
+        case MRP_DOMCTL_BOOL:
+            if (!mrp_msg_append(msg, MSG_BOOL(ARG, arg->bln)))
+                goto fail;
+            break;
+        case MRP_DOMCTL_UINT8:
+            if (!mrp_msg_append(msg, MSG_UINT8(ARG, arg->s8)))
+                goto fail;
+            break;
+        case MRP_DOMCTL_INT8:
+            if (!mrp_msg_append(msg, MSG_SINT8(ARG, arg->u8)))
+                goto fail;
+            break;
+        case MRP_DOMCTL_UINT16:
+            if (!mrp_msg_append(msg, MSG_UINT16(ARG, arg->s16)))
+                goto fail;
+            break;
+        case MRP_DOMCTL_INT16:
+            if (!mrp_msg_append(msg, MSG_SINT16(ARG, arg->u16)))
+                goto fail;
+            break;
+        case MRP_DOMCTL_UINT32:
+            if (!mrp_msg_append(msg, MSG_UINT32(ARG, arg->s32)))
+                goto fail;
+            break;
+        case MRP_DOMCTL_INT32:
+            if (!mrp_msg_append(msg, MSG_SINT32(ARG, arg->u32)))
+                goto fail;
+            break;
+        case MRP_DOMCTL_UINT64:
+            if (!mrp_msg_append(msg, MSG_UINT64(ARG, arg->s64)))
+                goto fail;
+            break;
+        case MRP_DOMCTL_INT64:
+            if (!mrp_msg_append(msg, MSG_SINT64(ARG, arg->u64)))
+                goto fail;
+            break;
+        default:
+            if (MRP_DOMCTL_IS_ARRAY(arg->type)) {
+                if (!mrp_msg_append(msg, MSG_ARRAY(ARG, arg->type,
+                                                   arg->size, arg->arr)))
+                    goto fail;
+            }
+            else
+                goto fail;
+            break;
+        }
+    }
+
+    return msg;
+
+ fail:
+    mrp_msg_unref(msg);
+    return NULL;
+}
+
+
+msg_t *msg_decode_invoke(mrp_msg_t *msg)
+{
+    invoke_msg_t     *invoke;
+    void             *it;
+    uint16_t          tag, type;
+    mrp_msg_value_t   val;
+    mrp_domctl_arg_t *arg;
+    uint32_t          i;
+    size_t            size;
+
+    mrp_debug_code({
+            mrp_debug("got domain invoke request:");
+            mrp_msg_dump(msg, stdout); });
+
+    it     = NULL;
+    invoke = mrp_allocz(sizeof(*invoke));
+
+    if (invoke == NULL)
+        goto fail;
+
+    invoke->type = MSG_TYPE_INVOKE;
+
+    if (!mrp_msg_iterate_get(msg, &it,
+                             MSG_UINT32(MSGSEQ, &invoke->seq),
+                             MSG_STRING(METHOD, &invoke->name),
+                             MSG_BOOL  (NORET , &invoke->noret),
+                             MSG_UINT32(NARG  , &invoke->narg),
+                             MSG_END))
+        goto fail;
+
+
+
+    if (invoke->narg > 0)
+        invoke->args = mrp_allocz(invoke->narg * sizeof(invoke->args[0]));
+
+    if (invoke->args == NULL && invoke->narg > 0)
+        goto fail;
+
+    for (i = 0, arg = invoke->args; i < invoke->narg; i++, arg++) {
+        if (!mrp_msg_iterate(msg, &it, &tag, &type, &val, &size))
+            goto fail;
+
+        arg->type = type;
+
+        switch (type) {
+        case MRP_DOMCTL_STRING:  arg->str = val.str; break;
+        case MRP_DOMCTL_BOOL:    arg->bln = val.bln; break;
+        case MRP_DOMCTL_UINT8:   arg->u8  = val.u8;  break;
+        case MRP_DOMCTL_INT8:    arg->s8  = val.s8;  break;
+        case MRP_DOMCTL_UINT16:  arg->u16 = val.u16; break;
+        case MRP_DOMCTL_INT16:   arg->s16 = val.s16; break;
+        case MRP_DOMCTL_UINT32:  arg->u32 = val.u32; break;
+        case MRP_DOMCTL_INT32:   arg->s32 = val.s32; break;
+        case MRP_DOMCTL_UINT64:  arg->u64 = val.u64; break;
+        case MRP_DOMCTL_INT64:   arg->s64 = val.s64; break;
+        case MRP_DOMCTL_DOUBLE:  arg->dbl = val.dbl; break;
+        default:
+            if (MRP_DOMCTL_IS_ARRAY(type)) {
+                arg->arr  = val.aany;
+                arg->size = size;
+            }
+            else
+                goto fail;
+        }
+    }
+
+    invoke->wire       = mrp_msg_ref(msg);
+    invoke->unref_wire = msg_unref_wire;
+
+    return (msg_t *)invoke;
+
+ fail:
+    msg_free_invoke((msg_t *)invoke);
+    return NULL;
+}
+
+
+void msg_free_return(msg_t *msg)
+{
+    if (msg != NULL) {
+        mrp_free(msg->ret.args);
+        mrp_free(msg);
+    }
+}
+
+
+mrp_msg_t *msg_encode_return(return_msg_t *ret)
+{
+    mrp_msg_t        *msg;
+    mrp_domctl_arg_t *arg;
+    uint32_t          i;
+
+    msg = mrp_msg_create(MSG_UINT16(MSGTYPE, MSG_TYPE_RETURN),
+                         MSG_UINT32(MSGSEQ , ret->seq),
+                         MSG_UINT32(ERROR  , ret->error),
+                         MSG_SINT32(RETVAL , ret->retval),
+                         MSG_UINT32(NARG   , ret->narg),
+                         MSG_END);
+
+    for (i = 0, arg = ret->args; i < ret->narg; i++, arg++) {
+        switch (arg->type) {
+        case MRP_DOMCTL_STRING:
+            if (!mrp_msg_append(msg, MSG_STRING(ARG, arg->str)))
+                goto fail;
+            break;
+        case MRP_DOMCTL_DOUBLE:
+            if (!mrp_msg_append(msg, MSG_DOUBLE(ARG, arg->dbl)))
+                goto fail;
+            break;
+        case MRP_DOMCTL_BOOL:
+            if (!mrp_msg_append(msg, MSG_BOOL(ARG, arg->bln)))
+                goto fail;
+            break;
+        case MRP_DOMCTL_UINT8:
+            if (!mrp_msg_append(msg, MSG_UINT8(ARG, arg->s8)))
+                goto fail;
+            break;
+        case MRP_DOMCTL_INT8:
+            if (!mrp_msg_append(msg, MSG_SINT8(ARG, arg->u8)))
+                goto fail;
+            break;
+        case MRP_DOMCTL_UINT16:
+            if (!mrp_msg_append(msg, MSG_UINT16(ARG, arg->s16)))
+                goto fail;
+            break;
+        case MRP_DOMCTL_INT16:
+            if (!mrp_msg_append(msg, MSG_SINT16(ARG, arg->u16)))
+                goto fail;
+            break;
+        case MRP_DOMCTL_UINT32:
+            if (!mrp_msg_append(msg, MSG_UINT32(ARG, arg->s32)))
+                goto fail;
+            break;
+        case MRP_DOMCTL_INT32:
+            if (!mrp_msg_append(msg, MSG_SINT32(ARG, arg->u32)))
+                goto fail;
+            break;
+        case MRP_DOMCTL_UINT64:
+            if (!mrp_msg_append(msg, MSG_UINT64(ARG, arg->s64)))
+                goto fail;
+            break;
+        case MRP_DOMCTL_INT64:
+            if (!mrp_msg_append(msg, MSG_SINT64(ARG, arg->u64)))
+                goto fail;
+            break;
+        default:
+            if (MRP_DOMCTL_IS_ARRAY(arg->type)) {
+                if (!mrp_msg_append(msg, MSG_ARRAY(ARG, arg->type,
+                                                   arg->size, arg->arr)))
+                    goto fail;
+            }
+            else
+                goto fail;
+            break;
+        }
+    }
+
+    return msg;
+
+ fail:
+    mrp_msg_unref(msg);
+    return NULL;
+}
+
+
+msg_t *msg_decode_return(mrp_msg_t *msg)
+{
+    return_msg_t     *ret;
+    void             *it;
+    uint16_t          tag, type;
+    mrp_msg_value_t   val;
+    mrp_domctl_arg_t *arg;
+    uint32_t          i;
+    size_t            size;
+
+    mrp_debug_code({
+            mrp_debug("got domain return (invoke reply):");
+            mrp_msg_dump(msg, stdout); });
+
+    it  = NULL;
+    ret = mrp_allocz(sizeof(*ret));
+
+    if (ret == NULL)
+        goto fail;
+
+    ret->type = MSG_TYPE_RETURN;
+
+    if (!mrp_msg_iterate_get(msg, &it,
+                             MSG_UINT32(MSGSEQ, &ret->seq),
+                             MSG_UINT32(ERROR , &ret->error),
+                             MSG_SINT32(RETVAL, &ret->retval),
+                             MSG_UINT32(NARG  , &ret->narg),
+                             MSG_END))
+        goto fail;
+
+    if (ret->narg > 0)
+        ret->args = mrp_allocz(ret->narg * sizeof(ret->args[0]));
+
+    if (ret->args == NULL && ret->narg > 0)
+        goto fail;
+
+    for (i = 0, arg = ret->args; i < ret->narg; i++, arg++) {
+        if (!mrp_msg_iterate(msg, &it, &tag, &type, &val, &size))
+            goto fail;
+
+        arg->type = type;
+
+        switch (type) {
+        case MRP_DOMCTL_STRING:  arg->str = val.str; break;
+        case MRP_DOMCTL_BOOL:    arg->bln = val.bln; break;
+        case MRP_DOMCTL_UINT8:   arg->u8  = val.u8;  break;
+        case MRP_DOMCTL_INT8:    arg->s8  = val.s8;  break;
+        case MRP_DOMCTL_UINT16:  arg->u16 = val.u16; break;
+        case MRP_DOMCTL_INT16:   arg->s16 = val.s16; break;
+        case MRP_DOMCTL_UINT32:  arg->u32 = val.u32; break;
+        case MRP_DOMCTL_INT32:   arg->s32 = val.s32; break;
+        case MRP_DOMCTL_UINT64:  arg->u64 = val.u64; break;
+        case MRP_DOMCTL_INT64:   arg->s64 = val.s64; break;
+        case MRP_DOMCTL_DOUBLE:  arg->dbl = val.dbl; break;
+        default:
+            if (MRP_DOMCTL_IS_ARRAY(type)) {
+                arg->arr  = val.aany;
+                arg->size = size;
+            }
+            else
+                goto fail;
+        }
+    }
+
+    ret->wire       = mrp_msg_ref(msg);
+    ret->unref_wire = msg_unref_wire;
+
+    return (msg_t *)ret;
+
+ fail:
+    msg_free_return((msg_t *)ret);
+    return NULL;
+}
+
+
 msg_t *msg_decode_message(mrp_msg_t *msg)
 {
     uint16_t type;
@@ -754,6 +1082,8 @@ msg_t *msg_decode_message(mrp_msg_t *msg)
         case MSG_TYPE_NOTIFY:     return msg_decode_notify(msg);
         case MSG_TYPE_ACK:        return msg_decode_ack(msg);
         case MSG_TYPE_NAK:        return msg_decode_nak(msg);
+        case MSG_TYPE_INVOKE:     return msg_decode_invoke(msg);
+        case MSG_TYPE_RETURN:     return msg_decode_return(msg);
         default:                  break;
         }
     }
@@ -771,6 +1101,8 @@ mrp_msg_t *msg_encode_message(msg_t *msg)
     case MSG_TYPE_NOTIFY:     return msg_encode_notify(&msg->notify);
     case MSG_TYPE_ACK:        return msg_encode_ack(&msg->ack);
     case MSG_TYPE_NAK:        return msg_encode_nak(&msg->nak);
+    case MSG_TYPE_INVOKE:     return msg_encode_invoke(&msg->invoke);
+    case MSG_TYPE_RETURN:     return msg_encode_return(&msg->ret);
     default:                  return NULL;
     }
 }
@@ -786,6 +1118,8 @@ void msg_free_message(msg_t *msg)
         case MSG_TYPE_NOTIFY:     msg_free_notify(msg);     break;
         case MSG_TYPE_ACK:        msg_free_ack(msg);        break;
         case MSG_TYPE_NAK:        msg_free_nak(msg);        break;
+        case MSG_TYPE_INVOKE:     msg_free_invoke(msg);     break;
+        case MSG_TYPE_RETURN:     msg_free_return(msg);     break;
         default:                                            break;
         }
     }
index d6de475..4f48eba 100644 (file)
@@ -44,6 +44,8 @@ typedef enum {
     MSG_TYPE_NOTIFY,
     MSG_TYPE_ACK,
     MSG_TYPE_NAK,
+    MSG_TYPE_INVOKE,
+    MSG_TYPE_RETURN,
 } msg_type_t;
 
 typedef enum {
@@ -72,16 +74,32 @@ typedef enum {
     MSGTAG_NROW    = 0x6,            /* number of table rows */
     MSGTAG_NCOL    = 0x7,            /* number of columns in a row */
     MSGTAG_DATA    = 0x8,            /* a data column */
+
+    /* fixed tags in invoke and return messages */
+    MSGTAG_METHOD  = 0x3,            /* method name */
+    MSGTAG_NORET   = 0x4,            /* whether return values ignored */
+    MSGTAG_NARG    = 0x5,            /* number of arguments */
+    MSGTAG_ARG     = 0x6,            /* argument */
+    MSGTAG_ERROR   = 0x7,            /* invocation error */
+    MSGTAG_RETVAL  = 0x8,            /* invocation return value */
 } msgtag_t;
 
 
+#define MSG_UINT8(tag, val) MRP_MSG_TAG_UINT8(MSGTAG_##tag, val)
+#define MSG_SINT8(tag, val) MRP_MSG_TAG_SINT8(MSGTAG_##tag, val)
 #define MSG_UINT16(tag, val) MRP_MSG_TAG_UINT16(MSGTAG_##tag, val)
 #define MSG_SINT16(tag, val) MRP_MSG_TAG_SINT16(MSGTAG_##tag, val)
 #define MSG_UINT32(tag, val) MRP_MSG_TAG_UINT32(MSGTAG_##tag, val)
 #define MSG_SINT32(tag, val) MRP_MSG_TAG_SINT32(MSGTAG_##tag, val)
+#define MSG_UINT64(tag, val) MRP_MSG_TAG_UINT64(MSGTAG_##tag, val)
+#define MSG_SINT64(tag, val) MRP_MSG_TAG_SINT64(MSGTAG_##tag, val)
 #define MSG_DOUBLE(tag, val) MRP_MSG_TAG_DOUBLE(MSGTAG_##tag, val)
 #define MSG_STRING(tag, val) MRP_MSG_TAG_STRING(MSGTAG_##tag, val)
+#define MSG_BOOL(tag, val) MRP_MSG_TAG_BOOL(MSGTAG_##tag, val)
 #define MSG_ANY(tag, typep, valp) MRP_MSG_TAG_ANY(MSGTAG_##tag, typep, valp)
+#define MSG_ARRAY(tag, type, size, arr) \
+    MRP_MSG_TAGGED(MSGTAG_##tag, type, size, arr)
+
 #define MSG_END MRP_MSG_END
 
 #define COMMON_MSG_FIELDS                /* common message fields */      \
@@ -132,6 +150,25 @@ typedef struct {
     const char *msg;
 } nak_msg_t;
 
+
+typedef struct {
+    COMMON_MSG_FIELDS;
+    const char       *name;
+    int               noret;
+    uint32_t          narg;
+    mrp_domctl_arg_t *args;
+} invoke_msg_t;
+
+
+typedef struct {
+    COMMON_MSG_FIELDS;
+    uint32_t          error;
+    int32_t           retval;
+    uint32_t          narg;
+    mrp_domctl_arg_t *args;
+} return_msg_t;
+
+
 typedef struct {
     COMMON_MSG_FIELDS;
 } any_msg_t;
@@ -145,6 +182,8 @@ union msg_u {
     notify_msg_t     notify;
     ack_msg_t        ack;
     nak_msg_t        nak;
+    invoke_msg_t     invoke;
+    return_msg_t     ret;
 };
 
 
index 51bb7db..5477e1e 100644 (file)
 #include "proxy.h"
 
 
+/*
+ * a pending proxied invocation
+ */
+
+typedef struct {
+    mrp_list_hook_t         hook;        /* to pending list */
+    uint32_t                id;          /* request id */
+    mrp_domain_return_cb_t  cb;          /* return callback */
+    void                   *user_data;   /* opaque callback data */
+} pending_t;
+
+static void purge_pending(pep_proxy_t *proxy);
+
+
 int init_proxies(pdp_t *pdp)
 {
     mrp_list_init(&pdp->proxies);
@@ -63,8 +77,10 @@ pep_proxy_t *create_proxy(pdp_t *pdp)
     if (proxy != NULL) {
         mrp_list_init(&proxy->hook);
         mrp_list_init(&proxy->watches);
+        mrp_list_init(&proxy->pending);
 
-        proxy->pdp = pdp;
+        proxy->pdp   = pdp;
+        proxy->seqno = 1;
 
         mrp_list_append(&pdp->proxies, &proxy->hook);
     }
@@ -85,6 +101,8 @@ void destroy_proxy(pep_proxy_t *proxy)
 
         destroy_proxy_watches(proxy);
 
+        purge_pending(proxy);
+
         mrp_free(proxy);
     }
 }
@@ -155,3 +173,82 @@ int unregister_proxy(pep_proxy_t *proxy)
 
     return TRUE;
 }
+
+
+pep_proxy_t *find_proxy(pdp_t *pdp, const char *name)
+{
+    mrp_list_hook_t *p, *n;
+    pep_proxy_t     *proxy;
+
+    mrp_list_foreach(&pdp->proxies, p, n) {
+        proxy = mrp_list_entry(p, typeof(*proxy), hook);
+
+        if (!strcmp(proxy->name, name))
+            return proxy;
+    }
+
+    return NULL;
+}
+
+
+uint32_t proxy_queue_pending(pep_proxy_t *proxy,
+                             mrp_domain_return_cb_t return_cb, void *user_data)
+{
+    pending_t *pending;
+
+    if (return_cb == NULL)
+        return proxy->seqno++;
+
+    pending = mrp_allocz(sizeof(*pending));
+
+    if (pending == NULL)
+        return 0;
+
+    mrp_list_init(&pending->hook);
+
+    pending->id        = proxy->seqno++;
+    pending->cb        = return_cb;
+    pending->user_data = user_data;
+
+    mrp_list_append(&proxy->pending, &pending->hook);
+
+    return pending->id;
+}
+
+
+int proxy_dequeue_pending(pep_proxy_t *proxy, uint32_t id,
+                          mrp_domain_return_cb_t *cbp, void **user_datap)
+{
+    mrp_list_hook_t *p, *n;
+    pending_t       *pending;
+
+    mrp_list_foreach(&proxy->pending, p, n) {
+        pending = mrp_list_entry(p, typeof(*pending), hook);
+
+        if (pending->id == id) {
+            mrp_list_delete(&pending->hook);
+            *cbp        = pending->cb;
+            *user_datap = pending->user_data;
+
+            mrp_free(pending);
+
+            return TRUE;
+        }
+    }
+
+    return FALSE;
+}
+
+
+static void purge_pending(pep_proxy_t *proxy)
+{
+    mrp_list_hook_t *p, *n;
+    pending_t       *pending;
+
+    mrp_list_foreach(&proxy->pending, p, n) {
+        pending = mrp_list_entry(p, typeof(*pending), hook);
+
+        mrp_list_delete(&pending->hook);
+        mrp_free(pending);
+    }
+}
index ed00794..42e03e8 100644 (file)
@@ -30,6 +30,8 @@
 #ifndef __MURPHY_DOMAIN_CONTROL_PROXY_H__
 #define __MURPHY_DOMAIN_CONTROL_PROXY_H__
 
+#include <murphy/core/domain.h>
+
 #include "domain-control-types.h"
 
 int init_proxies(pdp_t *pdp);
@@ -44,4 +46,11 @@ int register_proxy(pep_proxy_t *proxy, char *name,
                    int *error, const char **errmsg);
 int unregister_proxy(pep_proxy_t *proxy);
 
+pep_proxy_t *find_proxy(pdp_t *pdp, const char *name);
+
+uint32_t proxy_queue_pending(pep_proxy_t *proxy,
+                             mrp_domain_return_cb_t return_cb, void *user_data);
+int proxy_dequeue_pending(pep_proxy_t *proxy, uint32_t id,
+                          mrp_domain_return_cb_t *cb, void **user_datap);
+
 #endif /* __MURPHY_DOMAIN_CONTROL_PROXY_H__ */