plugins: added a prototype decision plugin.
authorKrisztian Litkey <krisztian.litkey@intel.com>
Thu, 4 Oct 2012 06:47:48 +0000 (09:47 +0300)
committerKrisztian Litkey <krisztian.litkey@intel.com>
Fri, 26 Oct 2012 16:16:00 +0000 (19:16 +0300)
The decision-proto plugin provides a client library through which
enfocement points can publish their own data to the murphy DB and
also subscribe for data published by ohter entities (not just
enforement points) to the DB.

19 files changed:
configure.ac
src/Makefile.am
src/plugins/decision-proto/Makefile [new file with mode: 0644]
src/plugins/decision-proto/client.c [new file with mode: 0644]
src/plugins/decision-proto/client.h [new file with mode: 0644]
src/plugins/decision-proto/decision-types.h [new file with mode: 0644]
src/plugins/decision-proto/decision.c [new file with mode: 0644]
src/plugins/decision-proto/decision.h [new file with mode: 0644]
src/plugins/decision-proto/message.c [new file with mode: 0644]
src/plugins/decision-proto/message.h [new file with mode: 0644]
src/plugins/decision-proto/notify.c [new file with mode: 0644]
src/plugins/decision-proto/notify.h [new file with mode: 0644]
src/plugins/decision-proto/plugin-decision.c [new file with mode: 0644]
src/plugins/decision-proto/proxy.c [new file with mode: 0644]
src/plugins/decision-proto/proxy.h [new file with mode: 0644]
src/plugins/decision-proto/table-common.c [new file with mode: 0644]
src/plugins/decision-proto/table.c [new file with mode: 0644]
src/plugins/decision-proto/table.h [new file with mode: 0644]
src/plugins/decision-proto/test-client.c [new file with mode: 0644]

index eb3d290..17d9c8d 100644 (file)
@@ -255,8 +255,10 @@ AC_ARG_WITH(dynamic-plugins,
             [  --with-dynamic-plugins=<plugin-list>  specify which plugins compile as DSOs],
             [dynamic_plugins=$withval],[dynamic_plugins=none])
 
-all_plugins=$(ls src/plugins/*.c 2>/dev/null | \
-              sed 's#src/plugins/plugin-##g;s#\.c$##g' | tr '\n' ' ')
+all_plugins=$(find src/plugins/. -name plugin-*.c 2>/dev/null | \
+              sed 's#^.*/plugin-##g;s#\.c$##g' | tr '\n' ' ')
+
+#echo "all plugins: [$all_plugins]"
 
 case dynamic_plugins in
     all)  dynamic_plugins="$all_plugins";;
@@ -313,17 +315,19 @@ function check_if_internal() {
     return 1
 }
 
-AM_CONDITIONAL(DISABLED_PLUGIN_TEST,    [check_if_disabled test])
-AM_CONDITIONAL(DISABLED_PLUGIN_DBUS,    [check_if_disabled dbus])
-AM_CONDITIONAL(DISABLED_PLUGIN_GLIB,    [check_if_disabled glib])
-AM_CONDITIONAL(DISABLED_PLUGIN_CONSOLE, [check_if_disabled console])
+AM_CONDITIONAL(DISABLED_PLUGIN_TEST,     [check_if_disabled test])
+AM_CONDITIONAL(DISABLED_PLUGIN_DBUS,     [check_if_disabled dbus])
+AM_CONDITIONAL(DISABLED_PLUGIN_GLIB,     [check_if_disabled glib])
+AM_CONDITIONAL(DISABLED_PLUGIN_CONSOLE,  [check_if_disabled console])
 AM_CONDITIONAL(DISABLED_PLUGIN_SIGNALLING, [check_if_disabled signalling])
+AM_CONDITIONAL(DISABLED_PLUGIN_DECISION, [check_if_disabled decision])
 
 AM_CONDITIONAL(BUILTIN_PLUGIN_TEST,     [check_if_internal test])
 AM_CONDITIONAL(BUILTIN_PLUGIN_DBUS,     [check_if_internal dbus])
 AM_CONDITIONAL(BUILTIN_PLUGIN_GLIB,     [check_if_internal glib])
 AM_CONDITIONAL(BUILTIN_PLUGIN_CONSOLE,  [check_if_internal console])
 AM_CONDITIONAL(BUILTIN_PLUGIN_SIGNALLING, [check_if_internal signalling])
+AM_CONDITIONAL(BUILTIN_PLUGIN_DECISION, [check_if_internal decision])
 
 # Check for Check (unit test framework).
 PKG_CHECK_MODULES(CHECK, 
index 9777bed..5cc7387 100644 (file)
@@ -470,6 +470,8 @@ BUILTIN_PLUGINS    =
 BUILTIN_CFLAGS     = -D__MURPHY_BUILTIN_PLUGIN__ $(AM_CFLAGS)
 BUILTIN_LIBS       =
 
+LINKEDIN_PLUGINS   =
+
 plugin_LTLIBRARIES = 
 plugindir          = $(libdir)/murphy/plugins
 
@@ -632,7 +634,6 @@ resource_client_SOURCES = plugins/resource-native/resource-client.c
 resource_client_CFLAGS  = $(AM_CFLAGS) $(BUILTIN_CFLAGS)
 resource_client_LDADD   = $(BUILTIN_LIBS) libmurphy-common.la
 
-
 # debug file:line-function mapping generation
 plugin-resource-native-func-info.c: $(PLUGIN_RESOURCE_NATIVE_REGULAR_SOURCES)
        $(QUIET_GEN)$(top_builddir)/build-aux/gen-debug-table -o $@ $^
@@ -640,6 +641,88 @@ plugin-resource-native-func-info.c: $(PLUGIN_RESOURCE_NATIVE_REGULAR_SOURCES)
 clean-func-infos::
        -rm plugin-resource-native-func-info.c
 
+# decision plugin
+DECISION_PLUGIN_SOURCES = plugins/decision-proto/plugin-decision.c \
+                         plugins/decision-proto/decision.c          \
+                         plugins/decision-proto/proxy.c             \
+                         plugins/decision-proto/table.c             \
+                         plugins/decision-proto/message.c         \
+                         plugins/decision-proto/notify.c
+
+DECISION_PLUGIN_CFLAGS  =
+DECISION_PLUGIN_LIBS    =
+
+if !DISABLED_PLUGIN_DECISION
+if BUILTIN_PLUGIN_DECISION
+LINKEDIN_PLUGINS       += libmurphy-plugin-decision.la
+lib_LTLIBRARIES        += libmurphy-plugin-decision.la
+DECISION_PLUGIN_LOADER  = linkedin-decision-loader.c
+DECISION_PLUGIN_CFLAGS += $(BUILTIN_CFLAGS)
+
+
+libmurphy_plugin_decision_ladir =                              \
+               $(includedir)/murphy/decision-proto
+
+libmurphy_plugin_decision_la_SOURCES =                         \
+               $(DECISION_PLUGIN_SOURCES)                      \
+               $(DECISION_PLUGIN_LOADER)
+
+libmurphy_plugin_decision_la_CFLAGS =                          \
+               $(DECISION_PLUGIN_CFLAGS)                       \
+               $(AM_CFLAGS)
+
+libmurphy_plugin_decision_la_LDFLAGS =                         \
+               -Wl,-version-script=linker-script.decision      \
+               -version-info @MURPHY_VERSION_INFO@
+
+libmurphy_plugin_decision_la_LIBADD =          \
+               libmurphy-core.la               \
+               libmurphy-common.la             \
+               murphy-db/mql/libmql.la         \
+               murphy-db/mqi/libmqi.la         \
+               murphy-db/mdb/libmdb.la
+
+libmurphy_plugin_decision_la_DEPENDENCIES =    \
+               linker-script.decision          \
+               libmurphy-core.la               \
+               libmurphy-common.la             \
+               murphy-db/mql/libmql.la         \
+               murphy-db/mqi/libmqi.la         \
+               murphy-db/mdb/libmdb.la
+
+# linkedin decision plugin linker script generation
+linker-script.decision: $(DECISION_PLUGIN_LOADER:%.c=%.h)
+       $(QUIET_GEN)$(top_builddir)/build-aux/gen-linker-script -q -o $@ $^
+
+clean-linker-script::
+       -rm -f linker-script.decision
+else
+plugin_decision_la_SOURCES = $(DECISION_PLUGIN_SOURCES)
+plugin_decision_la_CFLAGS  = $(DECISION_PLUGIN_CFLAGS) \
+                            $(MURPHY_CFLAGS) $(AM_CFLAGS)
+plugin_decision_la_LDFLAGS = -module -avoid-version
+plugin_decision_la_LIBADD  = $(DECISION_PLUGIN_LIBS)
+
+lib_LTLIBRARIES          += libmurphy-pep.la
+libmurphy_pep_la_SOURCES  = plugins/decision-proto/client.c \
+                           plugins/decision-proto/table-common.c  \
+                           plugins/decision-proto/message.c
+libmurphy_pep_la_CFLAGS   =
+libmurphy_pep_la_LIBADD   = libmurphy-common.la     \
+                           murphy-db/mql/libmql.la \
+                           murphy-db/mqi/libmqi.la \
+                           murphy-db/mdb/libmdb.la
+
+
+# enforcement point test client
+bin_PROGRAMS += test-client
+
+test_client_SOURCES = plugins/decision-proto/test-client.c
+test_client_CFLAGS  = $(AM_CFLAGS)
+test_client_LDADD   = libmurphy-pep.la    \
+                     libmurphy-common.la \
+                     -lreadline
+endif
 
 ###################################
 # murphy daemon
@@ -653,7 +736,8 @@ config_DATA   = daemon/murphy.conf
 murphyd_SOURCES =                      \
                daemon/daemon.c         \
                daemon/config.c         \
-               $(BUILTIN_PLUGINS)
+               $(BUILTIN_PLUGINS)      \
+               load-linkedin-plugins.c
 
 murphyd_CFLAGS  =                      \
                $(AM_CFLAGS)            \
@@ -661,6 +745,7 @@ murphyd_CFLAGS  =                   \
 
 murphyd_LDADD  =                       \
                $(BUILTIN_LIBS)         \
+               $(LINKEDIN_PLUGINS)     \
                libmurphy-resource.la   \
                libmurphy-resolver.la   \
                libmurphy-core.la       \
@@ -668,6 +753,30 @@ murphyd_LDADD  =                   \
 
 murphyd_LDFLAGS = -rdynamic
 
+
+
+###################################
+# linkedin (DSO) loader generation
+#
+
+linkedin-%-loader.c:
+       $(QUIET_GEN)$(top_builddir)/build-aux/gen-linkedin-loader \
+           -p $(shell echo $@ | \
+               sed 's/linkedin-//g;s/-loader.c//g') -o $@
+
+linkedin-%-loader.h:
+       $(QUIET_GEN)$(top_builddir)/build-aux/gen-linkedin-loader \
+           -p $(shell echo $@ | \
+               sed 's/linkedin-//g;s/-loader.h//g') -o $@
+
+load-linkedin-plugins.c:
+       $(QUIET_GEN)$(top_builddir)/build-aux/gen-linkedin-loader \
+           -o $@ $(shell echo $(LINKEDIN_PLUGINS) | \
+                    sed 's/.*-//g;s/\.[^\.]*$$//g')
+
+clean-local::
+       -rm -f linkedin-*-loader.[hc] load-linkedin-plugins.c
+
 ###################################
 # murphy console client
 #
diff --git a/src/plugins/decision-proto/Makefile b/src/plugins/decision-proto/Makefile
new file mode 100644 (file)
index 0000000..2c0a593
--- /dev/null
@@ -0,0 +1,7 @@
+ifneq ($(strip $(MAKECMDGOALS)),)
+%:
+       $(MAKE) -C .. $(MAKECMDGOALS)
+else
+all:
+       $(MAKE) -C .. all
+endif
diff --git a/src/plugins/decision-proto/client.c b/src/plugins/decision-proto/client.c
new file mode 100644 (file)
index 0000000..8393c73
--- /dev/null
@@ -0,0 +1,466 @@
+#include <errno.h>
+
+#include <murphy/common/mm.h>
+#include <murphy/common/log.h>
+#include <murphy/common/mainloop.h>
+#include <murphy/common/transport.h>
+
+#include "decision-types.h"
+#include "table.h"
+#include "message.h"
+#include "client.h"
+
+
+/*
+ * mark an enforcement point busy (typically while executing a callback)
+ */
+
+#define PEP_MARK_BUSY(pep, ...) do {                \
+        (pep)->busy++;                              \
+        __VA_ARGS__                                 \
+        (pep)->busy--;                              \
+        check_destroyed(pep);                       \
+    } while (0)
+
+
+/*
+ * a pending request
+ */
+
+typedef struct {
+    mrp_list_hook_t      hook;           /* hook to pending request queue */
+    uint32_t             seqno;          /* sequence number/request id */
+    mrp_pep_status_cb_t  cb;             /* callback to call upon completion */
+    void                *user_data;      /* opaque callback data */
+} pending_request_t;
+
+
+static void recv_cb(mrp_transport_t *t, mrp_msg_t *msg, void *user_data);
+static void recvfrom_cb(mrp_transport_t *t, mrp_msg_t *msg,
+                        mrp_sockaddr_t *addr, socklen_t addrlen,
+                        void *user_data);
+static void closed_cb(mrp_transport_t *t, int error, void *user_data);
+
+
+static int queue_pending(mrp_pep_t *pep, uint32_t seq,
+                         mrp_pep_status_cb_t cb, void *user_data);
+static int notify_pending(mrp_pep_t *pep, uint32_t seq, int error,
+                          const char *msg);
+static void purge_pending(mrp_pep_t *pep);
+
+
+
+
+mrp_pep_t *mrp_pep_create(const char *name, mrp_mainloop_t *ml,
+                          mrp_pep_table_t *owned_tables, int nowned,
+                          mrp_pep_table_t *watched_tables, int nwatched,
+                          mrp_pep_connect_cb_t connect_cb,
+                          mrp_pep_data_cb_t data_cb, void *user_data)
+{
+    mrp_pep_t *pep;
+
+    pep = mrp_allocz(sizeof(*pep));
+
+    if (pep != NULL) {
+        mrp_list_init(&pep->pending);
+        pep->ml = ml;
+
+        pep->name    = mrp_strdup(name);
+        pep->owned   = mrp_allocz_array(typeof(*pep->owned), nowned);
+        pep->watched = mrp_allocz_array(typeof(*pep->watched), nwatched);
+
+        if (pep->name != NULL && pep->owned != NULL && pep->watched != NULL) {
+            if (copy_pep_tables(owned_tables, pep->owned, nowned)) {
+                pep->nowned = nowned;
+                if (copy_pep_tables(watched_tables, pep->watched, nwatched)) {
+                    pep->nwatched   = nwatched;
+                    pep->connect_cb = connect_cb;
+                    pep->data_cb    = data_cb;
+                    pep->user_data  = user_data;
+                    pep->seqno      = 1;
+
+                    return pep;
+                }
+            }
+        }
+
+        mrp_pep_destroy(pep);
+    }
+
+    return NULL;
+}
+
+
+static void destroy_pep(mrp_pep_t *pep)
+{
+    mrp_free(pep->name);
+
+    free_pep_tables(pep->owned, pep->nowned);
+    free_pep_tables(pep->watched, pep->nwatched);
+
+    purge_pending(pep);
+
+    mrp_free(pep);
+}
+
+
+static inline void check_destroyed(mrp_pep_t *pep)
+{
+    if (pep->destroyed && pep->busy <= 0) {
+        destroy_pep(pep);
+    }
+}
+
+
+void mrp_pep_destroy(mrp_pep_t *pep)
+{
+    if (pep != NULL) {
+        mrp_pep_disconnect(pep);
+
+        if (pep->busy <= 0)
+            destroy_pep(pep);
+        else
+            pep->destroyed = TRUE;
+    }
+}
+
+
+static void notify_disconnect(mrp_pep_t *pep, uint32_t errcode,
+                              const char *errmsg)
+{
+    PEP_MARK_BUSY(pep, {
+            pep->connected = FALSE;
+            pep->connect_cb(pep, FALSE, errcode, errmsg, pep->user_data);
+        });
+}
+
+
+static void notify_connect(mrp_pep_t *pep)
+{
+    PEP_MARK_BUSY(pep, {
+            pep->connected = TRUE;
+            pep->connect_cb(pep, TRUE, 0, NULL, pep->user_data);
+        });
+}
+
+
+static int pep_register(mrp_pep_t *pep)
+{
+    mrp_msg_t *msg;
+    int        success;
+
+    msg = create_register_message(pep);
+
+    if (msg != NULL) {
+        success = mrp_transport_send(pep->t, msg);
+        mrp_msg_unref(msg);
+    }
+    else
+        success = FALSE;
+
+    return success;
+}
+
+
+int mrp_pep_connect(mrp_pep_t *pep, const char *address)
+{
+    static mrp_transport_evt_t evt = {
+        .closed      = closed_cb,
+        .recvmsg     = recv_cb,
+        .recvmsgfrom = recvfrom_cb,
+    };
+
+    mrp_sockaddr_t  addr;
+    socklen_t       addrlen;
+    const char     *type;
+
+    if (pep == NULL)
+        return FALSE;
+
+    addrlen = mrp_transport_resolve(NULL, address, &addr, sizeof(addr), &type);
+
+    if (addrlen > 0) {
+        pep->t = mrp_transport_create(pep->ml, type, &evt, pep, 0);
+
+        if (pep->t != NULL) {
+            if (mrp_transport_connect(pep->t, &addr, addrlen))
+                if (pep_register(pep))
+                    return TRUE;
+
+            mrp_transport_destroy(pep->t);
+            pep->t = NULL;
+        }
+    }
+
+    return FALSE;
+}
+
+
+void mrp_pep_disconnect(mrp_pep_t *pep)
+{
+    if (pep->t != NULL) {
+        mrp_transport_destroy(pep->t);
+        pep->t         = NULL;
+        pep->connected = FALSE;
+    }
+}
+
+
+int mrp_pep_set_data(mrp_pep_t *pep, mrp_pep_data_t *data, int ntable,
+                     mrp_pep_status_cb_t cb, void *user_data)
+{
+    mrp_msg_t *msg;
+    uint32_t   seq = pep->seqno++;
+    int        success, i;
+
+    if (!pep->connected)
+        return FALSE;
+
+    for (i = 0; i < ntable; i++) {
+        if (data[i].id < 0 || data[i].id >= pep->nowned)
+            return FALSE;
+
+        data[i].coldefs = pep->owned[data[i].id].columns;
+        data[i].ncolumn = pep->owned[data[i].id].ncolumn;
+    }
+
+    msg = create_set_message(seq, data, ntable);
+
+    if (msg != NULL) {
+        success = mrp_transport_send(pep->t, msg);
+        mrp_msg_unref(msg);
+
+        if (success)
+            queue_pending(pep, seq, cb, user_data);
+
+        return success;
+    }
+    else
+        return FALSE;
+}
+
+
+static void process_ack(mrp_pep_t *pep, uint32_t seq)
+{
+    if (seq != 0)
+        notify_pending(pep, seq, 0, NULL);
+    else
+        notify_connect(pep);
+}
+
+
+static void process_nak(mrp_pep_t *pep, uint32_t seq, int32_t err,
+                        const char *msg)
+{
+    if (seq != 0)
+        notify_pending(pep, seq, err, msg);
+    else
+        notify_disconnect(pep, err, msg);
+}
+
+
+static void process_notify(mrp_pep_t *pep, mrp_msg_t *msg, uint32_t seq,
+                           int ntable, int ncolumn)
+{
+    mrp_pep_table_t  *tbl;
+    mrp_pep_data_t    data[ntable], *d;
+    mrp_pep_value_t   values[ncolumn], *v;
+    mqi_column_def_t *cols;
+    void             *it;
+    int               ncol, i, j;
+    uint16_t          tblid;
+    uint16_t          nrow;
+
+    it = NULL;
+    d  = data;
+    v  = values;
+
+    for (i = 0; i < ntable; i++) {
+        if (!mrp_msg_iterate_get(msg, &it,
+                                 MRP_PEPMSG_UINT16(TBLID, &tblid),
+                                 MRP_PEPMSG_UINT16(NROW , &nrow ),
+                                 MRP_MSG_END))
+            return;
+
+        if (tblid >= pep->nwatched)
+            return;
+
+        tbl  = pep->watched + tblid;
+        cols = tbl->columns;
+        ncol = tbl->ncolumn;
+
+        d->id      = tblid;
+        d->columns = v;
+        d->coldefs = tbl->columns;
+        d->ncolumn = ncol;
+        d->nrow    = nrow;
+
+        if (!decode_notify_message(msg, &it, d))
+            return;
+
+        d++;
+        v += ncol * nrow;
+    }
+
+    pep->data_cb(pep, data, ntable, pep->user_data);
+}
+
+
+static void recv_cb(mrp_transport_t *t, mrp_msg_t *msg, void *user_data)
+{
+    mrp_pep_t  *pep = (mrp_pep_t *)user_data;
+    uint16_t    type, nchange, ntotal;
+    uint32_t    seq;
+    int         ntable, ncolumn;
+    int32_t     error;
+    const char *errmsg;
+
+    MRP_UNUSED(t);
+
+    /*
+      mrp_log_info("Received message:");
+      mrp_msg_dump(msg, stdout);
+    */
+
+    if (!mrp_msg_get(msg,
+                     MRP_PEPMSG_UINT16(MSGTYPE, &type),
+                     MRP_PEPMSG_UINT32(MSGSEQ , &seq ),
+                     MRP_MSG_END)) {
+        mrp_pep_disconnect(pep);
+        notify_disconnect(pep, EINVAL, "malformed message from client");
+        return;
+    }
+
+    switch (type) {
+    case MRP_PEPMSG_ACK:
+        process_ack(pep, seq);
+        break;
+
+    case MRP_PEPMSG_NAK:
+        error  = EINVAL;
+        errmsg = "request failed, unknown error";
+
+        mrp_msg_get(msg,
+                    MRP_PEPMSG_SINT32(ERRCODE, &error),
+                    MRP_PEPMSG_STRING(ERRMSG , &errmsg),
+                    MRP_MSG_END);
+
+        process_nak(pep, seq, error, errmsg);
+        break;
+
+    case MRP_PEPMSG_NOTIFY:
+        if (mrp_msg_get(msg,
+                        MRP_PEPMSG_UINT16(NCHANGE, &nchange),
+                        MRP_PEPMSG_UINT16(NTOTAL , &ntotal),
+                        MRP_MSG_END)) {
+            ntable  = nchange;
+            ncolumn = ntotal;
+
+            process_notify(pep, msg, seq, ntable, ncolumn);
+        }
+        break;
+
+    default:
+        break;
+    }
+
+}
+
+
+static void recvfrom_cb(mrp_transport_t *t, mrp_msg_t *msg,
+                        mrp_sockaddr_t *addr, socklen_t addrlen,
+                        void *user_data)
+{
+    MRP_UNUSED(addr);
+    MRP_UNUSED(addrlen);
+
+    /* XXX TODO:
+     *    This should neither be called nor be necessary to specify.
+     *    However, currently the transport layer mandates having to
+     *    give both recv and recvfrom event callbacks if no connection
+     *    event callback is given. However this is not correct because
+     *    on a client side one wants to be able to create a connection-
+     *    oriented transport without both connection and recvfrom event
+     *    callbacks. This needs to be fixed in transport by moving the
+     *    appropriate callback checks lower in the stack to the actual
+     *    transport backends.
+     */
+
+    mrp_log_error("Whoa... recvfrom called for a connected transport.");
+    exit(1);
+}
+
+
+static void closed_cb(mrp_transport_t *t, int error, void *user_data)
+{
+    mrp_pep_t *pep = (mrp_pep_t *)user_data;
+
+    MRP_UNUSED(t);
+    MRP_UNUSED(pep);
+
+    if (error)
+        notify_disconnect(pep, error, strerror(error));
+    else
+        notify_disconnect(pep, ECONNRESET, "server has closed the connection");
+}
+
+
+static int queue_pending(mrp_pep_t *pep, uint32_t seq,
+                         mrp_pep_status_cb_t cb, void *user_data)
+{
+    pending_request_t *pending;
+
+    pending = mrp_allocz(sizeof(*pending));
+
+    if (pending != NULL) {
+        mrp_list_init(&pending->hook);
+
+        pending->seqno     = seq;
+        pending->cb        = cb;
+        pending->user_data = user_data;
+
+        mrp_list_append(&pep->pending, &pending->hook);
+
+        return TRUE;
+    }
+    else
+        return FALSE;
+}
+
+
+static int notify_pending(mrp_pep_t *pep, uint32_t seq, int error,
+                          const char *msg)
+{
+    mrp_list_hook_t   *p, *n;
+    pending_request_t *pending;
+
+    mrp_list_foreach(&pep->pending, p, n) {
+        pending = mrp_list_entry(p, typeof(*pending), hook);
+
+        if (pending->seqno == seq) {
+            PEP_MARK_BUSY(pep, {
+                    pending->cb(pep, error, msg, pending->user_data);
+                    mrp_list_delete(&pending->hook);
+                    mrp_free(pending);
+                });
+
+            return TRUE;
+        }
+    }
+
+    return FALSE;
+}
+
+
+static void purge_pending(mrp_pep_t *pep)
+{
+    mrp_list_hook_t   *p, *n;
+    pending_request_t *pending;
+
+    mrp_list_foreach(&pep->pending, p, n) {
+        pending = mrp_list_entry(p, typeof(*pending), hook);
+
+        mrp_list_delete(&pending->hook);
+        mrp_free(pending);
+    }
+}
diff --git a/src/plugins/decision-proto/client.h b/src/plugins/decision-proto/client.h
new file mode 100644 (file)
index 0000000..ac8404b
--- /dev/null
@@ -0,0 +1,119 @@
+#ifndef __MURPHY_DECISION_CLIENT_H__
+#define __MURPHY_DECISION_CLIENT_H__
+
+#include <murphy-db/mqi.h>
+#include <murphy/common/mainloop.h>
+
+#define MRP_DEFAULT_PEP_ADDRESS "unxs:@murphy-decision"
+
+
+/*
+ * helper macros for defining tables
+ */
+
+#define MRP_PEP_TABLE_COLUMNS(_var, _columns...)  \
+    static mqi_column_def_t _var[] = {            \
+        _columns,                                 \
+        { NULL, mqi_unknown, 0, 0 },              \
+    }
+
+#define MRP_PEP_STRING(_name, _len, _is_idx)                            \
+    { .name = _name, .type = mqi_varchar, .length = _len, .flags = _is_idx }
+
+#define MRP_PEP_INTEGER(_name, _is_idx)                                 \
+    { .name = _name, .type = mqi_integer, .length = 0, .flags = _is_idx }
+
+#define MRP_PEP_UNSIGNED(_name, _is_idx)                                \
+    { .name = _name, .type = mqi_unsignd, .length = 0, .flags = _is_idx }
+
+#define MRP_PEP_FLOATING(_name, _is_idx)                                \
+    { .name = _name, .type = mqi_floating, .length = 0, .flags = _is_idx }
+
+#define MRP_PEP_TABLE(_name, _columns...) {      \
+        .name    = _name,                        \
+        .columns = _columns,                     \
+        .ncolumn = MRP_ARRAY_SIZE(_columns),     \
+        .id      = 0,                            \
+    }
+
+#define MRP_PEP_TABLES(var, tables...) \
+    static mrp_pep_table_t var[] = {   \
+        tables                         \
+    }
+
+/*
+ * a table definition
+ */
+
+typedef struct {
+    const char       *name;              /* table name */
+    mqi_column_def_t *columns;           /* column definitions */
+    int               ncolumn;           /* number of columns */
+    int               idx_col;           /* column to use as index */
+    int               id;                /* id used to reference this table */
+} mrp_pep_table_t;
+
+
+/*
+ * table column values
+ */
+
+typedef union {
+    const char *str;                     /* mqi_varchar */
+    uint32_t    u32;                     /* mqi_unsignd */
+    int32_t     s32;                     /* mqi_integer */
+    double      dbl;                     /* mqi_floating */
+} mrp_pep_value_t;
+
+
+/*
+ * table data
+ */
+
+typedef struct {
+    int               id;                /* table id */
+    mrp_pep_value_t  *columns;           /* table data */
+    mqi_column_def_t *coldefs;           /* column definitions */
+    int               ncolumn;           /* columns per row */
+    int               nrow;              /* number of rows */
+} mrp_pep_data_t;
+
+
+
+/** Opaque policy enforcement point type. */
+typedef struct mrp_pep_s mrp_pep_t;
+
+/** Callback type for connection state notifications. */
+typedef void (*mrp_pep_connect_cb_t)(mrp_pep_t *pep, int connection,
+                                     int errcode, const char *errmsg,
+                                     void *user_data);
+
+/** Callback type for request status notifications. */
+typedef void (*mrp_pep_status_cb_t)(mrp_pep_t *pep, int errcode,
+                                    const char *errmsg, void *user_data);
+
+/** Callback type for data change notifications. */
+typedef void (*mrp_pep_data_cb_t)(mrp_pep_t *pep, mrp_pep_data_t *tables,
+                                  int ntable, void *user_data);
+
+/** Create a new policy enforcement point. */
+mrp_pep_t *mrp_pep_create(const char *name, mrp_mainloop_t *ml,
+                          mrp_pep_table_t *owned_tables, int nowned_table,
+                          mrp_pep_table_t *watched_tables, int nwatched_table,
+                          mrp_pep_connect_cb_t connect, mrp_pep_data_cb_t data,
+                          void *user_data);
+
+/** Destroy the given policy enforcement point. */
+void mrp_pep_destroy(mrp_pep_t *pep);
+
+/** Connect and register the given client to the server at the given address. */
+int mrp_pep_connect(mrp_pep_t *pep, const char *address);
+
+/** Close the connection to the server. */
+void mrp_pep_disconnect(mrp_pep_t *pep);
+
+/** Set the content of the given tables to the given data. */
+int mrp_pep_set_data(mrp_pep_t *pep, mrp_pep_data_t *tables, int ntable,
+                     mrp_pep_status_cb_t cb, void *user_data);
+
+#endif /* __MURPHY_DECISION_CLIENT_H__ */
diff --git a/src/plugins/decision-proto/decision-types.h b/src/plugins/decision-proto/decision-types.h
new file mode 100644 (file)
index 0000000..ecfd9ff
--- /dev/null
@@ -0,0 +1,162 @@
+#ifndef __MURPHY_DECISION_TYPES_H__
+#define __MURPHY_DECISION_TYPES_H__
+
+#include <murphy/common/list.h>
+#include <murphy/common/mainloop.h>
+#include <murphy/common/transport.h>
+#include <murphy/common/hashtbl.h>
+#include <murphy/core/context.h>
+
+#include "client.h"
+
+typedef struct pep_proxy_s pep_proxy_t;
+typedef struct pep_table_s pep_table_t;
+typedef struct pep_watch_s pep_watch_t;
+typedef struct pdp_s       pdp_t;
+
+
+/*
+ * a policy enforcement point (on the client side)
+ */
+
+struct mrp_pep_s {
+    char                 *name;          /* enforcment point name */
+    mrp_mainloop_t       *ml;            /* main loop */
+    mrp_transport_t      *t;             /* transport towards murphy */
+    int                   connected;     /* transport is up */
+    mrp_pep_table_t      *owned;         /* owned tables */
+    int                   nowned;        /* number of owned tables */
+    mrp_pep_table_t      *watched;       /* watched tables */
+    int                   nwatched;      /* number of watched tables */
+    mrp_pep_connect_cb_t  connect_cb;    /* connection state change callback */
+    mrp_pep_data_cb_t     data_cb;       /* watched data change callback */
+    void                 *user_data;     /* opqaue user data for callbacks */
+    int                   busy;          /* non-zero if a callback is active */
+    int                   destroyed : 1; /* non-zero if destroy pending */
+    uint32_t              seqno;         /* request sequence number */
+    mrp_list_hook_t       pending;       /* queue of outstanding requests */
+};
+
+
+/*
+ * a table associated with or tracked by an enforcement point
+ */
+
+struct pep_table_s {
+    char              *name;             /* table name */
+    mrp_list_hook_t    hook;             /* to list of tables */
+    mqi_handle_t       h;                /* MDB table handle */
+    mqi_column_def_t  *columns;          /* column definitions */
+    mqi_column_desc_t *coldesc;          /* column descriptors */
+    int                ncolumn;          /* number of columns */
+    int                idx_col;          /* column index of index column */
+    mrp_list_hook_t    watches;          /* watches for this table */
+    uint32_t           notify_stamp;     /* current table stamp */
+    mrp_pep_value_t   *notify_data;      /* notification data */
+    int                notify_nrow;      /* number of rows to notify */
+    int                notify_fail : 1;  /* notification failure */
+    int                notify_all : 1;   /* notify all watches */
+};
+
+
+/*
+ * a table watch
+ */
+
+struct pep_watch_s {
+    pep_table_t     *table;              /* table being watched */
+    pep_proxy_t     *proxy;              /* enforcement point */
+    int              id;                 /* table id within proxy */
+    uint32_t         stamp;              /* last notified update stamp */
+    mrp_list_hook_t  tbl_hook;           /* hook to table watch list */
+    mrp_list_hook_t  pep_hook;           /* hook to proxy watch list */
+};
+
+
+/*
+ * a policy enforcement point (on the server side)
+ */
+
+struct pep_proxy_s {
+    char              *name;             /* enforcement point name */
+    pdp_t             *pdp;              /* decision point context */
+    mrp_transport_t   *t;                /* associated transport */
+    mrp_list_hook_t    hook;             /* to list of all enforcement points */
+    pep_table_t       *tables;           /* tables owned by this */
+    int                ntable;           /* number of tables */
+    mrp_list_hook_t    watches;          /* tables watched by this */
+    mrp_msg_t         *notify_msg;       /* notification being built */
+    int                notify_ntable;    /* number of changed tables */
+    int                notify_ncolumn;   /* total columns in notification */
+    int                notify_fail : 1;  /* notification failure */
+    int                notify_all : 1;   /* notify all watches */
+};
+
+
+/*
+ * policy decision point context
+ */
+
+struct pdp_s {
+    mrp_context_t   *ctx;                /* murphy context */
+    const char      *address;            /* external transport address */
+    mrp_transport_t *ext;                /* external transport */
+    mrp_list_hook_t  proxies;            /* list of enforcement points */
+    mrp_list_hook_t  tables;             /* list of tables we track */
+    mrp_htbl_t      *watched;            /* tracked tables by name */
+    mrp_deferred_t  *notify;             /* deferred notification */
+    int              notify_scheduled;   /* is notification scheduled? */
+};
+
+
+
+
+
+
+#if 0
+
+/*
+ * common table data for tracking and proxying
+ */
+
+typedef struct table_s         table_t;
+typedef struct tracked_table_s tracked_table_t;
+typedef struct proxied_table_s proxied_table_t;
+
+struct table_s {
+    char              *name;             /* table name */
+    mqi_handle_t       h;                /* table handle */
+    mqi_column_def_t  *columns;          /* column definitions */
+    mqi_column_desc_t *coldesc;          /* column descriptors */
+    int                ncolumn;          /* number of columns */
+};
+
+
+/*
+ * a tracked table
+ */
+
+struct tracked_table_s {
+    table_t         *t;                  /* actual table data */
+    mrp_list_hook_t  watches;            /* watches for this table */
+    mrp_pep_value_t *notify_data;        /* collected data for notification */
+    int              notify_nrow;        /* number of rows in notification */
+    int              notify_failed:1;    /* notification failure */
+    int              notify_all:1;       /* notify all watches */
+};
+
+
+/*
+ * a proxied table
+ */
+
+struct proxied_table_s {
+    table_t *t;                          /* actual table data */
+    int      id;                         /* id for enforcement point */
+    int      idx_col;                    /* column index of index column */
+}
+
+
+#endif
+
+#endif /* __MURPHY_DECISION_TYPES_H__ */
diff --git a/src/plugins/decision-proto/decision.c b/src/plugins/decision-proto/decision.c
new file mode 100644 (file)
index 0000000..e27b997
--- /dev/null
@@ -0,0 +1,406 @@
+#include <errno.h>
+
+#include <murphy/common/mm.h>
+#include <murphy/common/log.h>
+
+#include "message.h"
+#include "proxy.h"
+#include "table.h"
+#include "notify.h"
+#include "decision.h"
+
+static int create_transports(pdp_t *pdp);
+static void destroy_transports(pdp_t *pdp);
+
+pdp_t *create_decision(mrp_context_t *ctx, const char *address)
+{
+    pdp_t *pdp;
+
+    pdp = mrp_allocz(sizeof(*pdp));
+
+    if (pdp != NULL) {
+        pdp->ctx     = ctx;
+        pdp->address = address;
+
+        if (init_proxies(pdp) && init_tables(pdp) && create_transports(pdp))
+            return pdp;
+        else
+            destroy_decision(pdp);
+    }
+
+    return NULL;
+}
+
+
+void destroy_decision(pdp_t *pdp)
+{
+    if (pdp != NULL) {
+        destroy_proxies(pdp);
+        destroy_tables(pdp);
+        destroy_transports(pdp);
+
+        mrp_free(pdp);
+    }
+}
+
+
+static void notify_cb(mrp_mainloop_t *ml, mrp_deferred_t *d, void *user_data)
+{
+    pdp_t *pdp = (pdp_t *)user_data;
+
+    MRP_UNUSED(ml);
+
+    mrp_disable_deferred(d);
+    pdp->notify_scheduled = FALSE;
+    notify_table_changes(pdp);
+}
+
+
+void schedule_notification(pdp_t *pdp)
+{
+
+    if (pdp->notify == NULL)
+        pdp->notify = mrp_add_deferred(pdp->ctx->ml, notify_cb, pdp);
+
+    if (!pdp->notify_scheduled) {
+        mrp_debug("scheduling client notification");
+        mrp_enable_deferred(pdp->notify);
+    }
+}
+
+
+static void send_ack_reply(mrp_transport_t *t, uint32_t seq)
+{
+    mrp_msg_t *msg;
+
+    msg = create_ack_message(seq);
+
+    if (msg != NULL) {
+        mrp_transport_send(t, msg);
+        mrp_msg_unref(msg);
+    }
+}
+
+
+static void send_nak_reply(mrp_transport_t *t, uint32_t seq, int error,
+                           const char *errmsg)
+{
+    mrp_msg_t *msg;
+
+    msg = create_nak_message(seq, error, errmsg);
+
+    if (msg != NULL) {
+        mrp_transport_send(t, msg);
+        mrp_msg_unref(msg);
+    }
+}
+
+
+static int process_register_request(pep_proxy_t *proxy, mrp_msg_t *req,
+                                    uint32_t seq)
+{
+    mrp_transport_t *t = proxy->t;
+    char            *name;
+    uint16_t         utable, uwatch, ucolumn;
+    int              ntable, nwatch, ncolumn;
+    int              error;
+    const char      *errmsg;
+
+    if (mrp_msg_get(req,
+                    MRP_PEPMSG_STRING(NAME   , &name   ),
+                    MRP_PEPMSG_UINT16(NTABLE , &utable ),
+                    MRP_PEPMSG_UINT16(NWATCH , &uwatch ),
+                    MRP_PEPMSG_UINT16(NCOLDEF, &ucolumn),
+                    MRP_MSG_END)) {
+        mrp_pep_table_t  tables[utable], watches[uwatch];
+        mqi_column_def_t columns[ucolumn];
+
+        ntable  = utable;
+        nwatch  = uwatch;
+        ncolumn = ucolumn;
+
+        if (decode_register_message(req, tables, ntable, watches, nwatch,
+                                    columns, ncolumn)) {
+            if (register_proxy(proxy, name, tables, ntable, watches, nwatch,
+                               &error, &errmsg)) {
+                send_ack_reply(t, seq);
+                proxy->notify_all = TRUE;
+                schedule_notification(proxy->pdp);
+
+                return TRUE;
+            }
+        }
+        else
+            goto malformed;
+    }
+    else {
+    malformed:
+        error  = EINVAL;
+        errmsg = "malformed register message";
+    }
+
+    send_nak_reply(t, seq, error, errmsg);
+
+    return FALSE;
+}
+
+
+static void process_unregister_request(pep_proxy_t *proxy, uint32_t seq)
+{
+    send_ack_reply(proxy->t, seq);
+    unregister_proxy(proxy);
+}
+
+
+static void process_set_request(pep_proxy_t *proxy, mrp_msg_t *req,
+                                uint32_t seq)
+{
+#if 1
+    uint16_t    utable, uvalue, tblid, nrow;
+    int         ntable, nvalue, i;
+    int         error;
+    const char *errmsg;
+    void       *it;
+
+    it = NULL;
+
+    if (mrp_msg_iterate_get(req, &it,
+                    MRP_PEPMSG_UINT16(NCHANGE, &utable),
+                    MRP_PEPMSG_UINT16(NTOTAL , &uvalue),
+                    MRP_MSG_END)) {
+        mrp_pep_data_t  data[utable], *d;
+        mrp_pep_value_t values[uvalue], *v;
+
+        ntable = utable;
+        nvalue = uvalue;
+        d      = data;
+        v      = values;
+
+        for (i = 0; i < ntable; i++) {
+            if (!mrp_msg_iterate_get(req, &it,
+                                     MRP_PEPMSG_UINT16(TBLID, &tblid),
+                                     MRP_PEPMSG_UINT16(NROW , &nrow),
+                                     MRP_MSG_END)) {
+                error  = EINVAL;
+                errmsg = "malformed set message";
+                goto reply_nak;
+            }
+
+            if (tblid >= proxy->ntable) {
+                error  = ENOENT;
+                errmsg = "invalid table id";
+                goto reply_nak;
+            }
+
+            d->id      = tblid;
+            d->columns = v;
+            d->coldefs = proxy->tables[d->id].columns;
+            d->ncolumn = proxy->tables[d->id].ncolumn;
+            d->nrow    = nrow;
+
+            if (nvalue < d->ncolumn * d->nrow) {
+                error  = EINVAL;
+                errmsg = "invalid set message";
+                goto reply_nak;
+            }
+
+            if (!decode_set_message(req, &it, d)) {
+                error  = EINVAL;
+                errmsg = "invalid set message";
+                goto reply_nak;
+            }
+
+            v += d->ncolumn * d->nrow;
+            d++;
+        }
+
+        if (set_proxy_tables(proxy, data, ntable, &error, &errmsg)) {
+            send_ack_reply(proxy->t, seq);
+
+            return;
+        }
+    }
+
+ reply_nak:
+    send_nak_reply(proxy->t, seq, error, errmsg);
+#else
+    uint16_t    utable, uvalue;
+    int         ntable, nvalue;
+    int         error;
+    const char *errmsg;
+
+    if (mrp_msg_get(req,
+                    MRP_PEPMSG_UINT16(NTABLE, &utable),
+                    MRP_PEPMSG_UINT16(NTOTAL, &uvalue),
+                    MRP_MSG_END)) {
+        mrp_pep_data_t  tables[utable];
+        mrp_pep_value_t values[uvalue];
+
+        ntable = utable;
+        nvalue = uvalue;
+
+        if (decode_set_message(req, tables, ntable, values, nvalue)) {
+            if (set_proxy_tables(proxy, tables, ntable, &error, &errmsg)) {
+                send_ack_reply(proxy->t, seq);
+
+                return;
+            }
+        }
+        else
+            goto malformed;
+    }
+    else {
+    malformed:
+        error  = EINVAL;
+        errmsg = "malformed set message";
+    }
+
+    send_nak_reply(proxy->t, seq, error, errmsg);
+#endif
+}
+
+
+static void recv_cb(mrp_transport_t *t, mrp_msg_t *msg, void *user_data)
+{
+    pep_proxy_t *proxy = (pep_proxy_t *)user_data;
+    char        *name  = proxy && proxy->name ? proxy->name : "<unknown>";
+    uint16_t     type;
+    uint32_t     seq;
+
+    /*
+      mrp_log_info("Message from client %p:", proxy);
+      mrp_msg_dump(msg, stdout);
+    */
+
+    if (!mrp_msg_get(msg,
+                     MRP_PEPMSG_UINT16(MSGTYPE, &type),
+                     MRP_PEPMSG_UINT32(MSGSEQ , &seq ),
+                     MRP_MSG_END)) {
+        mrp_log_error("Malformed message from client %s.", name);
+        send_nak_reply(t, 0, EINVAL, "malformed message");
+    }
+    else {
+        switch (type) {
+        case MRP_PEPMSG_REGISTER:
+            if (!process_register_request(proxy, msg, seq))
+                destroy_proxy(proxy);
+            break;
+
+        case MRP_PEPMSG_UNREGISTER:
+            process_unregister_request(proxy, seq);
+            break;
+
+        case MRP_PEPMSG_SET:
+            process_set_request(proxy, msg, seq);
+            break;
+
+        default:
+            break;
+        }
+    }
+}
+
+
+static void connect_cb(mrp_transport_t *ext, void *user_data)
+{
+    pdp_t       *pdp = (pdp_t *)user_data;
+    pep_proxy_t *proxy;
+    int          flags;
+
+    proxy = create_proxy(pdp);
+
+    if (proxy != NULL) {
+        flags    = MRP_TRANSPORT_REUSEADDR | MRP_TRANSPORT_NONBLOCK;
+        proxy->t = mrp_transport_accept(ext, proxy, flags);
+
+        if (proxy->t != NULL)
+            mrp_log_info("Accepted new client connection.");
+        else {
+            mrp_log_error("Failed to accept new client connection.");
+            destroy_proxy(proxy);
+        }
+    }
+}
+
+
+static void closed_cb(mrp_transport_t *t, int error, void *user_data)
+{
+    pep_proxy_t *proxy = (pep_proxy_t *)user_data;
+    char        *name  = proxy && proxy->name ? proxy->name : "<unknown>";
+
+    MRP_UNUSED(t);
+
+    if (error)
+        mrp_log_error("Transport to client %s closed (%d: %s).",
+                      name, error, strerror(error));
+    else
+        mrp_log_info("Transport to client %s closed.", name);
+
+    mrp_log_info("Destroying client %s.", name);
+    destroy_proxy(proxy);
+}
+
+
+static int create_ext_transport(pdp_t *pdp)
+{
+    static mrp_transport_evt_t evt = {
+        .closed      = closed_cb,
+        .recvmsg     = recv_cb,
+        .recvmsgfrom = NULL,
+        .connection  = connect_cb,
+    };
+
+    mrp_transport_t *t;
+    mrp_sockaddr_t   addr;
+    socklen_t        addrlen;
+    int              flags;
+    const char      *type;
+
+    t       = NULL;
+    addrlen = mrp_transport_resolve(NULL, pdp->address,
+                                    &addr, sizeof(addr), &type);
+
+    if (addrlen > 0) {
+        flags = MRP_TRANSPORT_REUSEADDR;
+        t     = mrp_transport_create(pdp->ctx->ml, type, &evt, pdp, flags);
+
+        if (t != NULL) {
+            if (mrp_transport_bind(t, &addr, addrlen) &&
+                mrp_transport_listen(t, 4)) {
+                mrp_log_info("Listening on transport %s...", pdp->address);
+                pdp->ext = t;
+
+                return TRUE;
+            }
+            else
+                mrp_log_error("Failed to bind transport to %s.", pdp->address);
+        }
+        else
+            mrp_log_error("Failed to create transport for %s.", pdp->address);
+    }
+    else
+        mrp_log_error("Invalid transport address %s.", pdp->address);
+
+    return FALSE;
+}
+
+
+static void destroy_ext_transport(pdp_t *pdp)
+{
+    if (pdp != NULL) {
+        mrp_transport_destroy(pdp->ext);
+        pdp->ext = NULL;
+    }
+}
+
+
+static int create_transports(pdp_t *pdp)
+{
+    return create_ext_transport(pdp);
+}
+
+
+static void destroy_transports(pdp_t *pdp)
+{
+    destroy_ext_transport(pdp);
+}
diff --git a/src/plugins/decision-proto/decision.h b/src/plugins/decision-proto/decision.h
new file mode 100644 (file)
index 0000000..896fc22
--- /dev/null
@@ -0,0 +1,11 @@
+#ifndef __MURPHY_DECISION_H__
+#define __MURPHY_DECISION_H__
+
+#include "decision-types.h"
+
+pdp_t *create_decision(mrp_context_t *ctx, const char *address);
+void destroy_decision(pdp_t *pdp);
+
+void schedule_notification(pdp_t *pdp);
+
+#endif /* __MURPHY_DECISION_H__ */
diff --git a/src/plugins/decision-proto/message.c b/src/plugins/decision-proto/message.c
new file mode 100644 (file)
index 0000000..b416144
--- /dev/null
@@ -0,0 +1,390 @@
+#include "message.h"
+
+static int append_one_row(mrp_msg_t *msg, uint16_t tag, mqi_column_def_t *col,
+                          int ncolumn, mrp_pep_value_t *data);
+
+mrp_msg_t *create_register_message(mrp_pep_t *pep)
+{
+    mrp_msg_t        *msg;
+    mrp_pep_table_t  *t;
+    mqi_column_def_t *c;
+    uint16_t          ncolumn, type;
+    int               i, j;
+
+    ncolumn = 0;
+    for (i = 0; i < pep->nowned; i++)
+        ncolumn += pep->owned[i].ncolumn;
+    for (i = 0; i < pep->nwatched; i++)
+        ncolumn += pep->watched[i].ncolumn;
+
+    msg = mrp_msg_create(MRP_PEPMSG_UINT16(MSGTYPE, MRP_PEPMSG_REGISTER),
+                         MRP_PEPMSG_UINT32(MSGSEQ , 0),
+                         MRP_PEPMSG_STRING(NAME   , pep->name),
+                         MRP_PEPMSG_UINT16(NTABLE , pep->nowned),
+                         MRP_PEPMSG_UINT16(NWATCH , pep->nwatched),
+                         MRP_PEPMSG_UINT16(NCOLDEF, ncolumn),
+                         MRP_MSG_END);
+
+    for (i = 0, t = pep->owned; i < pep->nowned; i++, t++) {
+        mrp_msg_append(msg, MRP_PEPMSG_STRING(TBLNAME, t->name));
+        mrp_msg_append(msg, MRP_PEPMSG_UINT16(NCOLUMN, t->ncolumn));
+        mrp_msg_append(msg, MRP_PEPMSG_SINT16(TBLIDX , t->idx_col));
+        for (j = 0, c = t->columns; j < t->ncolumn; j++, c++) {
+            if (c->type == mqi_varchar)
+                type = mqi_blob + c->length;
+            else
+                type = c->type;
+
+            mrp_msg_append(msg, MRP_PEPMSG_STRING(COLNAME, c->name));
+            mrp_msg_append(msg, MRP_PEPMSG_UINT16(COLTYPE, type));
+        }
+    }
+
+    for (i = 0, t = pep->watched; i < pep->nwatched; i++, t++) {
+        mrp_msg_append(msg, MRP_PEPMSG_STRING(TBLNAME, t->name));
+        mrp_msg_append(msg, MRP_PEPMSG_UINT16(NCOLUMN, t->ncolumn));
+        for (j = 0, c = t->columns; j < t->ncolumn; j++, c++) {
+            if (c->type == mqi_varchar)
+                type = mqi_blob + c->length;
+            else
+                type = c->type;
+
+            mrp_msg_append(msg, MRP_PEPMSG_STRING(COLNAME, c->name));
+            mrp_msg_append(msg, MRP_PEPMSG_UINT16(COLTYPE, type));
+        }
+    }
+
+
+    return msg;
+}
+
+
+int decode_register_message(mrp_msg_t *msg, mrp_pep_table_t *owned, int nowned,
+                            mrp_pep_table_t *watched, int nwatched,
+                            mqi_column_def_t *columns, int ncolumn)
+{
+    mrp_pep_table_t  *t;
+    mqi_column_def_t *c;
+    void             *it;
+    char             *name;
+    uint16_t          ntbl, nwch, ncol, type, idx_col;
+    int               i, j, n;
+
+    it = NULL;
+
+    if (!mrp_msg_iterate_get(msg, &it,
+                             MRP_PEPMSG_UINT16(NTABLE , &ntbl),
+                             MRP_PEPMSG_UINT16(NWATCH , &nwch),
+                             MRP_PEPMSG_UINT16(NCOLDEF, &ncol),
+                             MRP_MSG_END))
+        return FALSE;
+
+    if (ntbl > nowned || nwch > nwatched || ncol > ncolumn)
+        return FALSE;
+
+    n = 0;
+    c = columns;
+    for (i = 0, t = owned; i < nowned; i++, t++) {
+        if (mrp_msg_iterate_get(msg, &it,
+                                MRP_PEPMSG_STRING(TBLNAME, &name),
+                                MRP_PEPMSG_UINT16(NCOLUMN, &ncol),
+                                MRP_PEPMSG_SINT16(TBLIDX , &idx_col),
+                                MRP_MSG_END)) {
+            t->name    = name;
+            t->columns = c;
+            t->ncolumn = ncol;
+            t->idx_col = idx_col;
+        }
+        else
+            return FALSE;
+
+        for (j = 0; j < t->ncolumn; j++, c++, n++) {
+            if (n >= ncolumn)
+                return FALSE;
+
+            if (mrp_msg_iterate_get(msg, &it,
+                                    MRP_PEPMSG_STRING(COLNAME, &name),
+                                    MRP_PEPMSG_UINT16(COLTYPE, &type),
+                                    MRP_MSG_END)) {
+                c->name = name;
+
+                if (type > mqi_blob) {
+                    c->type   = mqi_varchar;
+                    c->length = type - mqi_blob;
+                }
+                else
+                    c->type = type;
+            }
+        }
+    }
+
+    for (i = 0, t = watched; i < nwatched; i++, t++) {
+        if (mrp_msg_iterate_get(msg, &it,
+                                MRP_PEPMSG_STRING(TBLNAME, &name),
+                                MRP_PEPMSG_UINT16(NCOLUMN, &ncol),
+                                MRP_MSG_END)) {
+            t->name    = name;
+            t->columns = c;
+            t->ncolumn = ncol;
+            t->idx_col = -1;
+        }
+        else
+            return FALSE;
+
+        for (j = 0; j < t->ncolumn; j++, c++, n++) {
+            if (n >= ncolumn)
+                return FALSE;
+            if (mrp_msg_iterate_get(msg, &it,
+                                    MRP_PEPMSG_STRING(COLNAME, &name),
+                                    MRP_PEPMSG_UINT16(COLTYPE, &type),
+                                    MRP_MSG_END)) {
+                c->name = name;
+
+                if (type > mqi_blob) {
+                    c->type   = mqi_varchar;
+                    c->length = type - mqi_blob;
+                }
+                else {
+                    c->type   = type;
+                    c->length = 0;
+                }
+
+                c->flags = 0;
+            }
+        }
+    }
+
+    return TRUE;
+}
+
+
+mrp_msg_t *create_ack_message(uint32_t seq)
+{
+    return mrp_msg_create(MRP_PEPMSG_UINT16(MSGTYPE, MRP_PEPMSG_ACK),
+                          MRP_PEPMSG_UINT32(MSGSEQ , seq),
+                          MRP_MSG_END);
+}
+
+
+mrp_msg_t *create_nak_message(uint32_t seq, int error, const char *errmsg)
+{
+    return mrp_msg_create(MRP_PEPMSG_UINT16(MSGTYPE, MRP_PEPMSG_NAK),
+                          MRP_PEPMSG_UINT32(MSGSEQ , seq),
+                          MRP_PEPMSG_SINT32(ERRCODE, error),
+                          MRP_PEPMSG_STRING(ERRMSG , errmsg),
+                          MRP_MSG_END);
+}
+
+
+mrp_msg_t *create_notify_message(void)
+{
+    return mrp_msg_create(MRP_PEPMSG_UINT16(MSGTYPE, MRP_PEPMSG_NOTIFY),
+                          MRP_PEPMSG_UINT32(MSGSEQ , 0),
+                          MRP_PEPMSG_UINT16(NCHANGE, 0),
+                          MRP_PEPMSG_UINT16(NTOTAL , 0),
+                          MRP_MSG_END);
+}
+
+
+int update_notify_message(mrp_msg_t *msg, int id, mqi_column_def_t *columns,
+                          int ncolumn, mrp_pep_value_t *data, int nrow)
+{
+    mrp_pep_value_t *v;
+    uint16_t         tid, nr;
+    int              i;
+
+    nr  = nrow;
+    tid = id;
+
+    if (!mrp_msg_append(msg, MRP_PEPMSG_UINT16(TBLID, tid)) ||
+        !mrp_msg_append(msg, MRP_PEPMSG_UINT16(NROW , nr )))
+        return FALSE;
+
+    for (i = 0, v = data; i < nrow; i++, v += ncolumn) {
+        if (!append_one_row(msg, MRP_PEPTAG_DATA, columns, ncolumn, v))
+            return FALSE;
+    }
+
+    return TRUE;
+}
+
+
+int decode_notify_message(mrp_msg_t *msg, void **it, mrp_pep_data_t *data)
+{
+    int               i, j;
+    mrp_pep_value_t  *v;
+    mqi_column_def_t *d;
+
+    v = data->columns;
+    d = data->coldefs;
+
+    for (i = 0; i < data->nrow; i++) {
+        for (j = 0; j < data->ncolumn; j++) {
+            switch (d[j].type) {
+            case mqi_varchar:
+                if (!mrp_msg_iterate_get(msg, it,
+                                         MRP_PEPMSG_STRING(DATA, &v->str),
+                                         MRP_MSG_END))
+                    return FALSE;
+                break;
+
+            case mqi_integer:
+                if (!mrp_msg_iterate_get(msg, it,
+                                         MRP_PEPMSG_SINT32(DATA, &v->s32),
+                                         MRP_MSG_END))
+                    return FALSE;
+                break;
+
+            case mqi_unsignd:
+                if (!mrp_msg_iterate_get(msg, it,
+                                         MRP_PEPMSG_UINT32(DATA, &v->u32),
+                                         MRP_MSG_END))
+                    return FALSE;
+                break;
+
+            case mqi_floating:
+                if (!mrp_msg_iterate_get(msg, it,
+                                         MRP_PEPMSG_DOUBLE(DATA, &v->dbl),
+                                         MRP_MSG_END))
+                    return FALSE;
+                break;
+
+            default:
+                return FALSE;
+            }
+
+            v++;
+        }
+    }
+
+    return TRUE;
+}
+
+
+mrp_msg_t *create_set_message(uint32_t seq, mrp_pep_data_t *data, int ndata)
+{
+    mrp_msg_t        *msg;
+    mrp_pep_value_t  *vals;
+    mqi_column_def_t *defs;
+    uint16_t          utable, utotal, tid, nval, nrow;
+    int               i, j;
+
+    utable = ndata;
+    utotal = 0;
+
+    msg = mrp_msg_create(MRP_PEPMSG_UINT16(MSGTYPE, MRP_PEPMSG_SET),
+                         MRP_PEPMSG_UINT32(MSGSEQ , seq),
+                         MRP_PEPMSG_UINT16(NCHANGE, utable),
+                         MRP_PEPMSG_UINT16(NTOTAL , 0),
+                         MRP_MSG_END);
+
+    if (msg != NULL) {
+        for (i = 0; i < ndata; i++) {
+            tid  = data[i].id;
+            vals = data[i].columns;
+            defs = data[i].coldefs;
+            nval = data[i].ncolumn;
+            nrow = data[i].nrow;
+
+            if (!mrp_msg_append(msg, MRP_PEPMSG_UINT16(TBLID, tid)) ||
+                !mrp_msg_append(msg, MRP_PEPMSG_UINT16(NROW , nrow)))
+                goto fail;
+
+            for (j = 0; j < nrow; j++) {
+                if (!append_one_row(msg, MRP_PEPTAG_DATA, defs, nval, vals))
+                    goto fail;
+                vals   += nval;
+                utotal += nval;
+            }
+        }
+
+        mrp_msg_set(msg, MRP_PEPMSG_UINT16(NTOTAL, utotal));
+
+        return msg;
+    }
+
+ fail:
+    mrp_msg_unref(msg);
+    return NULL;
+}
+
+
+int decode_set_message(mrp_msg_t *msg, void **it, mrp_pep_data_t *data)
+{
+    int               i, j;
+    mrp_pep_value_t  *v;
+    mqi_column_def_t *d;
+
+    v = data->columns;
+    d = data->coldefs;
+
+    for (i = 0; i < data->nrow; i++) {
+        for (j = 0; j < data->ncolumn; j++) {
+            switch (d[j].type) {
+            case mqi_varchar:
+                if (!mrp_msg_iterate_get(msg, it,
+                                         MRP_PEPMSG_STRING(DATA, &v->str),
+                                         MRP_MSG_END))
+                    return FALSE;
+                break;
+
+            case mqi_integer:
+                if (!mrp_msg_iterate_get(msg, it,
+                                         MRP_PEPMSG_SINT32(DATA, &v->s32),
+                                         MRP_MSG_END))
+                    return FALSE;
+                break;
+
+            case mqi_unsignd:
+                if (!mrp_msg_iterate_get(msg, it,
+                                         MRP_PEPMSG_UINT32(DATA, &v->u32),
+                                         MRP_MSG_END))
+                    return FALSE;
+                break;
+
+            case mqi_floating:
+                if (!mrp_msg_iterate_get(msg, it,
+                                         MRP_PEPMSG_DOUBLE(DATA, &v->dbl),
+                                         MRP_MSG_END))
+                    return FALSE;
+                break;
+
+            default:
+                return FALSE;
+            }
+
+            v++;
+        }
+    }
+
+    return TRUE;
+}
+
+
+static int append_one_row(mrp_msg_t *msg, uint16_t tag, mqi_column_def_t *col,
+                          int ncolumn, mrp_pep_value_t *data)
+{
+#define HANDLE_TYPE(dbtype, type, member)                                 \
+    case mqi_##dbtype:                                                    \
+        if (!mrp_msg_append(msg, MRP_MSG_TAG_##type(tag, data->member)))  \
+            return FALSE;                                                 \
+        break
+
+    int i;
+
+    for (i = 0; i < ncolumn; i++, data++, col++) {
+        switch (col->type) {
+            HANDLE_TYPE(integer , SINT32, s32);
+            HANDLE_TYPE(unsignd , UINT32, u32);
+            HANDLE_TYPE(floating, DOUBLE, dbl);
+            HANDLE_TYPE(string  , STRING, str);
+        case mqi_blob:
+        default:
+            return FALSE;
+        }
+    }
+
+    return TRUE;
+
+#undef HANDLE_TYPE
+}
diff --git a/src/plugins/decision-proto/message.h b/src/plugins/decision-proto/message.h
new file mode 100644 (file)
index 0000000..42d8350
--- /dev/null
@@ -0,0 +1,99 @@
+#ifndef __MURPHY_DECISION_MESSAGE_H__
+#define __MURPHY_DECISION_MESSAGE_H__
+
+#include <murphy/common/msg.h>
+
+#include "decision-types.h"
+#include "client.h"
+
+
+#define MRP_PEPMSG_UINT16(tag, value) \
+    MRP_MSG_TAG_UINT16(MRP_PEPTAG_##tag, value)
+
+#define MRP_PEPMSG_SINT16(tag, value) \
+    MRP_MSG_TAG_SINT16(MRP_PEPTAG_##tag, value)
+
+#define MRP_PEPMSG_UINT32(tag, value) \
+    MRP_MSG_TAG_UINT32(MRP_PEPTAG_##tag, value)
+
+#define MRP_PEPMSG_SINT32(tag, value) \
+    MRP_MSG_TAG_SINT32(MRP_PEPTAG_##tag, value)
+
+#define MRP_PEPMSG_DOUBLE(tag, value) \
+    MRP_MSG_TAG_DOUBLE(MRP_PEPTAG_##tag, value)
+
+#define MRP_PEPMSG_STRING(tag, value) \
+    MRP_MSG_TAG_STRING(MRP_PEPTAG_##tag, value)
+
+/*
+ * message types
+ */
+
+typedef enum {
+    MRP_PEPMSG_REGISTER   = 0x1,         /* client: register me */
+    MRP_PEPMSG_UNREGISTER = 0x2,         /* client: unregister me */
+    MRP_PEPMSG_SET        = 0x3,         /* client: set table data */
+    MRP_PEPMSG_NOTIFY     = 0x4,         /* server: table changes */
+    MRP_PEPMSG_ACK        = 0x5,         /* server: ok */
+    MRP_PEPMSG_NAK        = 0x6,         /* server: request failed */
+} mrp_pepmsg_type_t;
+
+
+/*
+ * message-specific tags
+ */
+
+typedef enum {
+    /*
+     * fixed common tags
+     */
+    MRP_PEPTAG_MSGTYPE = 0x1,            /* message type */
+    MRP_PEPTAG_MSGSEQ  = 0x2,            /* sequence number */
+
+    /*
+     * fixed tags in registration messages
+     */
+    MRP_PEPTAG_NAME     = 0x3,           /* enforcement point name */
+    MRP_PEPTAG_NTABLE   = 0x4,           /* number of owned tables */
+    MRP_PEPTAG_NWATCH   = 0x5,           /* number of watched tables */
+    MRP_PEPTAG_NCOLDEF  = 0x6,           /* number of column definitions */
+    MRP_PEPTAG_TBLNAME  = 0x7,           /* table name */
+    MRP_PEPTAG_NCOLUMN  = 0x8,           /* number of columns */
+    MRP_PEPTAG_TBLIDX   = 0x9,           /* column index of index column */
+    MRP_PEPTAG_COLNAME  = 0xa,           /* column name */
+    MRP_PEPTAG_COLTYPE  = 0xb,           /* column type */
+
+    /*
+     * fixed tags in NAKs
+     */
+    MRP_PEPTAG_ERRCODE  = 0x3,           /* error code */
+    MRP_PEPTAG_ERRMSG   = 0x4,           /* error message */
+
+    /*
+     * fixed tags in data notification messages
+     */
+    MRP_PEPTAG_NCHANGE = 0x3,            /* number of tables in notification */
+    MRP_PEPTAG_NTOTAL  = 0x4,            /* total columns in notification */
+    MRP_PEPTAG_TBLID   = 0x5,            /* table id */
+    MRP_PEPTAG_NROW    = 0x6,            /* number of table rows */
+    MRP_PEPTAG_DATA    = 0x7,            /* a data column */
+} mrp_pepmsg_tag_t;
+
+
+mrp_msg_t *create_register_message(mrp_pep_t *pep);
+int decode_register_message(mrp_msg_t *msg, mrp_pep_table_t *tables, int ntable,
+                            mrp_pep_table_t *watches, int nwatch,
+                            mqi_column_def_t *columns, int ncolumn);
+
+mrp_msg_t *create_ack_message(uint32_t seq);
+mrp_msg_t *create_nak_message(uint32_t seq, int error, const char *errmsg);
+mrp_msg_t *create_notify_message(void);
+int update_notify_message(mrp_msg_t *msg, int id, mqi_column_def_t *columns,
+                          int ncolumn, mrp_pep_value_t *data, int nrow);
+int decode_notify_message(mrp_msg_t *msg, void **it, mrp_pep_data_t *t);
+
+mrp_msg_t *create_set_message(uint32_t seq, mrp_pep_data_t *tables, int ntable);
+int decode_set_message(mrp_msg_t *msg, void **it, mrp_pep_data_t *data);
+
+
+#endif /* __MURPHY_DECISION_MESSAGE_H__ */
diff --git a/src/plugins/decision-proto/notify.c b/src/plugins/decision-proto/notify.c
new file mode 100644 (file)
index 0000000..ddfb83b
--- /dev/null
@@ -0,0 +1,175 @@
+#include <murphy/common/mm.h>
+#include <murphy/common/log.h>
+
+#include "decision-types.h"
+#include "message.h"
+#include "notify.h"
+
+
+static void prepare_proxy_notification(pep_proxy_t *proxy)
+{
+    proxy->notify_ntable  = 0;
+    proxy->notify_ncolumn = 0;
+    proxy->notify_fail    = FALSE;
+}
+
+
+static int prepare_table_notification(pep_table_t *t)
+{
+    mrp_pep_value_t *data;
+    int              nrow, size, n;
+
+    nrow = mqi_get_table_size(t->h);
+
+    mrp_debug("size of table %s: %d rows", t->name, nrow);
+
+    if (nrow <= 0) {
+        t->notify_fail = FALSE;
+
+        return TRUE;
+    }
+
+    t->notify_stamp = mqi_get_table_stamp(t->h);
+
+    size = t->ncolumn * sizeof(*data);
+    data = mrp_allocz(nrow * size);
+
+    if (data != NULL) {
+        n = mqi_select(t->h, NULL, t->coldesc, data, size, nrow);
+
+        mrp_debug("select from %s: %d rows", t->name, n);
+
+        if (n <= nrow) {
+            t->notify_data = data;
+            t->notify_nrow = n;
+            t->notify_fail = FALSE;
+
+            return TRUE;
+        }
+
+        mrp_free(data);
+    }
+
+    t->notify_fail = TRUE;
+
+    return FALSE;
+}
+
+
+static void free_table_notification(pep_table_t *t)
+{
+    mrp_free(t->notify_data);
+
+    t->notify_data = NULL;
+    t->notify_nrow = 0;
+    t->notify_all  = FALSE;
+}
+
+
+static int collect_watch_notification(pep_watch_t *w)
+{
+    pep_proxy_t *proxy = w->proxy;
+    pep_table_t *t     = w->table;
+
+    if (!proxy->notify_fail && !t->notify_fail) {
+        mrp_debug("updating %s watch for %s", t->name, proxy->name);
+
+        if (proxy->notify_all || t->notify_all || t->notify_stamp != w->stamp) {
+            if (proxy->notify_msg == NULL)
+                proxy->notify_msg = create_notify_message();
+
+            if (proxy->notify_msg != NULL) {
+                if (update_notify_message(proxy->notify_msg, w->id,
+                                          t->columns, t->ncolumn,
+                                          t->notify_data, t->notify_nrow)) {
+                    proxy->notify_ntable++;
+                    proxy->notify_ncolumn += (t->notify_nrow * t->ncolumn);
+                success:
+                    w->stamp = t->notify_stamp;
+
+                    return TRUE;
+                }
+            }
+        }
+        else
+            goto success;
+    }
+
+    proxy->notify_fail = TRUE;
+
+    return FALSE;
+}
+
+
+static int send_proxy_notification(pep_proxy_t *proxy)
+{
+    uint16_t nchange, ntotal;
+
+    if (proxy->notify_msg == NULL)
+        return TRUE;
+
+    if (!proxy->notify_fail) {
+        mrp_debug("notifying client %s", proxy->name);
+
+        nchange = proxy->notify_ntable;
+        ntotal  = proxy->notify_ncolumn;
+
+        mrp_msg_set(proxy->notify_msg, MRP_PEPMSG_UINT16(NCHANGE, nchange));
+        mrp_msg_set(proxy->notify_msg, MRP_PEPMSG_UINT16(NTOTAL , ntotal ));
+
+
+        /*
+          mrp_log_info("Notification message for client %s:", proxy->name);
+          mrp_msg_dump(proxy->notify_msg, stdout);
+        */
+
+
+        mrp_transport_send(proxy->t, proxy->notify_msg);
+    }
+    else
+        mrp_log_error("Failed to generate/send notification to %s.",
+                      proxy->name);
+
+    mrp_msg_unref(proxy->notify_msg);
+
+    proxy->notify_msg     = NULL;
+    proxy->notify_ntable  = 0;
+    proxy->notify_ncolumn = 0;
+    proxy->notify_fail    = FALSE;
+    proxy->notify_all     = FALSE;
+
+    return TRUE;
+}
+
+
+void notify_table_changes(pdp_t *pdp)
+{
+    mrp_list_hook_t *p, *n, *wp, *wn;
+    pep_proxy_t     *proxy;
+    pep_table_t     *t;
+    pep_watch_t     *w;
+
+    mrp_debug("notifying clients about table changes");
+
+    mrp_list_foreach(&pdp->proxies, p, n) {
+        proxy = mrp_list_entry(p, typeof(*proxy), hook);
+        prepare_proxy_notification(proxy);
+    }
+
+    mrp_list_foreach(&pdp->tables, p, n) {
+        t = mrp_list_entry(p, typeof(*t), hook);
+        prepare_table_notification(t);
+
+        mrp_list_foreach(&t->watches, wp, wn) {
+            w = mrp_list_entry(wp, typeof(*w), tbl_hook);
+            collect_watch_notification(w);
+        }
+
+        free_table_notification(t);
+    }
+
+    mrp_list_foreach(&pdp->proxies, p, n) {
+        proxy = mrp_list_entry(p, typeof(*proxy), hook);
+        send_proxy_notification(proxy);
+    }
+}
diff --git a/src/plugins/decision-proto/notify.h b/src/plugins/decision-proto/notify.h
new file mode 100644 (file)
index 0000000..60a1e84
--- /dev/null
@@ -0,0 +1,8 @@
+#ifndef __MURPHY_DECISION_NOTIFY_H__
+#define __MURPHY_DECISION_NOTIFY_H__
+
+#include "decision-types.h"
+
+void notify_table_changes(pdp_t *pdp);
+
+#endif /* __MURPHY_DECISION_NOTIFY_H__ */
diff --git a/src/plugins/decision-proto/plugin-decision.c b/src/plugins/decision-proto/plugin-decision.c
new file mode 100644 (file)
index 0000000..c9f2390
--- /dev/null
@@ -0,0 +1,54 @@
+#include <murphy/common/macros.h>
+
+#include <murphy/core/plugin.h>
+#include <murphy/core/console.h>
+
+#include "decision-types.h"
+#include "decision.h"
+#include "client.h"
+
+
+static int plugin_init(mrp_plugin_t *plugin)
+{
+    plugin->data = create_decision(plugin->ctx, MRP_DEFAULT_PEP_ADDRESS);
+
+    return (plugin->data != NULL);
+}
+
+
+static void plugin_exit(mrp_plugin_t *plugin)
+{
+    pdp_t *pdp = (pdp_t *)plugin->data;
+
+    destroy_decision(pdp);
+}
+
+
+static void cmd_cb(mrp_console_t *c, void *user_data, int argc, char **argv)
+{
+    MRP_UNUSED(user_data);
+    MRP_UNUSED(argc);
+    MRP_UNUSED(argv);
+
+    mrp_console_printf(c, "decision:%s() called...\n", __FUNCTION__);
+}
+
+
+#define PLUGIN_DESCRIPTION "Murphy decision making plugin prototype."
+#define PLUGIN_VERSION     MRP_VERSION_INT(0, 0, 1)
+#define PLUGIN_HELP        "TODO..."
+#define PLUGIN_AUTHORS     "Aku Ankka <aku.ankka@ankkalinna.org>"
+
+MRP_CONSOLE_GROUP(plugin_commands, "decision", NULL, NULL, {
+        MRP_TOKENIZED_CMD("cmd", cmd_cb, TRUE,
+                          "cmd [args]", "a command", "A command..."),
+});
+
+MURPHY_REGISTER_PLUGIN("decision-proto",
+                       PLUGIN_VERSION, PLUGIN_DESCRIPTION,
+                       PLUGIN_AUTHORS, PLUGIN_HELP, MRP_SINGLETON,
+                       plugin_init, plugin_exit,
+                       NULL, 0, /* plugin argument table */
+                       NULL, 0, /* exported methods */
+                       NULL, 0, /* imported methods */
+                       &plugin_commands);
diff --git a/src/plugins/decision-proto/proxy.c b/src/plugins/decision-proto/proxy.c
new file mode 100644 (file)
index 0000000..37fd3a9
--- /dev/null
@@ -0,0 +1,105 @@
+#include <murphy/common/log.h>
+#include <murphy/common/mm.h>
+#include <murphy/common/list.h>
+
+#include "decision-types.h"
+#include "table.h"
+#include "proxy.h"
+
+
+int init_proxies(pdp_t *pdp)
+{
+    mrp_list_init(&pdp->proxies);
+
+    return TRUE;
+}
+
+
+void destroy_proxies(pdp_t *pdp)
+{
+    MRP_UNUSED(pdp);
+
+    return;
+}
+
+
+pep_proxy_t *create_proxy(pdp_t *pdp)
+{
+    pep_proxy_t *proxy;
+
+    proxy = mrp_allocz(sizeof(*proxy));
+
+    if (proxy != NULL) {
+        mrp_list_init(&proxy->hook);
+        mrp_list_init(&proxy->watches);
+
+        proxy->pdp = pdp;
+
+        mrp_list_append(&pdp->proxies, &proxy->hook);
+    }
+
+    return proxy;
+}
+
+
+void destroy_proxy(pep_proxy_t *proxy)
+{
+    int i;
+
+    if (proxy != NULL) {
+        mrp_list_delete(&proxy->hook);
+
+        for (i = 0; i < proxy->ntable; i++)
+            destroy_proxy_table(proxy->tables + i);
+
+        destroy_proxy_watches(proxy);
+
+        mrp_free(proxy);
+    }
+}
+
+
+int register_proxy(pep_proxy_t *proxy, char *name,
+                   mrp_pep_table_t *tables, int ntable,
+                   mrp_pep_table_t *watches, int nwatch,
+                   int *error, const char **errmsg)
+{
+    int i;
+
+    proxy->name   = mrp_strdup(name);
+    proxy->tables = mrp_allocz_array(typeof(*proxy->tables) , ntable);
+    proxy->ntable = ntable;
+
+    if (proxy->name == NULL || (ntable && proxy->tables == NULL))
+        return FALSE;
+
+    for (i = 0; i < ntable; i++) {
+        if (create_proxy_table(proxy->tables + i, tables + i, error, errmsg))
+            mrp_log_info("Client %s created table %s.", proxy->name,
+                         tables[i].name);
+        else {
+            mrp_log_error("Client %s failed to create table %s (%d: %s).",
+                          proxy->name, tables[i].name, *error, *errmsg);
+            return FALSE;
+        }
+    }
+
+    for (i = 0; i < nwatch; i++) {
+        if (create_proxy_watch(proxy, i, watches + i, error, errmsg))
+            mrp_log_info("Client %s subscribed for table %s.", proxy->name,
+                         watches[i].name);
+        else
+            mrp_log_error("Client %s failed to subscribe for table %s.",
+                          proxy->name, watches[i].name);
+    }
+
+    return TRUE;
+}
+
+
+int unregister_proxy(pep_proxy_t *proxy)
+{
+    destroy_proxy(proxy);
+
+    return TRUE;
+}
diff --git a/src/plugins/decision-proto/proxy.h b/src/plugins/decision-proto/proxy.h
new file mode 100644 (file)
index 0000000..7e4788a
--- /dev/null
@@ -0,0 +1,18 @@
+#ifndef __MURPHY_DECISION_PROXY_H__
+#define __MURPHY_DECISION_PROXY_H__
+
+#include "decision-types.h"
+
+int init_proxies(pdp_t *pdp);
+void destroy_proxies(pdp_t *pdp);
+
+pep_proxy_t *create_proxy(pdp_t *pdp);
+void destroy_proxy(pep_proxy_t *proxy);
+
+int register_proxy(pep_proxy_t *proxy, char *name,
+                   mrp_pep_table_t *tables, int ntable,
+                   mrp_pep_table_t *watches, int nwatch,
+                   int *error, const char **errmsg);
+int unregister_proxy(pep_proxy_t *proxy);
+
+#endif /* __MURPHY_DECISION_PROXY_H__ */
diff --git a/src/plugins/decision-proto/table-common.c b/src/plugins/decision-proto/table-common.c
new file mode 100644 (file)
index 0000000..b35fb2b
--- /dev/null
@@ -0,0 +1,251 @@
+#include <errno.h>
+
+#include <murphy/common/debug.h>
+#include <murphy/common/mm.h>
+#include <murphy/common/hashtbl.h>
+#include <murphy/common/utils.h>
+
+#include <murphy-db/mqi.h>
+
+#include "table.h"
+
+
+/*
+ * client-side tables, common table routines
+ */
+
+static void purge_pep_table(mrp_pep_table_t *t)
+{
+    int i;
+
+    if (t != NULL) {
+        mrp_free((char *)t->name);
+
+        for (i = 0; i < t->ncolumn; i++)
+            mrp_free((char *)t->columns[i].name);
+
+        mrp_free(t->columns);
+    }
+}
+
+
+static void free_column_definitions(mqi_column_def_t *columns, int ncolumn)
+{
+    int i;
+
+    if (columns != NULL) {
+        for (i = 0; i < ncolumn; i++)
+            mrp_free((char *)columns[i].name);
+
+        mrp_free(columns);
+    }
+}
+
+
+static int copy_column_definitions(mqi_column_def_t *src, int nsrc,
+                                   mqi_column_def_t **dstcol, int *ndst)
+{
+    mqi_column_def_t *dst;
+    int               n, i;
+
+    if (nsrc > 0) {
+        if (src[nsrc - 1].name == NULL)
+            n = nsrc - 1;
+        else
+            n = nsrc;
+
+        dst = mrp_allocz_array(mqi_column_def_t, n + 1);
+
+        if (dst == NULL)
+            return FALSE;
+
+        for (i = 0; i < n; i++) {
+            dst[i].type   = src[i].type;
+            dst[i].length = src[i].length;
+            dst[i].name   = mrp_strdup(src[i].name);
+
+            if (dst[i].name == NULL)
+                goto fail;
+        }
+
+        *dstcol = dst;
+        *ndst   = n;
+
+        return TRUE;
+    }
+    else {
+        *dstcol = NULL;
+        *ndst   = 0;
+
+        return FALSE;
+    }
+
+ fail:
+    free_column_definitions(dst, n);
+    return FALSE;
+}
+
+
+static void free_column_descriptors(mqi_column_desc_t *coldesc)
+{
+    mrp_free(coldesc);
+}
+
+
+static int setup_column_descriptors(mqi_column_def_t *columns, int ncolumn,
+                                    mqi_column_desc_t **coldesc)
+{
+#define SETUP_TYPE(type, member)                                        \
+                case mqi_##type:                                        \
+                    desc->cindex = i;                                   \
+                    desc->offset = (void *)&col->member - (void *)NULL; \
+                    break;
+
+    mqi_column_def_t  *def;
+    mqi_column_desc_t *desc;
+    mrp_pep_value_t   *col;
+    int                i;
+
+    *coldesc = mrp_allocz_array(mqi_column_desc_t, ncolumn + 1);
+
+    if (coldesc != NULL) {
+        def  = columns;
+        desc = *coldesc;
+        col  = NULL;
+
+        for (i = 0; i < ncolumn; i++) {
+            switch (def->type) {
+                SETUP_TYPE(integer , s32);
+                SETUP_TYPE(unsignd , u32);
+                SETUP_TYPE(floating, dbl);
+                SETUP_TYPE(string  , str);
+
+            default:
+            case mqi_blob:
+                goto fail;
+            }
+
+            def++;
+            desc++;
+            col++;
+        }
+
+        desc->cindex = -1;
+        desc->offset = 1;
+
+        return TRUE;
+    }
+
+ fail:
+    free_column_descriptors(*coldesc);
+    *coldesc = NULL;
+
+    return FALSE;
+
+#undef SETUP_TYPE
+}
+
+
+static int check_columns(mqi_column_def_t *p, int np,
+                         mqi_column_def_t *q, int nq)
+{
+    int i;
+
+    if  (np == nq) {
+        for (i = 0; i < np; i++, p++, q++) {
+            if (p->type != q->type || p->length != q->length)
+                return FALSE;
+            if (strcmp(p->name, q->name))
+                return FALSE;
+        }
+
+        return TRUE;
+    }
+    else
+        return FALSE;
+}
+
+
+int copy_pep_table(mrp_pep_table_t *src, mrp_pep_table_t *dst)
+{
+    int i, ncolumn;
+
+    dst->name = mrp_strdup(src->name);
+    if (dst->name == NULL)
+        return FALSE;
+
+    if (src->columns[src->ncolumn - 1].name != NULL)
+        ncolumn = src->ncolumn;
+    else
+        ncolumn = src->ncolumn - 1;
+
+    dst->columns = mrp_allocz_array(typeof(*dst->columns), ncolumn + 1);
+    if (dst->columns == NULL) {
+        mrp_free((char *)dst->name);
+
+        return FALSE;
+    }
+
+    dst->ncolumn = ncolumn;
+    dst->idx_col = -1;
+
+    for (i = 0; i < ncolumn; i++) {
+        dst->columns[i].type   = src->columns[i].type;
+        dst->columns[i].length = src->columns[i].length;
+        dst->columns[i].name   = mrp_strdup(src->columns[i].name);
+
+        if (dst->columns[i].name == NULL)
+            goto fail;
+
+        if (src->columns[i].flags != 0) {
+            if (dst->idx_col == -1)
+                dst->idx_col = i;
+            else
+                goto fail;
+        }
+
+        dst->columns[i].flags = 0;
+    }
+
+    return TRUE;
+
+ fail:
+    purge_pep_table(dst);
+
+    return FALSE;
+}
+
+
+int copy_pep_tables(mrp_pep_table_t *src, mrp_pep_table_t *dst, int n)
+{
+    int i;
+
+    for (i = 0; i < n; i++) {
+        if (!copy_pep_table(src + i, dst + i)) {
+            for (i--; i >= 0; i--)
+                purge_pep_table(dst + i);
+
+            return FALSE;
+        }
+    }
+
+    return TRUE;
+}
+
+
+void free_pep_table(mrp_pep_table_t *t)
+{
+    purge_pep_table(t);
+    mrp_free(t);
+}
+
+
+void free_pep_tables(mrp_pep_table_t *tables, int n)
+{
+    int i;
+
+    for (i = 0; i < n; i++)
+        purge_pep_table(tables + i);
+
+    mrp_free(tables);
+}
diff --git a/src/plugins/decision-proto/table.c b/src/plugins/decision-proto/table.c
new file mode 100644 (file)
index 0000000..80f32aa
--- /dev/null
@@ -0,0 +1,456 @@
+#include <errno.h>
+
+#include <murphy/common/debug.h>
+#include <murphy/common/mm.h>
+#include <murphy/common/hashtbl.h>
+#include <murphy/common/utils.h>
+
+#include <murphy-db/mqi.h>
+
+#include "decision.h"
+#include "table.h"
+
+#define FAIL(ec, msg) do {                      \
+        *errcode = ec;                          \
+        *errmsg = msg;                          \
+        goto fail;                              \
+    } while (0)
+
+static pep_table_t *lookup_watch_table(pdp_t *pdp, const char *name);
+
+#include "table-common.c"
+
+/*
+ * proxied and tracked tables
+ */
+
+
+static void table_event_cb(mqi_event_t *e, void *user_data)
+{
+    pdp_t        *pdp  = (pdp_t *)user_data;
+    const char   *name = e->table.table.name;
+    mqi_handle_t  h    = e->table.table.handle;
+    pep_table_t  *t;
+
+    switch (e->event) {
+    case mqi_table_created:
+        mrp_debug("table %s (0x%x) created", name, h);
+        break;
+
+    case mqi_table_dropped:
+        mrp_debug("table %s (0x%x) dropped", name, h);
+        break;
+    default:
+        return;
+    }
+
+    t = lookup_watch_table(pdp, name);
+
+    if (t != NULL) {
+        t->notify_all = TRUE;
+        t->h          = h;
+    }
+
+    schedule_notification(pdp);
+}
+
+
+static void transaction_event_cb(mqi_event_t *e, void *user_data)
+{
+    pdp_t *pdp = (pdp_t *)user_data;
+
+    switch (e->event) {
+    case mqi_transaction_end:
+        mrp_debug("transaction ended");
+        schedule_notification(pdp);
+        break;
+
+    case mqi_transaction_start:
+        mrp_debug("transaction started");
+        break;
+
+    default:
+        break;
+    }
+}
+
+
+static int open_db(pdp_t *pdp)
+{
+    if (mqi_open() == 0) {
+        if (mqi_create_transaction_trigger(transaction_event_cb, pdp) == 0) {
+            if (mqi_create_table_trigger(table_event_cb, pdp) == 0)
+                return TRUE;
+            else
+                mqi_drop_transaction_trigger(transaction_event_cb, pdp);
+        }
+    }
+
+    return FALSE;
+}
+
+
+static void close_db(pdp_t *pdp)
+{
+    mqi_drop_table_trigger(table_event_cb, pdp);
+    mqi_drop_transaction_trigger(transaction_event_cb, pdp);
+}
+
+
+static void purge_watch_table_cb(void *key, void *entry);
+
+
+
+int init_tables(pdp_t *pdp)
+{
+    mrp_htbl_config_t hcfg;
+
+    if (open_db(pdp)) {
+        mrp_list_init(&pdp->tables);
+
+        mrp_clear(&hcfg);
+        hcfg.comp = mrp_string_comp;
+        hcfg.hash = mrp_string_hash;
+        hcfg.free = purge_watch_table_cb;
+
+        pdp->watched = mrp_htbl_create(&hcfg);
+    }
+
+    return (pdp->watched != NULL);
+}
+
+
+void destroy_tables(pdp_t *pdp)
+{
+    close_db(pdp);
+    mrp_htbl_destroy(pdp->watched, TRUE);
+
+    pdp->watched = NULL;
+}
+
+
+int create_proxy_table(pep_table_t *t, mrp_pep_table_t *def,
+                       int *errcode, const char **errmsg)
+{
+    mqi_column_def_t  **cols;
+    mqi_column_desc_t **desc;
+    int                *ncol;
+    char               *index[2];
+
+    if (mqi_get_table_handle((char *)def->name) != MQI_HANDLE_INVALID)
+        FAIL(EEXIST, "table already exists");
+
+    if (def->idx_col >= def->ncolumn)
+        FAIL(EINVAL, "invalid index column specified");
+
+    mrp_list_init(&t->hook);
+    mrp_list_init(&t->watches);
+
+    t->name = mrp_strdup(def->name);
+
+    if (t->name != NULL) {
+        cols = &t->columns;
+        ncol = &t->ncolumn;
+        desc = &t->coldesc;
+
+        if (!copy_column_definitions(def->columns, def->ncolumn, cols, ncol))
+            FAIL(ENOMEM, "failed to create table columns");
+
+        if (!setup_column_descriptors(t->columns, t->ncolumn, desc))
+            FAIL(ENOMEM, "failed to create table descriptor");
+
+        t->h = mqi_create_table(t->name, MQI_TEMPORARY, NULL, t->columns);
+
+        if (t->h != MQI_HANDLE_INVALID) {
+            if (def->idx_col >= 0) {
+                index[0] = (char *)def->columns[def->idx_col].name;
+                index[1] = NULL;
+
+                if (mqi_create_index(t->h, index) != 0)
+                    FAIL(EINVAL, "failed to create table index");
+            }
+
+            mrp_debug("create table %s", t->name);
+
+            return TRUE;
+        }
+        else
+            FAIL(EINVAL, "failed to create table");
+    }
+    else
+        FAIL(ENOMEM, "failed to create table");
+
+ fail:
+    return FALSE;
+}
+
+
+void destroy_proxy_table(pep_table_t *t)
+{
+    mrp_debug("destroying table %s", t->name ? t->name : "<unknown>");
+
+    if (t->h != MQI_HANDLE_INVALID)
+        mqi_drop_table(t->h);
+
+    free_column_definitions(t->columns, t->ncolumn);
+    free_column_descriptors(t->coldesc);
+
+    mrp_free(t->name);
+
+    t->name    = NULL;
+    t->h       = MQI_HANDLE_INVALID;
+    t->columns = NULL;
+    t->ncolumn = 0;
+}
+
+
+void destroy_proxy_tables(pep_proxy_t *proxy)
+{
+    int i;
+
+    mrp_debug("destroying tables of client %s", proxy->name);
+
+    for (i = 0; i < proxy->ntable; i++)
+        destroy_proxy_table(proxy->tables + i);
+
+    proxy->tables = NULL;
+    proxy->ntable = 0;
+}
+
+
+pep_table_t *create_watch_table(pdp_t *pdp, const char *name,
+                                mqi_column_def_t *columns, int ncolumn)
+{
+    pep_table_t        *t;
+    mqi_column_def_t  **cols;
+    mqi_column_desc_t **desc;
+    int                *ncol;
+
+    t = mrp_allocz(sizeof(*t));
+
+    if (t != NULL) {
+        mrp_list_init(&t->hook);
+        mrp_list_init(&t->watches);
+
+        t->name = mrp_strdup(name);
+
+        if (t->name == NULL)
+            goto fail;
+
+        cols = &t->columns;
+        ncol = &t->ncolumn;
+        desc = &t->coldesc;
+
+        if (!copy_column_definitions(columns, ncolumn, cols, ncol))
+            goto fail;
+
+        if (!setup_column_descriptors(t->columns, t->ncolumn, desc))
+            goto fail;
+
+        t->h = mqi_get_table_handle(t->name);
+
+        if (!mrp_htbl_insert(pdp->watched, t->name, t))
+            goto fail;
+
+        mrp_list_append(&pdp->tables, &t->hook);
+    }
+
+    return t;
+
+ fail:
+    destroy_watch_table(pdp, t);
+
+    return FALSE;
+}
+
+
+static void destroy_table_watches(pep_table_t *t)
+{
+    pep_watch_t     *w;
+    mrp_list_hook_t *p, *n;
+
+    if (t != NULL) {
+        mrp_list_foreach(&t->watches, p, n) {
+            w = mrp_list_entry(p, typeof(*w), tbl_hook);
+
+            mrp_list_delete(&w->tbl_hook);
+            mrp_list_delete(&w->pep_hook);
+
+            mrp_free(w);
+        }
+    }
+}
+
+
+void destroy_watch_table(pdp_t *pdp, pep_table_t *t)
+{
+    mrp_list_delete(&t->hook);
+    t->h = MQI_HANDLE_INVALID;
+
+    if (pdp != NULL)
+        mrp_htbl_remove(pdp->watched, t->name, FALSE);
+
+    destroy_table_watches(t);
+}
+
+
+static pep_table_t *lookup_watch_table(pdp_t *pdp, const char *name)
+{
+    return mrp_htbl_lookup(pdp->watched, (void *)name);
+}
+
+
+static void purge_watch_table_cb(void *key, void *entry)
+{
+    pep_table_t *t = (pep_table_t *)entry;
+
+    MRP_UNUSED(key);
+
+    destroy_watch_table(NULL, t);
+}
+
+
+int create_proxy_watch(pep_proxy_t *proxy, int id, mrp_pep_table_t *def,
+                       int *error, const char **errmsg)
+{
+    pdp_t       *pdp = proxy->pdp;
+    pep_table_t *t;
+    pep_watch_t *w;
+
+    t = lookup_watch_table(pdp, def->name);
+
+    if (t == NULL) {
+        t = create_watch_table(pdp, def->name, def->columns, def->ncolumn);
+
+        if (t == NULL) {
+            *error  = EINVAL;
+            *errmsg = "failed to watch table";
+        }
+    }
+    else {
+        if (!check_columns(t->columns, t->ncolumn, def->columns, def->ncolumn)){
+            *error  = EINVAL;
+            *errmsg = "table columns don't match";
+            t = NULL;
+        }
+    }
+
+    if (t == NULL)
+        return FALSE;
+
+    w = mrp_allocz(sizeof(*w));
+
+    if (w != NULL) {
+        mrp_list_init(&w->tbl_hook);
+        mrp_list_init(&w->pep_hook);
+
+        w->table = t;
+        w->proxy = proxy;
+        w->id    = id;
+
+        mrp_list_append(&t->watches, &w->tbl_hook);
+        mrp_list_append(&proxy->watches, &w->pep_hook);
+
+        return TRUE;
+    }
+    else {
+        *error  = ENOMEM;
+        *errmsg = "failed to allocate table watch";
+    }
+
+    return FALSE;
+}
+
+
+void destroy_proxy_watches(pep_proxy_t *proxy)
+{
+    pep_watch_t     *w;
+    mrp_list_hook_t *p, *n;
+
+    if (proxy != NULL) {
+        mrp_list_foreach(&proxy->watches, p, n) {
+            w = mrp_list_entry(p, typeof(*w), pep_hook);
+
+            mrp_list_delete(&w->tbl_hook);
+            mrp_list_delete(&w->pep_hook);
+
+            mrp_free(w);
+        }
+    }
+}
+
+
+static void reset_proxy_tables(pep_proxy_t *proxy)
+{
+    int i;
+
+    for (i = 0; i < proxy->ntable; i++)
+        mqi_delete_from(proxy->tables[i].h, NULL);
+}
+
+
+static int insert_into_table(pep_table_t *t, mrp_pep_value_t *values, int nrow)
+{
+    int   i;
+    void *data[2];
+
+    data[1] = NULL;
+
+    for (i = 0; i < nrow; i++) {
+        data[0] = values;
+        if (mqi_insert_into(t->h, 0, t->coldesc, data) != 1)
+            return FALSE;
+        else
+            values += t->ncolumn;
+    }
+
+    return TRUE;
+}
+
+
+int set_proxy_tables(pep_proxy_t *proxy, mrp_pep_data_t *tables, int ntable,
+                     int *error, const char **errmsg)
+{
+    mqi_handle_t    tx;
+    pep_table_t    *t;
+    int             i, id;
+
+    tx = mqi_begin_transaction();
+
+    if (tx != MQI_HANDLE_INVALID) {
+        reset_proxy_tables(proxy);
+
+        for (i = 0; i < ntable; i++) {
+            id = tables[i].id;
+
+            if (id < 0 || id >= proxy->ntable)
+                goto fail;
+
+            t = proxy->tables + id;
+
+            if (tables[i].ncolumn != t->ncolumn)
+                goto fail;
+
+#if 0
+            if (!delete_from_table(t, tables[i].columns, tables[i].nrow))
+                goto fail;
+#endif
+
+            if (!insert_into_table(t, tables[i].columns, tables[i].nrow))
+                goto fail;
+
+
+        }
+
+        mqi_commit_transaction(tx);
+
+        return TRUE;
+
+    fail:
+        *error  = EINVAL;
+        *errmsg = "failed to set tables";
+        mqi_rollback_transaction(tx);
+    }
+
+    return FALSE;
+}
diff --git a/src/plugins/decision-proto/table.h b/src/plugins/decision-proto/table.h
new file mode 100644 (file)
index 0000000..7066462
--- /dev/null
@@ -0,0 +1,32 @@
+#ifndef __MURPHY_DECISION_TABLE_H__
+#define __MURPHY_DECISION_TABLE_H__
+
+#include "client.h"
+#include "decision-types.h"
+
+int copy_pep_table(mrp_pep_table_t *src, mrp_pep_table_t *dst);
+void free_pep_table(mrp_pep_table_t *t);
+
+int copy_pep_tables(mrp_pep_table_t *src, mrp_pep_table_t *dst, int n);
+void free_pep_tables(mrp_pep_table_t *tables, int n);
+
+int init_tables(pdp_t *pdp);
+void destroy_tables(pdp_t *pdp);
+
+int create_proxy_table(pep_table_t *t, mrp_pep_table_t *def,
+                       int *errcode, const char **errmsg);
+
+int create_proxy_watch(pep_proxy_t *proxy, int id, mrp_pep_table_t *def,
+                       int *errcode, const char **errmsg);
+
+void destroy_watch_table(pdp_t *pdp, pep_table_t *t);
+
+void destroy_proxy_table(pep_table_t *t);
+void destroy_proxy_tables(pep_proxy_t *proxy);
+
+void destroy_proxy_watches(pep_proxy_t *proxy);
+
+int set_proxy_tables(pep_proxy_t *proxy, mrp_pep_data_t *tables, int ntable,
+                     int *error, const char **errmsg);
+
+#endif /* __MURPHY_DECISION_TABLE_H__ */
diff --git a/src/plugins/decision-proto/test-client.c b/src/plugins/decision-proto/test-client.c
new file mode 100644 (file)
index 0000000..015f9a6
--- /dev/null
@@ -0,0 +1,1096 @@
+#include <stdio.h>
+#include <unistd.h>
+#include <string.h>
+#include <errno.h>
+#include <netdb.h>
+#include <signal.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+
+#define _GNU_SOURCE
+#include <getopt.h>
+
+#include <readline/readline.h>
+#include <readline/history.h>
+
+#include <murphy/common.h>
+#include "client.h"
+
+#define DEFAULT_PROMPT "test-pep> "
+
+
+/*
+ * client context
+ */
+
+typedef struct {
+    const char      *addrstr;            /* server address */
+    int              zone;               /* run in zone control mode */
+    int              verbose;            /* verbose mode */
+
+    mrp_mainloop_t  *ml;                 /* murphy mainloop */
+    void            *pep;                /* enforcement point client */
+    int              fd;                 /* fd for terminal input */
+    mrp_io_watch_t  *iow;                /* I/O watch for terminal input */
+} client_t;
+
+
+#define NVALUE 512
+
+
+/*
+ * device and stream definitions
+ */
+
+#define NDEVICE        (MRP_ARRAY_SIZE(devices) - 1)
+#define DEVICE_NCOLUMN 4
+
+typedef struct {
+    const char *name;
+    const char *type;
+    int         public;
+    int         available;
+} device_t;
+
+static device_t devices[] = {
+    { "builtin-speaker" , "speaker"  , TRUE , TRUE  },
+    { "builtin-earpiece", "speaker"  , FALSE, TRUE  },
+    { "usb-speaker"     , "speaker"  , TRUE , FALSE },
+    { "a2dp-speaker"    , "speaker"  , TRUE , FALSE },
+    { "wired-headset"   , "headset"  , FALSE, FALSE },
+    { "usb-headphone"   , "headphone", FALSE, FALSE },
+    { "a2dp-headphone"  , "headphone", FALSE, FALSE },
+    { "sco-headset"     , "headset"  , FALSE, FALSE },
+    { NULL              , NULL       , FALSE, FALSE }
+};
+
+#define NSTREAM        (MRP_ARRAY_SIZE(streams) - 1)
+#define STREAM_NCOLUMN 4
+
+typedef struct {
+    const char *name;
+    const char *role;
+    pid_t       owner;
+    int         playing;
+} stream_t;
+
+static stream_t streams[] = {
+    { "player1", "player"   , 1234, FALSE },
+    { "player2", "player"   , 4321, FALSE },
+    { "navit"  , "navigator", 5432, FALSE },
+    { "phone"  , "call"     , 6666, FALSE },
+    { NULL     , NULL       , 0   , FALSE }
+};
+
+
+/*
+ * device and stream descriptors
+ */
+
+MRP_PEP_TABLE_COLUMNS(device_columns,
+                      MRP_PEP_STRING ("name", 32 , TRUE ),
+                      MRP_PEP_STRING ("type", 32 , FALSE),
+                      MRP_PEP_INTEGER("public"   , FALSE),
+                      MRP_PEP_INTEGER("available", FALSE));
+
+MRP_PEP_TABLE_COLUMNS(stream_columns,
+                      MRP_PEP_STRING  ("name", 32, TRUE ),
+                      MRP_PEP_STRING  ("role", 32, FALSE),
+                      MRP_PEP_UNSIGNED("owner"   , FALSE),
+                      MRP_PEP_INTEGER ("playing" , FALSE));
+
+MRP_PEP_TABLES(media_tables,
+               MRP_PEP_TABLE("devices", device_columns),
+               MRP_PEP_TABLE("streams", stream_columns));
+
+
+/*
+ * zone and call definitions
+ */
+
+#define NZONE        (MRP_ARRAY_SIZE(zones) - 1)
+#define ZONE_NCOLUMN 3
+
+typedef struct {
+    const char *name;
+    int         occupied;
+    int         active;
+} zone_t;
+
+static zone_t zones[] = {
+    { "driver"     , TRUE , FALSE },
+    { "fearer"     , FALSE, TRUE  },
+    { "back-left"  , TRUE , FALSE },
+    { "back-center", FALSE, FALSE },
+    { "back-right" , TRUE , TRUE  },
+    { NULL         , FALSE, FALSE }
+};
+
+
+#define NCALL        (MRP_ARRAY_SIZE(calls) - 1)
+#define CALL_NCOLUMN 3
+
+typedef struct {
+    int         id;
+    const char *state;
+    const char *modem;
+} call_t;
+
+static call_t calls[] = {
+    { 1, "active"  , "modem1" },
+    { 2, "ringing" , "modem1" },
+    { 3, "held"    , "modem2" },
+    { 4, "alerting", "modem2" },
+    { 0, NULL      , NULL     }
+};
+
+
+/*
+ * zone and call descriptors
+ */
+
+MRP_PEP_TABLE_COLUMNS(zone_columns,
+                      MRP_PEP_STRING ("name", 32, TRUE),
+                      MRP_PEP_INTEGER("occupied", FALSE),
+                      MRP_PEP_INTEGER("active"  , FALSE));
+
+MRP_PEP_TABLE_COLUMNS(call_columns,
+                      MRP_PEP_INTEGER("id"       , TRUE ),
+                      MRP_PEP_STRING ("state", 32, FALSE),
+                      MRP_PEP_STRING ("modem", 32, FALSE));
+
+MRP_PEP_TABLES(zone_tables,
+               MRP_PEP_TABLE("zones", zone_columns),
+               MRP_PEP_TABLE("calls", call_columns));
+
+
+
+mrp_pep_table_t *exports;
+int              nexport;
+mrp_pep_table_t *imports;
+int              nimport;
+
+
+static client_t *client;
+
+
+
+static void fatal_msg(int error, const char *format, ...);
+static void error_msg(const char *format, ...);
+static void info_msg(const char *format, ...);
+
+static void terminal_input_cb(char *input);
+
+static void export_data(client_t *c);
+
+
+static void plug_device(client_t *c, const char *name, int plug)
+{
+    device_t *d;
+    int       changed;
+
+    if (c->zone) {
+        error_msg("cannot plug/unplug, client is in zone mode");
+        return;
+    }
+
+    changed = FALSE;
+
+    for (d = devices; d->name != NULL; d++) {
+        if (!strcmp(d->name, name)) {
+            changed = plug ^ d->available;
+            d->available = plug;
+            break;
+        }
+    }
+
+    if (changed) {
+        info_msg("device '%s' is now %splugged", d->name, plug ? "" : "un");
+        export_data(c);
+    }
+}
+
+
+static void list_devices(void)
+{
+    device_t *d;
+    int       n;
+
+    for (d = devices, n = 0; d->name != NULL; d++, n++) {
+        info_msg("device '%s': (%s, %s), %s",
+                 d->name, d->type, d->public ? "public" : "private",
+                 d->available ? "available" : "currently unplugged");
+    }
+
+    if (n == 0)
+        info_msg("devices: none");
+}
+
+
+static void play_stream(client_t *c, const char *name, int play)
+{
+    stream_t *s;
+    int       changed;
+
+    if (c->zone) {
+        error_msg("cannot control streams, client is in zone mode");
+        return;
+    }
+
+    changed = FALSE;
+
+    for (s = streams; s->name != NULL; s++) {
+        if (!strcmp(s->name, name)) {
+            changed = play ^ s->playing;
+            s->playing = play;
+            break;
+        }
+    }
+
+    if (changed) {
+        info_msg("stream '%s' is now %s", s->name, play ? "playing":"stopped");
+        export_data(c);
+    }
+}
+
+
+static void list_streams(void)
+{
+    stream_t *s;
+    int       n;
+
+    for (s = streams, n = 0; s->name != NULL; s++, n++) {
+        info_msg("stream '%s': role %s, owner %u, currently %splaying",
+                 s->name, s->role, s->owner, s->playing ? "" : "not ");
+    }
+
+    if (n == 0)
+        info_msg("streams: none");
+}
+
+
+static void set_zone_state(client_t *c, char *config)
+{
+    zone_t *z;
+    int     occupied, active, changed, len;
+    char    name[256], *end;
+
+    if (!c->zone) {
+        error_msg("cannot control zones, client is not in zone mode");
+        return;
+    }
+
+    while (*config == ' ' || *config == '\t')
+        config++;
+
+    end = strchr(config, ' ');
+    if (end == NULL)
+        return;
+
+    len = end - config;
+    strncpy(name, config, len);
+    name[len] = '\0';
+
+    config = end + 1;
+    while (*config == ' ' || *config == '\t')
+        config++;
+
+    occupied = FALSE;
+    active   = FALSE;
+    changed  = FALSE;
+
+    if (strstr(config, "occupied"))
+        occupied = TRUE;
+    if (strstr(config, "active"))
+        active = TRUE;
+
+    for (z = zones; z->name != NULL; z++) {
+        if (!strcmp(z->name, name)) {
+            changed     = (active & z->active) | (occupied ^ z->occupied);
+            z->active   = active;
+            z->occupied = occupied;
+            break;
+        }
+    }
+
+    if (changed) {
+        info_msg("zone '%s' is now %s and %s", z->name,
+                 z->occupied ? "occupied" : "free",
+                 z->active   ? "actvie"   : "idle");
+        export_data(c);
+    }
+}
+
+
+static void list_zones(void)
+{
+    zone_t *z;
+    int     n;
+
+    for (z = zones, n = 0; z->name != NULL; z++, n++) {
+        info_msg("zone '%s' is now %s and %s", z->name,
+                 z->occupied ? "occupied" : "free",
+                 z->active   ? "actvie"   : "idle");
+    }
+
+    if (n == 0)
+        info_msg("zones: none");
+}
+
+
+static void set_call_state(client_t *c, const char *config)
+{
+    call_t *call;
+    char   idstr[64], *state, *end;
+    int     id, changed, len;
+
+    if (!c->zone) {
+        error_msg("cannot control calls, client is not in zone mode");
+        return;
+    }
+
+    while (*config == ' ' || *config == '\t')
+        config++;
+
+    end = strchr(config, ' ');
+    if (end == NULL)
+        return;
+
+    len = end - config;
+    strncpy(idstr, config, len);
+    idstr[len] = '\0';
+
+    config = end + 1;
+    while (*config == ' ' || *config == '\t')
+        config++;
+    state = (char *)config;
+
+    id = strtoul(idstr, &end, 10);
+
+    if (end && *end) {
+        error_msg("invalid call id '%s'", idstr);
+        return;
+    }
+
+    for (call = calls; call->id > 0; call++) {
+        if (call->id == id) {
+            if (strcmp(call->state, state)) {
+                mrp_free((char *)call->state);
+                call->state = mrp_strdup(state);
+                changed     = TRUE;
+                break;
+            }
+        }
+    }
+
+    if (changed) {
+        info_msg("call #%d is now %s", call->id, call->state);
+        export_data(c);
+    }
+}
+
+
+static void list_calls(void)
+{
+    call_t *c;
+    int     n;
+
+    for (c = calls, n = 0; c->id > 0; c++, n++) {
+        info_msg("call #%d: %s (on modem %s)", c->id, c->state, c->modem);
+    }
+
+    if (n == 0)
+        info_msg("calls: none");
+}
+
+
+static void reset_devices(void)
+{
+    mrp_clear(&devices);
+}
+
+
+void update_devices(mrp_pep_data_t *data)
+{
+    device_t        *d;
+    mrp_pep_value_t *v;
+    int              i;
+
+    if (data->ncolumn != DEVICE_NCOLUMN) {
+        error_msg("incorrect number of columns (%d) in device update",
+                  data->ncolumn);
+        return;
+    }
+
+    if (data->nrow > (int)NDEVICE) {
+        error_msg("too many rows (%d) in device update", data->nrow);
+        return;
+    }
+
+    if (data->nrow == 0)
+        reset_devices();
+    else {
+        v = data->columns;
+        d = devices;
+
+        for (i = 0; i < data->nrow; i++) {
+            mrp_free((char *)d->name);
+            mrp_free((char *)d->type);
+
+            d->name      = mrp_strdup(v[0].str);
+            d->type      = mrp_strdup(v[1].str);
+            d->public    = v[2].s32;
+            d->available = v[3].s32;
+
+            v += 4;
+            d += 1;
+        }
+    }
+
+    list_devices();
+}
+
+
+static void reset_streams(void)
+{
+    mrp_clear(&streams);
+}
+
+
+void update_streams(mrp_pep_data_t *data)
+{
+    stream_t        *s;
+    mrp_pep_value_t *v;
+    int              i;
+
+    if (data->ncolumn != STREAM_NCOLUMN) {
+        error_msg("incorrect number of columns (%d) in stream update",
+                  data->ncolumn);
+        return;
+    }
+
+    if (data->nrow > (int)NSTREAM) {
+        error_msg("too many rows (%d) in stream update", data->nrow);
+        return;
+    }
+
+    if (data->nrow == 0)
+        reset_streams();
+    else {
+        v = data->columns;
+        s = streams;
+
+        for (i = 0; i < data->nrow; i++) {
+            mrp_free((char *)s->name);
+            mrp_free((char *)s->role);
+
+            s->name    = mrp_strdup(v[0].str);
+            s->role    = mrp_strdup(v[1].str);
+            s->owner   = v[2].u32;
+            s->playing = v[3].s32;
+
+            v += 4;
+            s += 1;
+        }
+    }
+
+    list_streams();
+}
+
+static void reset_zones(void)
+{
+    mrp_clear(&zones);
+}
+
+
+void update_zones(mrp_pep_data_t *data)
+{
+    zone_t          *z;
+    mrp_pep_value_t *v;
+    int              i;
+
+    if (data->ncolumn != ZONE_NCOLUMN) {
+        error_msg("incorrect number of columns (%d) in zone update",
+                  data->ncolumn);
+        return;
+    }
+
+    if (data->nrow > (int)NZONE) {
+        error_msg("too many rows (%d) in zone update", data->nrow);
+        return;
+    }
+
+    if (data->nrow == 0)
+        reset_zones();
+    else {
+
+        v = data->columns;
+        z = zones;
+
+        for (i = 0; i < data->nrow; i++) {
+            mrp_free((char *)z->name);
+
+            z->name     = mrp_strdup(v[0].str);
+            z->occupied = v[1].s32;
+            z->active   = v[2].s32;
+
+            v += 3;
+            z += 1;
+        }
+    }
+
+    list_zones();
+}
+
+
+static void reset_calls(void)
+{
+    mrp_clear(&calls);
+}
+
+
+void update_calls(mrp_pep_data_t *data)
+{
+    call_t          *c;
+    mrp_pep_value_t *v;
+    int              i;
+
+    if (data->ncolumn != CALL_NCOLUMN) {
+        error_msg("incorrect number of columns (%d) in call update.",
+               data->ncolumn);
+        return;
+    }
+
+    if (data->nrow > (int)NCALL) {
+        error_msg("too many rows (%d) in call update", data->nrow);
+        return;
+    }
+
+    if (data->nrow == 0)
+        reset_calls();
+    else {
+        v = data->columns;
+        c = calls;
+
+        for (i = 0; i < data->nrow; i++) {
+            mrp_free((char *)c->state);
+            mrp_free((char *)c->modem);
+
+            c->id    = v[0].s32;
+            c->state = mrp_strdup(v[1].str);
+            c->modem = mrp_strdup(v[2].str);
+
+            v += 3;
+            c += 1;
+        }
+    }
+
+    list_calls();
+}
+
+
+void update_imports(client_t *c, mrp_pep_data_t *data, int ntable)
+{
+    int i;
+
+    for (i = 0; i < ntable; i++) {
+        if (c->zone) {
+            if (data[i].id == 0)
+                update_devices(data + i);
+            else
+                update_streams(data + i);
+        }
+        else {
+            if (data[i].id == 0)
+                update_zones(data + i);
+            else
+                update_calls(data + i);
+        }
+    }
+}
+
+
+
+static void terminal_prompt_erase(void)
+{
+    int n = strlen(DEFAULT_PROMPT);
+
+    printf("\r");
+    while (n-- > 0)
+        printf(" ");
+    printf("\r");
+}
+
+
+static void terminal_prompt_display(void)
+{
+    rl_callback_handler_remove();
+    rl_callback_handler_install(DEFAULT_PROMPT, terminal_input_cb);
+}
+
+
+static void show_help(void)
+{
+#define P info_msg
+
+    P("Available commands:");
+    P("  help                                  show this help");
+    P("  list                                  list all data");
+    P("  list {devices|streams|zones|calls}    list the requested data");
+    P("  plug <device>                         update <device> as plugged");
+    P("  unplug <device>                       update <device> as unplugged");
+    P("  play <stream>                         update <stream> as playing");
+    P("  stop <stream>                         update <stream> as stopped");
+    P("  call <call> <state>                   update state of <call>");
+    P("  zone <zone> [occupied,[active]]       update state of <zone>");
+
+#undef P
+}
+
+
+static void terminal_process_input(char *input)
+{
+    int len;
+
+    add_history(input);
+
+    if (input == NULL || !strcmp(input, "exit")) {
+        terminal_prompt_erase();
+        exit(0);
+    }
+    else if (!strcmp(input, "help")) {
+        show_help();
+    }
+    else if (!strcmp(input, "list")) {
+        list_devices();
+        list_streams();
+        list_zones();
+        list_calls();
+    }
+    else if (!strcmp(input, "list devices"))
+        list_devices();
+    else if (!strcmp(input, "list streams"))
+        list_streams();
+    else if (!strcmp(input, "list zones"))
+        list_zones();
+    else if (!strcmp(input, "list calls"))
+        list_calls();
+    else if (!strncmp(input, "plug "  , len=sizeof("plug ")   - 1) ||
+             !strncmp(input, "unplug ", len=sizeof("unplug ") - 1)) {
+        plug_device(client, input + len, *input == 'p');
+    }
+    else if (!strncmp(input, "play "  , len=sizeof("play ")   - 1) ||
+             !strncmp(input, "stop ", len=sizeof("stop ") - 1)) {
+        play_stream(client, input + len, *input == 'p');
+    }
+    else if (!strncmp(input, "call "  , len=sizeof("call ")   - 1)) {
+        set_call_state(client, input + len);
+    }
+    else if (!strncmp(input, "zone "  , len=sizeof("zone ")   - 1)) {
+        set_zone_state(client, input + len);
+    }
+}
+
+
+static void terminal_input_cb(char *input)
+{
+    terminal_process_input(input);
+    free(input);
+}
+
+
+static void terminal_cb(mrp_mainloop_t *ml, mrp_io_watch_t *w, int fd,
+                        mrp_io_event_t events, void *user_data)
+{
+    MRP_UNUSED(w);
+    MRP_UNUSED(fd);
+    MRP_UNUSED(user_data);
+
+    if (events & MRP_IO_EVENT_IN)
+        rl_callback_read_char();
+
+    if (events & MRP_IO_EVENT_HUP)
+        mrp_mainloop_quit(ml, 0);
+}
+
+
+static void terminal_setup(client_t *c)
+{
+    mrp_io_event_t events;
+
+    c->fd  = fileno(stdin);
+    events = MRP_IO_EVENT_IN | MRP_IO_EVENT_HUP;
+    c->iow = mrp_add_io_watch(c->ml, c->fd, events, terminal_cb, c);
+
+    if (c->iow == NULL)
+        fatal_msg(1, "Failed to create terminal input I/O watch.");
+    else
+        terminal_prompt_display();
+}
+
+
+static void terminal_cleanup(client_t *c)
+{
+    mrp_del_io_watch(c->iow);
+    c->iow = NULL;
+
+    rl_callback_handler_remove();
+}
+
+
+static void fatal_msg(int error, const char *format, ...)
+{
+    va_list ap;
+
+    terminal_prompt_erase();
+
+    fprintf(stderr, "fatal error: ");
+    va_start(ap, format);
+    vfprintf(stderr, format, ap);
+    va_end(ap);
+    fprintf(stderr, "\n");
+    fflush(stderr);
+
+    exit(error);
+}
+
+
+static void error_msg(const char *format, ...)
+{
+    va_list ap;
+
+    terminal_prompt_erase();
+
+    fprintf(stderr, "error: ");
+    va_start(ap, format);
+    vfprintf(stderr, format, ap);
+    va_end(ap);
+    fprintf(stderr, "\n");
+    fflush(stderr);
+
+    terminal_prompt_display();
+}
+
+
+static void info_msg(const char *format, ...)
+{
+    va_list ap;
+
+    terminal_prompt_erase();
+
+    va_start(ap, format);
+    vfprintf(stdout, format, ap);
+    va_end(ap);
+    fprintf(stdout, "\n");
+    fflush(stdout);
+
+    terminal_prompt_display();
+}
+
+
+static void signal_handler(mrp_mainloop_t *ml, mrp_sighandler_t *h,
+                           int signum, void *user_data)
+{
+    MRP_UNUSED(h);
+    MRP_UNUSED(user_data);
+
+    switch (signum) {
+    case SIGINT:
+        info_msg("Got SIGINT, stopping...");
+        mrp_mainloop_quit(ml, 0);
+        break;
+    }
+}
+
+
+static void connect_notify(mrp_pep_t *pep, int connected, int errcode,
+                           const char *errmsg, void *user_data)
+{
+    MRP_UNUSED(pep);
+    MRP_UNUSED(user_data);
+
+    if (connected) {
+        info_msg("Successfully registered to server.");
+        export_data(client);
+    }
+    else
+        error_msg("No connection to server (%d: %s).", errcode, errmsg);
+}
+
+
+static void data_notify(mrp_pep_t *pep, mrp_pep_data_t *tables,
+                        int ntable, void *user_data)
+{
+    client_t *client = (client_t *)user_data;
+
+    MRP_UNUSED(pep);
+
+    update_imports(client, tables, ntable);
+}
+
+
+static void export_notify(mrp_pep_t *pep, int errcode, const char *errmsg,
+                          void *user_data)
+{
+    MRP_UNUSED(pep);
+    MRP_UNUSED(user_data);
+
+    if (errcode != 0) {
+        error_msg("Data set request failed (%d: %s).", errcode, errmsg);
+    }
+}
+
+
+static void export_data(client_t *c)
+{
+    mrp_pep_value_t values[NVALUE], *col, *val;
+    mrp_pep_data_t  tables[2];
+    int             i;
+
+    val = values;
+    col = val;
+
+    if (!c->zone) {
+        tables[0].id      = 0;
+        tables[0].nrow    = NDEVICE;
+        tables[0].columns = col;
+
+        for (i = 0; i < (int)NDEVICE; i++) {
+            col[0].str = devices[i].name;
+            col[1].str = devices[i].type;
+            col[2].s32 = devices[i].public;
+            col[3].s32 = devices[i].available;
+            col += 4;
+        }
+
+        tables[1].id      = 1;
+        tables[1].nrow    = NSTREAM;
+        tables[1].columns = col;
+
+        for (i = 0; i < (int)NSTREAM; i++) {
+            col[0].str = streams[i].name;
+            col[1].str = streams[i].role;
+            col[2].u32 = streams[i].owner;
+            col[3].s32 = streams[i].playing;
+            col += 4;
+        }
+    }
+    else {
+        tables[0].id      = 0;
+        tables[0].nrow    = NZONE;
+        tables[0].columns = col;
+
+        for (i = 0; i < (int)NZONE; i++) {
+            col[0].str = zones[i].name;
+            col[1].s32 = zones[i].occupied;
+            col[2].s32 = zones[i].active;
+            col += 3;
+        }
+
+        tables[1].id      = 1;
+        tables[1].nrow    = NCALL;
+        tables[1].columns = col;
+
+        for (i = 0; i < (int)NCALL; i++) {
+            col[0].s32 = calls[i].id;
+            col[1].str = calls[i].state;
+            col[2].str = calls[i].modem;
+            col += 3;
+        }
+    }
+
+    if (!mrp_pep_set_data(c->pep, tables, MRP_ARRAY_SIZE(tables),
+                          export_notify, c))
+        error_msg("Failed to send data set request to server.");
+}
+
+
+static void client_setup(client_t *c)
+{
+    mrp_mainloop_t *ml;
+    mrp_pep_t      *pep;
+
+    ml = mrp_mainloop_create();
+
+    if (ml != NULL) {
+        if (!c->zone) {
+            exports = media_tables;
+            nexport = MRP_ARRAY_SIZE(media_tables);
+            imports = zone_tables;
+            nimport = MRP_ARRAY_SIZE(zone_tables);
+        }
+        else {
+            exports = zone_tables;
+            nexport = MRP_ARRAY_SIZE(zone_tables);
+            imports = media_tables;
+            nimport = MRP_ARRAY_SIZE(media_tables);
+        }
+
+        pep = mrp_pep_create(c->zone ? "zone-pep" : "media-pep", ml,
+                             exports, nexport, imports, nimport,
+                             connect_notify, data_notify, c);
+
+        if (pep != NULL) {
+            c->ml  = ml;
+            c->pep = pep;
+
+            mrp_add_sighandler(ml, SIGINT, signal_handler, c);
+
+            if (c->zone) {
+                zone_t *z;
+                call_t *call;
+
+                for (z = zones; z->name != NULL; z++) {
+                    z->name = mrp_strdup(z->name);
+                }
+
+                for (call = calls; call->id > 0; call++) {
+                    call->state = mrp_strdup(call->state);
+                    call->modem = mrp_strdup(call->modem);
+                }
+
+                reset_devices();
+                reset_streams();
+            }
+            else {
+                device_t *d;
+                stream_t *s;
+
+                for (d = devices; d->name != NULL; d++) {
+                    d->name = mrp_strdup(d->name);
+                    d->type = mrp_strdup(d->type);
+                }
+
+                for (s = streams; s->name != NULL; s++) {
+                    s->name = mrp_strdup(s->name);
+                    s->role = mrp_strdup(s->role);
+                }
+
+                reset_zones();
+                reset_calls();
+            }
+        }
+        else
+            fatal_msg(1, "Failed to create enforcement point.");
+    }
+    else
+        fatal_msg(1, "Failed to create mainloop.");
+}
+
+
+static void client_cleanup(client_t *c)
+{
+    mrp_mainloop_destroy(c->ml);
+    mrp_pep_destroy(c->pep);
+
+    c->ml  = NULL;
+    c->pep = NULL;
+}
+
+
+static void client_run(client_t *c)
+{
+    if (mrp_pep_connect(c->pep, c->addrstr))
+        info_msg("Connected to server at %s.", c->addrstr);
+    else
+        error_msg("Failed to connect to server at %s.", c->addrstr);
+
+    mrp_mainloop_run(c->ml);
+}
+
+
+static void print_usage(const char *argv0, int exit_code, const char *fmt, ...)
+{
+    va_list ap;
+
+    if (fmt && *fmt) {
+        va_start(ap, fmt);
+        vprintf(fmt, ap);
+        va_end(ap);
+    }
+
+    printf("usage: %s [options]\n\n"
+           "The possible options are:\n"
+           "  -s, --server <address>     connect to murphy at given address\n"
+           "  -z, --zone                 run as zone controller\n"
+           "  -v, --verbose              run in verbose mode\n"
+           "  -h, --help                 show this help on usage\n",
+           argv0);
+
+    if (exit_code < 0)
+        return;
+    else
+        exit(exit_code);
+}
+
+
+static void client_set_defaults(client_t *c)
+{
+    mrp_clear(c);
+    c->addrstr = MRP_DEFAULT_PEP_ADDRESS;
+    c->zone    = FALSE;
+    c->verbose = FALSE;
+}
+
+
+int parse_cmdline(client_t *c, int argc, char **argv)
+{
+#   define OPTIONS "vzhs:"
+    struct option options[] = {
+        { "server"    , required_argument, NULL, 's' },
+        { "zone"      , no_argument      , NULL, 'z' },
+        { "verbose"   , optional_argument, NULL, 'v' },
+        { "help"      , no_argument      , NULL, 'h' },
+        { NULL, 0, NULL, 0 }
+    };
+
+    int opt;
+
+    client_set_defaults(c);
+
+    while ((opt = getopt_long(argc, argv, OPTIONS, options, NULL)) != -1) {
+        switch (opt) {
+        case 'z':
+            c->zone = TRUE;
+            break;
+
+        case 'v':
+            c->verbose = TRUE;
+            break;
+
+        case 'a':
+            c->addrstr = optarg;
+            break;
+
+        case 'h':
+            print_usage(argv[0], -1, "");
+            exit(0);
+            break;
+
+        default:
+            print_usage(argv[0], EINVAL, "invalid option '%c'", opt);
+        }
+    }
+
+    return TRUE;
+}
+
+
+int main(int argc, char *argv[])
+{
+    client_t  c;
+
+    client_set_defaults(&c);
+    parse_cmdline(&c, argc, argv);
+
+    client_setup(&c);
+    terminal_setup(&c);
+
+    client = &c;
+    client_run(&c);
+
+    terminal_cleanup(&c);
+    client_cleanup(&c);
+
+    return 0;
+}