domain-control: added domain-control plugin (modified decision-proto).
authorKrisztian Litkey <krisztian.litkey@intel.com>
Mon, 22 Oct 2012 10:07:00 +0000 (13:07 +0300)
committerKrisztian Litkey <krisztian.litkey@intel.com>
Tue, 30 Oct 2012 11:52:47 +0000 (13:52 +0200)
20 files changed:
configure.ac
src/Makefile.am
src/daemon/murphy.conf
src/plugins/domain-control/Makefile [new file with mode: 0644]
src/plugins/domain-control/client.c [new file with mode: 0644]
src/plugins/domain-control/client.h [new file with mode: 0644]
src/plugins/domain-control/domain-control-types.h [new file with mode: 0644]
src/plugins/domain-control/domain-control.c [new file with mode: 0644]
src/plugins/domain-control/domain-control.h [new file with mode: 0644]
src/plugins/domain-control/message.c [new file with mode: 0644]
src/plugins/domain-control/message.h [new file with mode: 0644]
src/plugins/domain-control/notify.c [new file with mode: 0644]
src/plugins/domain-control/notify.h [new file with mode: 0644]
src/plugins/domain-control/plugin-domain-control.c [new file with mode: 0644]
src/plugins/domain-control/proxy.c [new file with mode: 0644]
src/plugins/domain-control/proxy.h [new file with mode: 0644]
src/plugins/domain-control/table-common.c [new file with mode: 0644]
src/plugins/domain-control/table.c [new file with mode: 0644]
src/plugins/domain-control/table.h [new file with mode: 0644]
src/plugins/domain-control/test-client.c [new file with mode: 0644]

index eb3baa7..986812f 100644 (file)
@@ -342,6 +342,8 @@ AM_CONDITIONAL(DISABLED_PLUGIN_CONSOLE,  [check_if_disabled console])
 AM_CONDITIONAL(DISABLED_PLUGIN_SIGNALLING, [check_if_disabled signalling])
 AM_CONDITIONAL(DISABLED_PLUGIN_DECISION, [check_if_disabled decision])
 AM_CONDITIONAL(DISABLED_PLUGIN_RESOURCE_DBUS, [check_if_disabled resource-dbus])
+AM_CONDITIONAL(DISABLED_PLUGIN_DOMAIN_CONTROL,
+               [check_if_disabled domain-control])
 
 AM_CONDITIONAL(BUILTIN_PLUGIN_TEST,     [check_if_internal test])
 AM_CONDITIONAL(BUILTIN_PLUGIN_DBUS,     [check_if_internal dbus])
@@ -350,6 +352,8 @@ AM_CONDITIONAL(BUILTIN_PLUGIN_CONSOLE,  [check_if_internal console])
 AM_CONDITIONAL(BUILTIN_PLUGIN_SIGNALLING, [check_if_internal signalling])
 AM_CONDITIONAL(BUILTIN_PLUGIN_DECISION, [check_if_internal decision])
 AM_CONDITIONAL(BUILTIN_PLUGIN_RESOURCE_DBUS, [check_if_internal resource-dbus])
+AM_CONDITIONAL(BUILTIN_PLUGIN_DOMAIN_CONTROL,
+               [check_if_internal domain-control])
 
 # Check for Check (unit test framework).
 PKG_CHECK_MODULES(CHECK, 
index 026d162..db808a5 100644 (file)
@@ -794,6 +794,91 @@ test_client_LDADD   = libmurphy-pep.la    \
                      -lreadline
 endif
 
+# domain control plugin
+DOMAIN_CONTROL_PLUGIN_SOURCES = plugins/domain-control/plugin-domain-control.c \
+                               plugins/domain-control/domain-control.c        \
+                               plugins/domain-control/proxy.c                 \
+                               plugins/domain-control/table.c                 \
+                               plugins/domain-control/message.c               \
+                               plugins/domain-control/notify.c
+
+DOMAIN_CONTROL_PLUGIN_CFLAGS  =
+DOMAIN_CONTROL_PLUGIN_LIBS    =
+
+if !DISABLED_PLUGIN_DOMAIN_CONTROL
+if BUILTIN_PLUGIN_DOMAIN_CONTROL
+LINKEDIN_PLUGINS             += libmurphy-plugin-domain-control.la
+lib_LTLIBRARIES              += libmurphy-plugin-domain-control.la
+DOMAIN_CONTROL_PLUGIN_LOADER  = linkedin-domain-control-loader.c
+DOMAIN_CONTROL_PLUGIN_CFLAGS += $(BUILTIN_CFLAGS)
+
+
+libmurphy_plugin_domain_control_ladir =                                 \
+               $(includedir)/murphy/domain-control
+
+libmurphy_plugin_domain_control_la_SOURCES =                    \
+               $(DOMAIN_CONTROL_PLUGIN_SOURCES)                 \
+               $(DOMAIN_CONTROL_PLUGIN_LOADER)
+
+libmurphy_plugin_domain_control_la_CFLAGS =                     \
+               $(DOMAIN_CONTROL_PLUGIN_CFLAGS)                  \
+               $(AM_CFLAGS)
+
+libmurphy_plugin_domain_control_la_LDFLAGS =                    \
+               -Wl,-version-script=linker-script.domain-control \
+               -version-info @MURPHY_VERSION_INFO@
+
+libmurphy_plugin_domain_control_la_LIBADD =            \
+               libmurphy-core.la                       \
+               libmurphy-common.la                     \
+               murphy-db/mql/libmql.la                 \
+               murphy-db/mqi/libmqi.la                 \
+               murphy-db/mdb/libmdb.la
+
+libmurphy_plugin_domain_control_la_DEPENDENCIES =      \
+               linker-script.domain-control            \
+               libmurphy-core.la                       \
+               libmurphy-common.la                     \
+               murphy-db/mql/libmql.la                 \
+               murphy-db/mqi/libmqi.la                 \
+               murphy-db/mdb/libmdb.la
+
+# linkedin domain control plugin linker script generation
+linker-script.domain-control: $(DOMAIN_CONTROL_PLUGIN_LOADER:%.c=%.h)
+       $(QUIET_GEN)$(top_builddir)/build-aux/gen-linker-script -q -o $@ $^
+
+clean-linker-script::
+       -rm -f linker-script.domain-control
+else
+plugin_domain_control_la_SOURCES = $(DOMAIN_CONTROL_PLUGIN_SOURCES)
+plugin_domain_control_la_CFLAGS  = $(DOMAIN_CONTROL_PLUGIN_CFLAGS) \
+                                  $(MURPHY_CFLAGS) $(AM_CFLAGS)
+plugin_domain_control_la_LDFLAGS = -module -avoid-version
+plugin_domain_control_la_LIBADD  = $(DOMAIN_CONTROL_PLUGIN_LIBS)
+plugin_LTLIBRARIES              += plugin-domain-control.la
+endif
+
+# domain controller client library
+lib_LTLIBRARIES                        += libmurphy-domain-controller.la
+libmurphy_domain_controller_la_SOURCES  = plugins/domain-control/client.c      \
+                                         plugins/domain-control/table-common.c\
+                                         plugins/domain-control/message.c
+libmurphy_domain_controller_la_CFLAGS   =
+libmurphy_domain_controller_la_LIBADD   = libmurphy-common.la     \
+                                         murphy-db/mql/libmql.la \
+                                         murphy-db/mqi/libmqi.la \
+                                         murphy-db/mdb/libmdb.la
+
+# test domain controller
+bin_PROGRAMS += test-domain-controller
+
+test_domain_controller_SOURCES = plugins/domain-control/test-client.c
+test_domain_controller_CFLAGS  = $(AM_CFLAGS)
+test_domain_controller_LDADD   = libmurphy-domain-controller.la    \
+                                libmurphy-common.la \
+                                -lreadline
+endif
+
 ###################################
 # murphy daemon
 #
index 2499036..46fd7d6 100644 (file)
@@ -32,21 +32,27 @@ if plugin-exists murphydb
 end
 
 # load the native resource plugin if it exists
-if plugin-exists resource-native
-    load-plugin resource-native
-else
-    info "Could not find resource-native plugin"
-end
+#if plugin-exists resource-native
+#    load-plugin resource-native
+#else
+#    info "Could not find resource-native plugin"
+#end
 
 # load the enforcement point interface prototype plugin if it exists
-if plugin-exists decision-proto
-    load-plugin decision-proto
-else
-    info "There is no decision-proto plugin available..."
-end
+#if plugin-exists decision-proto
+#    load-plugin decision-proto
+#else
+#    info "There is no decision-proto plugin available..."
+#end
 
-if plugin-exists resource-dbus
-    load-plugin resource-dbus
+#if plugin-exists resource-dbus
+#    try-load-plugin resource-dbus
+#end
+
+if plugin-exists domain-control
+    load-plugin domain-control
+else
+    info "No domain-control plugin found..."
 end
 
 #set resolver-ruleset '/u/src/work/murphy/src/resolver/test-input'
diff --git a/src/plugins/domain-control/Makefile b/src/plugins/domain-control/Makefile
new file mode 100644 (file)
index 0000000..2c0a593
--- /dev/null
@@ -0,0 +1,7 @@
+ifneq ($(strip $(MAKECMDGOALS)),)
+%:
+       $(MAKE) -C .. $(MAKECMDGOALS)
+else
+all:
+       $(MAKE) -C .. all
+endif
diff --git a/src/plugins/domain-control/client.c b/src/plugins/domain-control/client.c
new file mode 100644 (file)
index 0000000..c9bbd8d
--- /dev/null
@@ -0,0 +1,540 @@
+#include <errno.h>
+#include <alloca.h>
+
+#include <murphy/common/mm.h>
+#include <murphy/common/log.h>
+#include <murphy/common/mainloop.h>
+#include <murphy/common/transport.h>
+
+#include "domain-control-types.h"
+#include "table.h"
+#include "message.h"
+#include "client.h"
+
+
+/*
+ * mark an enforcement point busy (typically while executing a callback)
+ */
+
+#define DOMCTL_MARK_BUSY(dc, ...) do {             \
+        (dc)->busy++;                              \
+        __VA_ARGS__                                \
+        (dc)->busy--;                              \
+        check_destroyed(dc);                       \
+    } while (0)
+
+
+/*
+ * a pending request
+ */
+
+typedef struct {
+    mrp_list_hook_t         hook;        /* hook to pending request queue */
+    uint32_t                seqno;       /* sequence number/request id */
+    mrp_domctl_status_cb_t  cb;          /* callback to call upon completion */
+    void                   *user_data;   /* opaque callback data */
+} pending_request_t;
+
+
+static void recv_cb(mrp_transport_t *t, mrp_msg_t *msg, void *user_data);
+static void recvfrom_cb(mrp_transport_t *t, mrp_msg_t *msg,
+                        mrp_sockaddr_t *addr, socklen_t addrlen,
+                        void *user_data);
+static void closed_cb(mrp_transport_t *t, int error, void *user_data);
+
+
+static int queue_pending(mrp_domctl_t *dc, uint32_t seq,
+                         mrp_domctl_status_cb_t cb, void *user_data);
+static int notify_pending(mrp_domctl_t *dc, uint32_t seq, int error,
+                          const char *msg);
+static void purge_pending(mrp_domctl_t *dc);
+
+
+
+
+mrp_domctl_t *mrp_domctl_create(const char *name, mrp_mainloop_t *ml,
+                                mrp_domctl_table_t *tables, int ntable,
+                                mrp_domctl_watch_t *watches, int nwatch,
+                                mrp_domctl_connect_cb_t connect_cb,
+                                mrp_domctl_watch_cb_t watch_cb, void *user_data)
+{
+    mrp_domctl_t       *dc;
+    mrp_domctl_table_t *st, *dt;
+    mrp_domctl_watch_t *sw, *dw;
+    int              i;
+
+    dc = mrp_allocz(sizeof(*dc));
+
+    if (dc != NULL) {
+        mrp_list_init(&dc->pending);
+        dc->ml = ml;
+
+        dc->name    = mrp_strdup(name);
+        dc->tables  = mrp_allocz_array(typeof(*dc->tables) , ntable);
+        dc->watches = mrp_allocz_array(typeof(*dc->watches), nwatch);
+
+        if (dc->name != NULL && dc->tables != NULL && dc->watches != NULL) {
+            for (i = 0; i < ntable; i++) {
+                st = tables + i;
+                dt = dc->tables + i;
+
+                dt->table       = mrp_strdup(st->table);
+                dt->mql_columns = mrp_strdup(st->mql_columns);
+                dt->mql_index   = mrp_strdup(st->mql_index ? st->mql_index:"");
+
+                if (!dt->table || !dt->mql_columns || !dt->mql_index)
+                    break;
+
+                dc->ntable++;
+            }
+
+            for (i = 0; i < nwatch; i++) {
+                sw = watches + i;
+                dw = dc->watches + i;
+
+                dw->table       = mrp_strdup(sw->table);
+                dw->mql_columns = mrp_strdup(sw->mql_columns);
+                dw->mql_where   = mrp_strdup(sw->mql_where ? sw->mql_where:"");
+                dw->max_rows    = sw->max_rows;
+
+                if (!dw->table || !dw->mql_columns || !dw->mql_where)
+                    break;
+
+                dc->nwatch++;
+            }
+
+            dc->connect_cb = connect_cb;
+            dc->watch_cb   = watch_cb;
+            dc->user_data  = user_data;
+            dc->seqno      = 1;
+
+            return dc;
+        }
+
+        mrp_domctl_destroy(dc);
+    }
+
+    return NULL;
+}
+
+
+static void destroy_domctl(mrp_domctl_t *dc)
+{
+    int i;
+
+    mrp_free(dc->name);
+
+    for (i = 0; i < dc->ntable; i++) {
+        mrp_free((char *)dc->tables[i].table);
+        mrp_free((char *)dc->tables[i].mql_columns);
+        mrp_free((char *)dc->tables[i].mql_index);
+    }
+    mrp_free(dc->tables);
+
+    for (i = 0; i < dc->nwatch; i++) {
+        mrp_free((char *)dc->watches[i].table);
+        mrp_free((char *)dc->watches[i].mql_columns);
+        mrp_free((char *)dc->watches[i].mql_where);
+    }
+    mrp_free(dc->watches);
+
+    mrp_free(dc);
+}
+
+
+static inline void check_destroyed(mrp_domctl_t *dc)
+{
+    if (dc->destroyed && dc->busy <= 0) {
+        destroy_domctl(dc);
+    }
+}
+
+
+void mrp_domctl_destroy(mrp_domctl_t *dc)
+{
+    if (dc != NULL) {
+        mrp_domctl_disconnect(dc);
+
+        if (dc->busy <= 0)
+            destroy_domctl(dc);
+        else
+            dc->destroyed = TRUE;
+    }
+}
+
+
+static void notify_disconnect(mrp_domctl_t *dc, uint32_t errcode,
+                              const char *errmsg)
+{
+    DOMCTL_MARK_BUSY(dc, {
+            dc->connected = FALSE;
+            dc->connect_cb(dc, FALSE, errcode, errmsg, dc->user_data);
+        });
+}
+
+
+static void notify_connect(mrp_domctl_t *dc)
+{
+    DOMCTL_MARK_BUSY(dc, {
+            dc->connected = TRUE;
+            dc->connect_cb(dc, TRUE, 0, NULL, dc->user_data);
+        });
+}
+
+
+static int domctl_register(mrp_domctl_t *dc)
+{
+    mrp_msg_t *msg;
+    int        success;
+
+    msg = create_register_message(dc);
+
+    if (msg != NULL) {
+        success = mrp_transport_send(dc->t, msg);
+        mrp_msg_unref(msg);
+    }
+    else
+        success = FALSE;
+
+    return success;
+}
+
+
+int mrp_domctl_connect(mrp_domctl_t *dc, const char *address)
+{
+    static mrp_transport_evt_t evt = {
+        .closed      = closed_cb,
+        .recvmsg     = recv_cb,
+        .recvmsgfrom = recvfrom_cb,
+    };
+
+    mrp_sockaddr_t  addr;
+    socklen_t       addrlen;
+    const char     *type;
+
+    if (dc == NULL)
+        return FALSE;
+
+    addrlen = mrp_transport_resolve(NULL, address, &addr, sizeof(addr), &type);
+
+    if (addrlen > 0) {
+        dc->t = mrp_transport_create(dc->ml, type, &evt, dc, 0);
+
+        if (dc->t != NULL) {
+            if (mrp_transport_connect(dc->t, &addr, addrlen))
+                if (domctl_register(dc))
+                    return TRUE;
+
+            mrp_transport_destroy(dc->t);
+            dc->t = NULL;
+        }
+    }
+
+    return FALSE;
+}
+
+
+void mrp_domctl_disconnect(mrp_domctl_t *dc)
+{
+    if (dc->t != NULL) {
+        mrp_transport_destroy(dc->t);
+        dc->t         = NULL;
+        dc->connected = FALSE;
+    }
+}
+
+
+int mrp_domctl_set_data(mrp_domctl_t *dc, mrp_domctl_data_t *tables, int ntable,
+                     mrp_domctl_status_cb_t cb, void *user_data)
+{
+    mrp_msg_t *msg;
+    uint32_t   seq = dc->seqno++;
+    int        success, i;
+
+    if (!dc->connected)
+        return FALSE;
+
+    for (i = 0; i < ntable; i++) {
+        if (tables[i].id < 0 || tables[i].id >= dc->ntable)
+            return FALSE;
+    }
+
+    msg = create_set_message(seq, tables, ntable);
+
+    if (msg != NULL) {
+        /*
+          mrp_log_info("set data message message:");
+          mrp_msg_dump(msg, stdout);
+        */
+
+        success = mrp_transport_send(dc->t, msg);
+        mrp_msg_unref(msg);
+
+        if (success)
+            queue_pending(dc, seq, cb, user_data);
+
+        return success;
+    }
+    else
+        return FALSE;
+}
+
+
+static void process_ack(mrp_domctl_t *dc, uint32_t seq)
+{
+    if (seq != 0)
+        notify_pending(dc, seq, 0, NULL);
+    else
+        notify_connect(dc);
+}
+
+
+static void process_nak(mrp_domctl_t *dc, uint32_t seq, int32_t err,
+                        const char *msg)
+{
+    if (seq != 0)
+        notify_pending(dc, seq, err, msg);
+    else
+        notify_disconnect(dc, err, msg);
+}
+
+
+static void process_notify(mrp_domctl_t *dc, mrp_msg_t *msg, uint32_t seq)
+{
+    mrp_domctl_data_t  *data, *d;
+    mrp_domctl_value_t *values, *v;
+    void            *it;
+    uint16_t         ntable, ntotal, nrow, ncol;
+    uint16_t         tblid;
+    int              t, r, c;
+    uint16_t         type;
+    mrp_msg_value_t  value;
+
+    if (!mrp_msg_get(msg,
+                     MRP_PEPMSG_UINT16(NCHANGE, &ntable),
+                     MRP_PEPMSG_UINT16(NTOTAL , &ntotal),
+                     MRP_MSG_END))
+        return;
+
+    data   = alloca(sizeof(*data)   * ntable);
+    values = alloca(sizeof(*values) * ntotal);
+
+    it     = NULL;
+    d      = data;
+    v      = values;
+
+    for (t = 0; t < ntable; t++) {
+        if (!mrp_msg_iterate_get(msg, &it,
+                                 MRP_PEPMSG_UINT16(TBLID, &tblid),
+                                 MRP_PEPMSG_UINT16(NROW , &nrow ),
+                                 MRP_PEPMSG_UINT16(NCOL , &ncol ),
+                                 MRP_MSG_END))
+            return;
+
+        if (tblid >= dc->nwatch)
+            return;
+
+        d->id      = tblid;
+        d->ncolumn = ncol;
+        d->nrow    = nrow;
+        d->rows    = alloca(sizeof(*d->rows) * nrow);
+
+        for (r = 0; r < nrow; r++) {
+            d->rows[r] = v;
+
+            for (c = 0; c < ncol; c++) {
+                if (!mrp_msg_iterate_get(msg, &it,
+                                         MRP_PEPMSG_ANY(DATA, &type, &value),
+                                         MRP_MSG_END))
+                    return;
+
+                switch (type) {
+                case MRP_MSG_FIELD_STRING:
+                    v->type = MRP_DOMCTL_STRING;
+                    v->str  = value.str;
+                    break;
+                case MRP_MSG_FIELD_SINT32:
+                    v->type = MRP_DOMCTL_INTEGER;
+                    v->s32  = value.s32;
+                    break;
+                case MRP_MSG_FIELD_UINT32:
+                    v->type = MRP_DOMCTL_UNSIGNED;
+                    v->u32  = value.u32;
+                    break;
+                case MRP_MSG_FIELD_DOUBLE:
+                    v->type = MRP_DOMCTL_DOUBLE;
+                    v->dbl  = value.dbl;
+                    break;
+                default:
+                    return;
+                }
+
+                v++;
+            }
+        }
+
+        d++;
+    }
+
+    dc->watch_cb(dc, data, ntable, dc->user_data);
+}
+
+
+static void recv_cb(mrp_transport_t *t, mrp_msg_t *msg, void *user_data)
+{
+    mrp_domctl_t  *dc = (mrp_domctl_t *)user_data;
+    uint16_t       type, nchange, ntotal;
+    uint32_t       seq;
+    int            ntable, ncolumn;
+    int32_t        error;
+    const char    *errmsg;
+
+    MRP_UNUSED(t);
+
+    /*
+      mrp_log_info("Received message:");
+      mrp_msg_dump(msg, stdout);
+    */
+
+    if (!mrp_msg_get(msg,
+                     MRP_PEPMSG_UINT16(MSGTYPE, &type),
+                     MRP_PEPMSG_UINT32(MSGSEQ , &seq ),
+                     MRP_MSG_END)) {
+        mrp_domctl_disconnect(dc);
+        notify_disconnect(dc, EINVAL, "malformed message from client");
+        return;
+    }
+
+    switch (type) {
+    case MRP_PEPMSG_ACK:
+        process_ack(dc, seq);
+        break;
+
+    case MRP_PEPMSG_NAK:
+        error  = EINVAL;
+        errmsg = "request failed, unknown error";
+
+        mrp_msg_get(msg,
+                    MRP_PEPMSG_SINT32(ERRCODE, &error),
+                    MRP_PEPMSG_STRING(ERRMSG , &errmsg),
+                    MRP_MSG_END);
+
+        process_nak(dc, seq, error, errmsg);
+        break;
+
+    case MRP_PEPMSG_NOTIFY:
+        if (mrp_msg_get(msg,
+                        MRP_PEPMSG_UINT16(NCHANGE, &nchange),
+                        MRP_PEPMSG_UINT16(NTOTAL , &ntotal),
+                        MRP_MSG_END)) {
+            ntable  = nchange;
+            ncolumn = ntotal;
+
+            process_notify(dc, msg, seq);
+        }
+        break;
+
+    default:
+        break;
+    }
+
+}
+
+
+static void recvfrom_cb(mrp_transport_t *t, mrp_msg_t *msg,
+                        mrp_sockaddr_t *addr, socklen_t addrlen,
+                        void *user_data)
+{
+    MRP_UNUSED(addr);
+    MRP_UNUSED(addrlen);
+
+    /* XXX TODO:
+     *    This should neither be called nor be necessary to specify.
+     *    However, currently the transport layer mandates having to
+     *    give both recv and recvfrom event callbacks if no connection
+     *    event callback is given. However this is not correct because
+     *    on a client side one wants to be able to create a connection-
+     *    oriented transport without both connection and recvfrom event
+     *    callbacks. This needs to be fixed in transport by moving the
+     *    appropriate callback checks lower in the stack to the actual
+     *    transport backends.
+     */
+
+    mrp_log_error("Whoa... recvfrom called for a connected transport.");
+    exit(1);
+}
+
+
+static void closed_cb(mrp_transport_t *t, int error, void *user_data)
+{
+    mrp_domctl_t *dc = (mrp_domctl_t *)user_data;
+
+    MRP_UNUSED(t);
+    MRP_UNUSED(dc);
+
+    if (error)
+        notify_disconnect(dc, error, strerror(error));
+    else
+        notify_disconnect(dc, ECONNRESET, "server has closed the connection");
+}
+
+
+static int queue_pending(mrp_domctl_t *dc, uint32_t seq,
+                         mrp_domctl_status_cb_t cb, void *user_data)
+{
+    pending_request_t *pending;
+
+    pending = mrp_allocz(sizeof(*pending));
+
+    if (pending != NULL) {
+        mrp_list_init(&pending->hook);
+
+        pending->seqno     = seq;
+        pending->cb        = cb;
+        pending->user_data = user_data;
+
+        mrp_list_append(&dc->pending, &pending->hook);
+
+        return TRUE;
+    }
+    else
+        return FALSE;
+}
+
+
+static int notify_pending(mrp_domctl_t *dc, uint32_t seq, int error,
+                          const char *msg)
+{
+    mrp_list_hook_t   *p, *n;
+    pending_request_t *pending;
+
+    mrp_list_foreach(&dc->pending, p, n) {
+        pending = mrp_list_entry(p, typeof(*pending), hook);
+
+        if (pending->seqno == seq) {
+            DOMCTL_MARK_BUSY(dc, {
+                    pending->cb(dc, error, msg, pending->user_data);
+                    mrp_list_delete(&pending->hook);
+                    mrp_free(pending);
+                });
+
+            return TRUE;
+        }
+    }
+
+    return FALSE;
+}
+
+
+static void purge_pending(mrp_domctl_t *dc)
+{
+    mrp_list_hook_t   *p, *n;
+    pending_request_t *pending;
+
+    mrp_list_foreach(&dc->pending, p, n) {
+        pending = mrp_list_entry(p, typeof(*pending), hook);
+
+        mrp_list_delete(&pending->hook);
+        mrp_free(pending);
+    }
+}
diff --git a/src/plugins/domain-control/client.h b/src/plugins/domain-control/client.h
new file mode 100644 (file)
index 0000000..91bbd27
--- /dev/null
@@ -0,0 +1,117 @@
+#ifndef __MURPHY_DOMAIN_CONTROL_CLIENT_H__
+#define __MURPHY_DOMAIN_CONTROL_CLIENT_H__
+
+#include <murphy-db/mqi.h>
+#include <murphy/common/mainloop.h>
+
+#define MRP_DEFAULT_DOMCTL_ADDRESS "unxs:@murphy-domctrl"
+
+
+/*
+ * a table owned by a domain controller
+ */
+
+typedef struct {
+    const char *table;                   /* table name */
+    const char *mql_columns;             /* column definition scriptlet */
+    const char *mql_index;               /* index column list */
+} mrp_domctl_table_t;
+
+#define MRP_DOMCTL_TABLE(_table, _columns, _index) \
+    { .table = _table, .mql_columns = _columns, .mql_index = _index }
+
+
+/*
+ * a table tracked by a domain controller
+ */
+
+typedef struct {
+    const char *table;                   /* table name */
+    const char *mql_columns;             /* column list for select */
+    const char *mql_where;               /* where clause for select */
+    int         max_rows;                /* max number of rows to select */
+} mrp_domctl_watch_t;
+
+#define MRP_DOMCTL_WATCH(_table, _columns, _where, _max_rows) {       \
+        .table       = _table                  ,                      \
+        .mql_columns = _columns ? _columns : "",                      \
+        .mql_where   = _where   ? _where   : "",                      \
+        .max_rows    = _max_rows               ,                      \
+    }
+
+
+/*
+ * table column types and values
+ */
+
+typedef enum {
+    MRP_DOMCTL_STRING   = mqi_varchar,
+    MRP_DOMCTL_INTEGER  = mqi_integer,
+    MRP_DOMCTL_UNSIGNED = mqi_unsignd,
+    MRP_DOMCTL_DOUBLE   = mqi_floating
+} mrp_domctl_type_t;
+
+typedef struct {
+    mrp_domctl_type_t  type;             /* data type */
+    union {
+        const char *str;                 /* MRP_DOMCTL_STRING */
+        uint32_t    u32;                 /* MRP_DOMCTL_UNSIGNED */
+        int32_t     s32;                 /* MRP_DOMCTL_INTEGER */
+        double      dbl;                 /* MRP_DOMCTL_DOUBLE */
+    };
+} mrp_domctl_value_t;
+
+
+/*
+ * table data
+ */
+
+typedef struct {
+    int                  id;             /* table id */
+    mqi_column_def_t    *coldefs;        /* column definitions */
+    int                  ncolumn;        /* columns per row */
+    mrp_domctl_value_t **rows;           /* row data */
+    int                  nrow;           /* number of rows */
+} mrp_domctl_data_t;
+
+
+
+/** Opaque policy domain controller type. */
+typedef struct mrp_domctl_s mrp_domctl_t;
+
+/** Callback type for connection state notifications. */
+typedef void (*mrp_domctl_connect_cb_t)(mrp_domctl_t *dc, int connection,
+                                        int errcode, const char *errmsg,
+                                        void *user_data);
+
+/** Callback type for request status notifications. */
+typedef void (*mrp_domctl_status_cb_t)(mrp_domctl_t *dc, int errcode,
+                                       const char *errmsg, void *user_data);
+
+/** Callback type for data change notifications. */
+typedef void (*mrp_domctl_watch_cb_t)(mrp_domctl_t *dc,
+                                      mrp_domctl_data_t *tables, int ntable,
+                                      void *user_data);
+
+/** Create a new policy domain controller. */
+mrp_domctl_t *mrp_domctl_create(const char *name, mrp_mainloop_t *ml,
+                                mrp_domctl_table_t *tables, int ntable,
+                                mrp_domctl_watch_t *watches, int nwatch,
+                                mrp_domctl_connect_cb_t connect_cb,
+                                mrp_domctl_watch_cb_t watch_cb,
+                                void *user_data);
+
+/** Destroy the given policy domain controller. */
+void mrp_domctl_destroy(mrp_domctl_t *dc);
+
+/** Connect and register the given controller to the server. */
+int mrp_domctl_connect(mrp_domctl_t *dc, const char *address);
+
+/** Close the connection to the server. */
+void mrp_domctl_disconnect(mrp_domctl_t *dc);
+
+/** Set the content of the given tables to the provided data. */
+int mrp_domctl_set_data(mrp_domctl_t *dc, mrp_domctl_data_t *tables, int ntable,
+                        mrp_domctl_status_cb_t status_cb, void *user_data);
+
+#endif /* __MURPHY_DOMAIN_CONTROL_CLIENT_H__ */
diff --git a/src/plugins/domain-control/domain-control-types.h b/src/plugins/domain-control/domain-control-types.h
new file mode 100644 (file)
index 0000000..c5be022
--- /dev/null
@@ -0,0 +1,115 @@
+#ifndef __MURPHY_DOMAIN_CONTROL_TYPES_H__
+#define __MURPHY_DOMAIN_CONTROL_TYPES_H__
+
+#include <murphy/common/list.h>
+#include <murphy/common/mainloop.h>
+#include <murphy/common/transport.h>
+#include <murphy/common/hashtbl.h>
+#include <murphy/core/context.h>
+
+#include "client.h"
+
+typedef struct pep_proxy_s pep_proxy_t;
+typedef struct pep_table_s pep_table_t;
+typedef struct pep_watch_s pep_watch_t;
+typedef struct pdp_s       pdp_t;
+
+
+/*
+ * a domain controller (on the client side)
+ */
+
+struct mrp_domctl_s {
+    char                    *name;       /* enforcment point name */
+    mrp_mainloop_t          *ml;         /* main loop */
+    mrp_transport_t         *t;          /* transport towards murphy */
+    int                      connected;  /* transport is up */
+    mrp_domctl_table_t      *tables;     /* owned tables */
+    int                      ntable;     /* number of owned tables */
+    mrp_domctl_watch_t      *watches;    /* watched tables */
+    int                      nwatch;     /* number of watched tables */
+    mrp_domctl_connect_cb_t  connect_cb; /* connection state change callback */
+    mrp_domctl_watch_cb_t    watch_cb;   /* watched table change callback */
+    void                    *user_data;  /* opqaue user data for callbacks */
+    int                      busy;       /* non-zero if a callback is active */
+    int                      destroyed:1;/* non-zero if destroy pending */
+    uint32_t                 seqno;      /* request sequence number */
+    mrp_list_hook_t          pending;    /* queue of outstanding requests */
+};
+
+
+/*
+ * a table associated with or tracked by an enforcement point
+ */
+
+struct pep_table_s {
+    char               *name;            /* table name */
+    char               *mql_columns;     /* column definition clause */
+    char               *mql_index;       /* index column list */
+    mrp_list_hook_t     hook;            /* to list of tables */
+    mqi_handle_t        h;               /* table handle */
+    mqi_column_def_t   *columns;         /* column definitions */
+    mqi_column_desc_t  *coldesc;         /* column descriptors */
+    int                 ncolumn;         /* number of columns */
+    int                 idx_col;         /* column index of index column */
+    mrp_list_hook_t     watches;         /* watches for this table */
+    int                 notify_all : 1;  /* notify all watches */
+};
+
+
+/*
+ * a table watch
+ */
+
+struct pep_watch_s {
+    pep_table_t     *table;              /* table being watched */
+    char            *mql_columns;        /* column list to select */
+    char            *mql_where;          /* where clause for select */
+    int              max_rows;           /* max number of rows to select */
+    pep_proxy_t     *proxy;              /* enforcement point */
+    int              id;                 /* table id within proxy */
+    uint32_t         stamp;              /* last notified update stamp */
+    mrp_list_hook_t  tbl_hook;           /* hook to table watch list */
+    mrp_list_hook_t  pep_hook;           /* hook to proxy watch list */
+};
+
+
+/*
+ * a policy enforcement point (on the server side)
+ */
+
+struct pep_proxy_s {
+    char              *name;             /* enforcement point name */
+    pdp_t             *pdp;              /* domain controller context */
+    mrp_transport_t   *t;                /* associated transport */
+    mrp_list_hook_t    hook;             /* to list of all enforcement points */
+    pep_table_t       *tables;           /* tables owned by this */
+    int                ntable;           /* number of tables */
+    mrp_list_hook_t    watches;          /* tables watched by this */
+    int                notify_update;    /* whether needs notification */
+    mrp_msg_t         *notify_msg;       /* notification being built */
+    int                notify_ntable;    /* number of changed tables */
+    int                notify_ncolumn;   /* total columns in notification */
+    int                notify_fail : 1;  /* notification failure */
+    int                notify_all : 1;   /* notify all watches */
+};
+
+
+/*
+ * policy domain controller context
+ */
+
+struct pdp_s {
+    mrp_context_t   *ctx;                /* murphy context */
+    const char      *address;            /* external transport address */
+    mrp_transport_t *ext;                /* external transport */
+    mrp_list_hook_t  proxies;            /* list of enforcement points */
+    mrp_list_hook_t  tables;             /* list of tables we track */
+    mrp_htbl_t      *watched;            /* tracked tables by name */
+    mrp_deferred_t  *notify;             /* deferred notification */
+    int              notify_scheduled;   /* is notification scheduled? */
+};
+
+
+
+#endif /* __MURPHY_DOMAIN_CONTROL_TYPES_H__ */
diff --git a/src/plugins/domain-control/domain-control.c b/src/plugins/domain-control/domain-control.c
new file mode 100644 (file)
index 0000000..5459105
--- /dev/null
@@ -0,0 +1,386 @@
+#include <errno.h>
+
+#include <murphy/common/mm.h>
+#include <murphy/common/log.h>
+
+#include "proxy.h"
+#include "table.h"
+#include "message.h"
+#include "notify.h"
+#include "domain-control.h"
+
+static int create_transports(pdp_t *pdp);
+static void destroy_transports(pdp_t *pdp);
+
+pdp_t *create_domain_control(mrp_context_t *ctx, const char *address)
+{
+    pdp_t *pdp;
+
+    pdp = mrp_allocz(sizeof(*pdp));
+
+    if (pdp != NULL) {
+        pdp->ctx     = ctx;
+        pdp->address = address;
+
+        if (init_proxies(pdp) && init_tables(pdp) && create_transports(pdp))
+            return pdp;
+        else
+            destroy_domain_control(pdp);
+    }
+
+    return NULL;
+}
+
+
+void destroy_domain_control(pdp_t *pdp)
+{
+    if (pdp != NULL) {
+        destroy_proxies(pdp);
+        destroy_tables(pdp);
+        destroy_transports(pdp);
+
+        mrp_free(pdp);
+    }
+}
+
+
+static void notify_cb(mrp_mainloop_t *ml, mrp_deferred_t *d, void *user_data)
+{
+    pdp_t *pdp = (pdp_t *)user_data;
+
+    MRP_UNUSED(ml);
+
+    mrp_disable_deferred(d);
+    pdp->notify_scheduled = FALSE;
+    notify_table_changes(pdp);
+}
+
+
+void schedule_notification(pdp_t *pdp)
+{
+
+    if (pdp->notify == NULL)
+        pdp->notify = mrp_add_deferred(pdp->ctx->ml, notify_cb, pdp);
+
+    if (!pdp->notify_scheduled) {
+        mrp_debug("scheduling client notification");
+        mrp_enable_deferred(pdp->notify);
+    }
+}
+
+
+static void send_ack_reply(mrp_transport_t *t, uint32_t seq)
+{
+    mrp_msg_t *msg;
+
+    msg = create_ack_message(seq);
+
+    if (msg != NULL) {
+        mrp_transport_send(t, msg);
+        mrp_msg_unref(msg);
+    }
+}
+
+
+static void send_nak_reply(mrp_transport_t *t, uint32_t seq, int error,
+                           const char *errmsg)
+{
+    mrp_msg_t *msg;
+
+    msg = create_nak_message(seq, error, errmsg);
+
+    if (msg != NULL) {
+        mrp_transport_send(t, msg);
+        mrp_msg_unref(msg);
+    }
+}
+
+
+static int process_register_request(pep_proxy_t *proxy, mrp_msg_t *req,
+                                    uint32_t seq)
+{
+    mrp_transport_t    *t = proxy->t;
+    char               *name;
+    mrp_domctl_table_t *tables;
+    mrp_domctl_watch_t *watches;
+    uint16_t            utable, uwatch;
+    int                 ntable, nwatch;
+    int                 error;
+    const char         *errmsg;
+
+    if (mrp_msg_get(req,
+                    MRP_PEPMSG_STRING(NAME   , &name  ),
+                    MRP_PEPMSG_UINT16(NTABLE , &utable),
+                    MRP_PEPMSG_UINT16(NWATCH , &uwatch),
+                    MRP_MSG_END)) {
+        ntable  = utable;
+        nwatch  = uwatch;
+        tables  = alloca(ntable * sizeof(*tables));
+        watches = alloca(nwatch * sizeof(*watches));
+
+        if (decode_register_message(req, tables, ntable, watches, nwatch)) {
+            if (register_proxy(proxy, name, tables, ntable, watches, nwatch,
+                               &error, &errmsg)) {
+                send_ack_reply(t, seq);
+                proxy->notify_all = TRUE;
+                schedule_notification(proxy->pdp);
+
+                return TRUE;
+            }
+        }
+        else
+            goto malformed;
+    }
+    else {
+    malformed:
+        error  = EINVAL;
+        errmsg = "malformed register message";
+    }
+
+    send_nak_reply(t, seq, error, errmsg);
+
+    return FALSE;
+}
+
+
+static void process_unregister_request(pep_proxy_t *proxy, uint32_t seq)
+{
+    send_ack_reply(proxy->t, seq);
+    unregister_proxy(proxy);
+}
+
+
+static void process_set_request(pep_proxy_t *proxy, mrp_msg_t *msg,
+                                uint32_t seq)
+{
+    mrp_domctl_data_t  *data, *d;
+    mrp_domctl_value_t *values, *v;
+    void               *it;
+    uint16_t            ntable, ntotal, nrow, ncol;
+    uint16_t            tblid;
+    int                 t, r, c;
+    uint16_t            type;
+    mrp_msg_value_t     value;
+    int                 error;
+    const char         *errmsg;
+
+    if (!mrp_msg_get(msg,
+                     MRP_PEPMSG_UINT16(NCHANGE, &ntable),
+                     MRP_PEPMSG_UINT16(NTOTAL , &ntotal),
+                     MRP_MSG_END))
+        return;
+
+    data   = alloca(sizeof(*data)   * ntable);
+    values = alloca(sizeof(*values) * ntotal);
+
+    it     = NULL;
+    d      = data;
+    v      = values;
+
+    for (t = 0; t < ntable; t++) {
+        if (!mrp_msg_iterate_get(msg, &it,
+                                 MRP_PEPMSG_UINT16(TBLID, &tblid),
+                                 MRP_PEPMSG_UINT16(NROW , &nrow ),
+                                 MRP_PEPMSG_UINT16(NCOL , &ncol ),
+                                 MRP_MSG_END))
+            goto reply_nak;
+
+        if (tblid >= proxy->ntable)
+            goto reply_nak;
+
+        d->id      = tblid;
+        d->ncolumn = ncol;
+        d->nrow    = nrow;
+        d->rows    = alloca(sizeof(*d->rows) * nrow);
+
+        for (r = 0; r < nrow; r++) {
+            d->rows[r] = v;
+
+            for (c = 0; c < ncol; c++) {
+                if (!mrp_msg_iterate_get(msg, &it,
+                                         MRP_PEPMSG_ANY(DATA, &type, &value),
+                                         MRP_MSG_END))
+                    goto reply_nak;
+
+                switch (type) {
+                case MRP_MSG_FIELD_STRING:
+                    v->type = MRP_DOMCTL_STRING;
+                    v->str  = value.str;
+                    break;
+                case MRP_MSG_FIELD_SINT32:
+                    v->type = MRP_DOMCTL_INTEGER;
+                    v->s32  = value.s32;
+                    break;
+                case MRP_MSG_FIELD_UINT32:
+                    v->type = MRP_DOMCTL_UNSIGNED;
+                    v->u32  = value.u32;
+                    break;
+                case MRP_MSG_FIELD_DOUBLE:
+                    v->type = MRP_DOMCTL_DOUBLE;
+                    v->dbl  = value.dbl;
+                    break;
+                default:
+                    goto reply_nak;
+                }
+
+                v++;
+            }
+        }
+
+        d++;
+    }
+
+    if (set_proxy_tables(proxy, data, ntable, &error, &errmsg)) {
+        send_ack_reply(proxy->t, seq);
+    }
+    else {
+    reply_nak:
+        send_nak_reply(proxy->t, seq, error, errmsg);
+    }
+}
+
+
+static void recv_cb(mrp_transport_t *t, mrp_msg_t *msg, void *user_data)
+{
+    pep_proxy_t *proxy = (pep_proxy_t *)user_data;
+    char        *name  = proxy && proxy->name ? proxy->name : "<unknown>";
+    uint16_t     type;
+    uint32_t     seq;
+
+    /*
+      mrp_log_info("Message from client %p:", proxy);
+      mrp_msg_dump(msg, stdout);
+    */
+
+    if (!mrp_msg_get(msg,
+                     MRP_PEPMSG_UINT16(MSGTYPE, &type),
+                     MRP_PEPMSG_UINT32(MSGSEQ , &seq ),
+                     MRP_MSG_END)) {
+        mrp_log_error("Malformed message from client %s.", name);
+        send_nak_reply(t, 0, EINVAL, "malformed message");
+    }
+    else {
+        switch (type) {
+        case MRP_PEPMSG_REGISTER:
+            if (!process_register_request(proxy, msg, seq))
+                destroy_proxy(proxy);
+            break;
+
+        case MRP_PEPMSG_UNREGISTER:
+            process_unregister_request(proxy, seq);
+            break;
+
+        case MRP_PEPMSG_SET:
+            process_set_request(proxy, msg, seq);
+            break;
+
+        default:
+            break;
+        }
+    }
+}
+
+
+static void connect_cb(mrp_transport_t *ext, void *user_data)
+{
+    pdp_t       *pdp = (pdp_t *)user_data;
+    pep_proxy_t *proxy;
+    int          flags;
+
+    proxy = create_proxy(pdp);
+
+    if (proxy != NULL) {
+        flags    = MRP_TRANSPORT_REUSEADDR | MRP_TRANSPORT_NONBLOCK;
+        proxy->t = mrp_transport_accept(ext, proxy, flags);
+
+        if (proxy->t != NULL)
+            mrp_log_info("Accepted new client connection.");
+        else {
+            mrp_log_error("Failed to accept new client connection.");
+            destroy_proxy(proxy);
+        }
+    }
+}
+
+
+static void closed_cb(mrp_transport_t *t, int error, void *user_data)
+{
+    pep_proxy_t *proxy = (pep_proxy_t *)user_data;
+    char        *name  = proxy && proxy->name ? proxy->name : "<unknown>";
+
+    MRP_UNUSED(t);
+
+    if (error)
+        mrp_log_error("Transport to client %s closed (%d: %s).",
+                      name, error, strerror(error));
+    else
+        mrp_log_info("Transport to client %s closed.", name);
+
+    mrp_log_info("Destroying client %s.", name);
+    destroy_proxy(proxy);
+}
+
+
+static int create_ext_transport(pdp_t *pdp)
+{
+    static mrp_transport_evt_t evt = {
+        .closed      = closed_cb,
+        .recvmsg     = recv_cb,
+        .recvmsgfrom = NULL,
+        .connection  = connect_cb,
+    };
+
+    mrp_transport_t *t;
+    mrp_sockaddr_t   addr;
+    socklen_t        addrlen;
+    int              flags;
+    const char      *type;
+
+    t       = NULL;
+    addrlen = mrp_transport_resolve(NULL, pdp->address,
+                                    &addr, sizeof(addr), &type);
+
+    if (addrlen > 0) {
+        flags = MRP_TRANSPORT_REUSEADDR;
+        t     = mrp_transport_create(pdp->ctx->ml, type, &evt, pdp, flags);
+
+        if (t != NULL) {
+            if (mrp_transport_bind(t, &addr, addrlen) &&
+                mrp_transport_listen(t, 4)) {
+                mrp_log_info("Listening on transport %s...", pdp->address);
+                pdp->ext = t;
+
+                return TRUE;
+            }
+            else
+                mrp_log_error("Failed to bind transport to %s.", pdp->address);
+        }
+        else
+            mrp_log_error("Failed to create transport for %s.", pdp->address);
+    }
+    else
+        mrp_log_error("Invalid transport address %s.", pdp->address);
+
+    return FALSE;
+}
+
+
+static void destroy_ext_transport(pdp_t *pdp)
+{
+    if (pdp != NULL) {
+        mrp_transport_destroy(pdp->ext);
+        pdp->ext = NULL;
+    }
+}
+
+
+static int create_transports(pdp_t *pdp)
+{
+    return create_ext_transport(pdp);
+}
+
+
+static void destroy_transports(pdp_t *pdp)
+{
+    destroy_ext_transport(pdp);
+}
diff --git a/src/plugins/domain-control/domain-control.h b/src/plugins/domain-control/domain-control.h
new file mode 100644 (file)
index 0000000..27641ce
--- /dev/null
@@ -0,0 +1,11 @@
+#ifndef __MURPHY_DOMAIN_CONTROL_H__
+#define __MURPHY_DOMAIN_CONTROL_H__
+
+#include "domain-control-types.h"
+
+pdp_t *create_domain_control(mrp_context_t *ctx, const char *address);
+void destroy_domain_control(pdp_t *pdp);
+
+void schedule_notification(pdp_t *pdp);
+
+#endif /* __MURPHY_DOMAIN_CONTROL_H__ */
diff --git a/src/plugins/domain-control/message.c b/src/plugins/domain-control/message.c
new file mode 100644 (file)
index 0000000..d26e0f0
--- /dev/null
@@ -0,0 +1,235 @@
+#include "message.h"
+
+static int append_one_row(mrp_msg_t *msg, uint16_t tag, mqi_column_def_t *col,
+                          int ncolumn, mrp_domctl_value_t *data);
+
+mrp_msg_t *create_register_message(mrp_domctl_t *dc)
+{
+    mrp_msg_t          *msg;
+    mrp_domctl_table_t *t;
+    mrp_domctl_watch_t *w;
+    int                 i;
+
+    msg = mrp_msg_create(MRP_PEPMSG_UINT16(MSGTYPE, MRP_PEPMSG_REGISTER),
+                         MRP_PEPMSG_UINT32(MSGSEQ , 0),
+                         MRP_PEPMSG_STRING(NAME   , dc->name),
+                         MRP_PEPMSG_UINT16(NTABLE , dc->ntable),
+                         MRP_PEPMSG_UINT16(NWATCH , dc->nwatch),
+                         MRP_MSG_END);
+
+    for (i = 0, t = dc->tables; i < dc->ntable; i++, t++) {
+        mrp_msg_append(msg, MRP_PEPMSG_STRING(TBLNAME, t->table));
+        mrp_msg_append(msg, MRP_PEPMSG_STRING(COLUMNS, t->mql_columns));
+        mrp_msg_append(msg, MRP_PEPMSG_STRING(INDEX  , t->mql_index));
+    }
+
+    for (i = 0, w = dc->watches; i < dc->nwatch; i++, w++) {
+        mrp_msg_append(msg, MRP_PEPMSG_STRING(TBLNAME, w->table));
+        mrp_msg_append(msg, MRP_PEPMSG_STRING(COLUMNS, w->mql_columns));
+        mrp_msg_append(msg, MRP_PEPMSG_STRING(WHERE  , w->mql_where));
+        mrp_msg_append(msg, MRP_PEPMSG_UINT16(MAXROWS, w->max_rows));
+    }
+
+
+    return msg;
+}
+
+
+int decode_register_message(mrp_msg_t *msg,
+                            mrp_domctl_table_t *tables, int ntable,
+                            mrp_domctl_watch_t *watches, int nwatch)
+{
+    mrp_domctl_table_t *t;
+    mrp_domctl_watch_t *w;
+    void               *it;
+    char               *table, *columns, *index, *where;
+    uint16_t            ntbl, nwch, max_rows;
+    int                 i;
+
+    it = NULL;
+
+    if (!mrp_msg_iterate_get(msg, &it,
+                             MRP_PEPMSG_UINT16(NTABLE , &ntbl),
+                             MRP_PEPMSG_UINT16(NWATCH , &nwch),
+                             MRP_MSG_END))
+        return FALSE;
+
+    if (ntbl > ntable || nwch > nwatch)
+        return FALSE;
+
+    for (i = 0, t = tables; i < ntable; i++, t++) {
+        if (mrp_msg_iterate_get(msg, &it,
+                                MRP_PEPMSG_STRING(TBLNAME, &table),
+                                MRP_PEPMSG_STRING(COLUMNS, &columns),
+                                MRP_PEPMSG_STRING(INDEX  , &index),
+                                MRP_MSG_END)) {
+            t->table       = table;
+            t->mql_columns = columns;
+            t->mql_index   = index;
+        }
+        else
+            return FALSE;
+    }
+
+    for (i = 0, w = watches; i < nwatch; i++, w++) {
+        if (mrp_msg_iterate_get(msg, &it,
+                                MRP_PEPMSG_STRING(TBLNAME, &table),
+                                MRP_PEPMSG_STRING(COLUMNS, &columns),
+                                MRP_PEPMSG_STRING(WHERE  , &where),
+                                MRP_PEPMSG_UINT16(MAXROWS, &max_rows),
+                                MRP_MSG_END)) {
+            w->table       = table;
+            w->mql_columns = columns;
+            w->mql_where   = where;
+            w->max_rows    = max_rows;
+        }
+        else
+            return FALSE;
+    }
+
+    return TRUE;
+}
+
+
+mrp_msg_t *create_ack_message(uint32_t seq)
+{
+    return mrp_msg_create(MRP_PEPMSG_UINT16(MSGTYPE, MRP_PEPMSG_ACK),
+                          MRP_PEPMSG_UINT32(MSGSEQ , seq),
+                          MRP_MSG_END);
+}
+
+
+mrp_msg_t *create_nak_message(uint32_t seq, int error, const char *errmsg)
+{
+    return mrp_msg_create(MRP_PEPMSG_UINT16(MSGTYPE, MRP_PEPMSG_NAK),
+                          MRP_PEPMSG_UINT32(MSGSEQ , seq),
+                          MRP_PEPMSG_SINT32(ERRCODE, error),
+                          MRP_PEPMSG_STRING(ERRMSG , errmsg),
+                          MRP_MSG_END);
+}
+
+
+mrp_msg_t *create_notify_message(void)
+{
+    return mrp_msg_create(MRP_PEPMSG_UINT16(MSGTYPE, MRP_PEPMSG_NOTIFY),
+                          MRP_PEPMSG_UINT32(MSGSEQ , 0),
+                          MRP_PEPMSG_UINT16(NCHANGE, 0),
+                          MRP_PEPMSG_UINT16(NTOTAL , 0),
+                          MRP_MSG_END);
+}
+
+
+int update_notify_message(mrp_msg_t *msg, int id, mqi_column_def_t *columns,
+                          int ncolumn, mrp_domctl_value_t *data, int nrow)
+{
+    mrp_domctl_value_t *v;
+    uint16_t           tid, nr;
+    int                i;
+
+    nr  = nrow;
+    tid = id;
+
+    if (!mrp_msg_append(msg, MRP_PEPMSG_UINT16(TBLID, tid)) ||
+        !mrp_msg_append(msg, MRP_PEPMSG_UINT16(NROW , nr )))
+        return FALSE;
+
+    for (i = 0, v = data; i < nrow; i++, v += ncolumn) {
+        if (!append_one_row(msg, MRP_PEPTAG_DATA, columns, ncolumn, v))
+            return FALSE;
+    }
+
+    return TRUE;
+}
+
+
+mrp_msg_t *create_set_message(uint32_t seq, mrp_domctl_data_t *tables,
+                              int ntable)
+{
+    mrp_msg_t          *msg;
+    mrp_domctl_value_t *rows, *col;
+    uint16_t            utable, utotal, tid, ncol, nrow;
+    int                 i, r, c;
+
+    utable = ntable;
+    utotal = 0;
+
+    msg = mrp_msg_create(MRP_PEPMSG_UINT16(MSGTYPE, MRP_PEPMSG_SET),
+                         MRP_PEPMSG_UINT32(MSGSEQ , seq),
+                         MRP_PEPMSG_UINT16(NCHANGE, utable),
+                         MRP_PEPMSG_UINT16(NTOTAL , 0),
+                         MRP_MSG_END);
+
+    if (msg != NULL) {
+        for (i = 0; i < ntable; i++) {
+            tid  = tables[i].id;
+            ncol = tables[i].ncolumn;
+            nrow = tables[i].nrow;
+
+            if (!mrp_msg_append(msg, MRP_PEPMSG_UINT16(TBLID, tid))  ||
+                !mrp_msg_append(msg, MRP_PEPMSG_UINT16(NROW , nrow)) ||
+                !mrp_msg_append(msg, MRP_PEPMSG_UINT16(NCOL , ncol)))
+                goto fail;
+
+            for (r = 0; r < nrow; r++) {
+                rows = tables[i].rows[r];
+
+                for (c = 0; c < ncol; c++) {
+                    col = rows + c;
+#define HANDLE_TYPE(pt, t, m)                                                  \
+                    case MRP_DOMCTL_##pt:                                      \
+                        if (!mrp_msg_append(msg, MRP_PEPMSG_##t(DATA,col->m))) \
+                            goto fail;                                         \
+                        break;
+
+                    switch (col->type) {
+                        HANDLE_TYPE(STRING  , STRING, str);
+                        HANDLE_TYPE(INTEGER , SINT32, s32);
+                        HANDLE_TYPE(UNSIGNED, UINT32, u32);
+                        HANDLE_TYPE(DOUBLE  , DOUBLE, dbl);
+                    default:
+                        goto fail;
+                    }
+#undef HANDLE_TYPE
+                }
+            }
+            utotal += nrow * ncol;
+        }
+
+        mrp_msg_set(msg, MRP_PEPMSG_UINT16(NTOTAL, utotal));
+
+        return msg;
+    }
+
+ fail:
+    mrp_msg_unref(msg);
+    return NULL;
+}
+
+
+static int append_one_row(mrp_msg_t *msg, uint16_t tag, mqi_column_def_t *col,
+                          int ncolumn, mrp_domctl_value_t *data)
+{
+#define HANDLE_TYPE(dbtype, type, member)                                 \
+    case mqi_##dbtype:                                                    \
+        if (!mrp_msg_append(msg, MRP_MSG_TAG_##type(tag, data->member)))  \
+            return FALSE;                                                 \
+        break
+
+    int i;
+
+    for (i = 0; i < ncolumn; i++, data++, col++) {
+        switch (col->type) {
+            HANDLE_TYPE(integer , SINT32, s32);
+            HANDLE_TYPE(unsignd , UINT32, u32);
+            HANDLE_TYPE(floating, DOUBLE, dbl);
+            HANDLE_TYPE(string  , STRING, str);
+        case mqi_blob:
+        default:
+            return FALSE;
+        }
+    }
+
+    return TRUE;
+
+#undef HANDLE_TYPE
+}
diff --git a/src/plugins/domain-control/message.h b/src/plugins/domain-control/message.h
new file mode 100644 (file)
index 0000000..41f37f2
--- /dev/null
@@ -0,0 +1,100 @@
+#ifndef __MURPHY_DOMAIN_CONTROL_MESSAGE_H__
+#define __MURPHY_DOMAIN_CONTROL_MESSAGE_H__
+
+#include <murphy/common/msg.h>
+
+#include "domain-control-types.h"
+#include "client.h"
+
+
+#define MRP_PEPMSG_UINT16(tag, value) \
+    MRP_MSG_TAG_UINT16(MRP_PEPTAG_##tag, value)
+
+#define MRP_PEPMSG_SINT16(tag, value) \
+    MRP_MSG_TAG_SINT16(MRP_PEPTAG_##tag, value)
+
+#define MRP_PEPMSG_UINT32(tag, value) \
+    MRP_MSG_TAG_UINT32(MRP_PEPTAG_##tag, value)
+
+#define MRP_PEPMSG_SINT32(tag, value) \
+    MRP_MSG_TAG_SINT32(MRP_PEPTAG_##tag, value)
+
+#define MRP_PEPMSG_DOUBLE(tag, value) \
+    MRP_MSG_TAG_DOUBLE(MRP_PEPTAG_##tag, value)
+
+#define MRP_PEPMSG_STRING(tag, value) \
+    MRP_MSG_TAG_STRING(MRP_PEPTAG_##tag, value)
+
+#define MRP_PEPMSG_ANY(tag, typep, valuep) \
+    MRP_MSG_TAG_ANY(MRP_PEPTAG_##tag, typep, valuep)
+
+/*
+ * message types
+ */
+
+typedef enum {
+    MRP_PEPMSG_REGISTER   = 0x1,         /* client: register me */
+    MRP_PEPMSG_UNREGISTER = 0x2,         /* client: unregister me */
+    MRP_PEPMSG_SET        = 0x3,         /* client: set table data */
+    MRP_PEPMSG_NOTIFY     = 0x4,         /* server: table changes */
+    MRP_PEPMSG_ACK        = 0x5,         /* server: ok */
+    MRP_PEPMSG_NAK        = 0x6,         /* server: request failed */
+} mrp_pepmsg_type_t;
+
+
+/*
+ * message-specific tags
+ */
+
+typedef enum {
+    /*
+     * fixed common tags
+     */
+    MRP_PEPTAG_MSGTYPE = 0x1,            /* message type */
+    MRP_PEPTAG_MSGSEQ  = 0x2,            /* sequence number */
+
+    /*
+     * fixed tags in registration messages
+     */
+    MRP_PEPTAG_NAME     = 0x3,           /* enforcement point name */
+    MRP_PEPTAG_NTABLE   = 0x4,           /* number of owned tables */
+    MRP_PEPTAG_NWATCH   = 0x5,           /* number of watched tables */
+    MRP_PEPTAG_TBLNAME  = 0x6,           /* table name */
+    MRP_PEPTAG_COLUMNS  = 0x8,           /* column definitions/list */
+    MRP_PEPTAG_INDEX    = 0x9,           /* index definition */
+    MRP_PEPTAG_WHERE    = 0xa,           /* where clause for select */
+    MRP_PEPTAG_MAXROWS  = 0xb,           /* max number of rows to select */
+
+    /*
+     * fixed tags in NAKs
+     */
+    MRP_PEPTAG_ERRCODE  = 0x3,           /* error code */
+    MRP_PEPTAG_ERRMSG   = 0x4,           /* error message */
+
+    /*
+     * fixed tags in data notification messages
+     */
+    MRP_PEPTAG_NCHANGE = 0x3,            /* number of tables in notification */
+    MRP_PEPTAG_NTOTAL  = 0x4,            /* total columns in notification */
+    MRP_PEPTAG_TBLID   = 0x5,            /* table id */
+    MRP_PEPTAG_NROW    = 0x6,            /* number of table rows */
+    MRP_PEPTAG_NCOL    = 0x7,            /* number of columns in a row */
+    MRP_PEPTAG_DATA    = 0x8,            /* a data column */
+} mrp_pepmsg_tag_t;
+
+
+mrp_msg_t *create_register_message(mrp_domctl_t *dc);
+int decode_register_message(mrp_msg_t *msg,
+                            mrp_domctl_table_t *tables, int ntable,
+                            mrp_domctl_watch_t *watches, int nwatch);
+
+mrp_msg_t *create_ack_message(uint32_t seq);
+mrp_msg_t *create_nak_message(uint32_t seq, int error, const char *errmsg);
+mrp_msg_t *create_notify_message(void);
+int update_notify_message(mrp_msg_t *msg, int id, mqi_column_def_t *columns,
+                          int ncolumn, mrp_domctl_value_t *data, int nrow);
+
+mrp_msg_t *create_set_message(uint32_t seq, mrp_domctl_data_t *tables,
+                              int ntable);
+
+#endif /* __MURPHY_DOMAIN_CONTROL_MESSAGE_H__ */
diff --git a/src/plugins/domain-control/notify.c b/src/plugins/domain-control/notify.c
new file mode 100644 (file)
index 0000000..b9420e6
--- /dev/null
@@ -0,0 +1,219 @@
+#include <murphy/common/mm.h>
+#include <murphy/common/log.h>
+
+#include <murphy-db/mql-result.h>
+
+#include "domain-control-types.h"
+#include "message.h"
+#include "table.h"
+#include "notify.h"
+
+
+static void prepare_proxy_notification(pep_proxy_t *proxy)
+{
+    proxy->notify_update  = FALSE;
+    proxy->notify_ntable  = 0;
+    proxy->notify_ncolumn = 0;
+    proxy->notify_fail    = FALSE;
+}
+
+
+static void check_watch_notification(pep_watch_t *w)
+{
+    pep_proxy_t *proxy = w->proxy;
+    pep_table_t *t     = w->table;
+    int          update;
+
+    if (t->notify_all) {
+        t->h   = mqi_get_table_handle(t->name);
+        update = TRUE;
+    }
+    else {
+        if (t->h != MQI_HANDLE_INVALID)
+            update = (w->stamp < mqi_get_table_stamp(t->h));
+        else
+            update = FALSE;
+    }
+
+    proxy->notify_update |= update;
+}
+
+
+static int collect_watch_notification(pep_watch_t *w)
+{
+    pep_proxy_t  *proxy = w->proxy;
+    pep_table_t  *t     = w->table;
+    mql_result_t *r     = NULL;
+    uint16_t      tid   = w->id;
+    uint16_t      nrow, ncol;
+    mrp_msg_t    *msg;
+    int           i, j;
+    int           types[MQI_COLUMN_MAX];
+    const char   *str;
+    uint32_t      u32;
+    int32_t       s32;
+    double        dbl;
+
+    mrp_debug("updating %s watch for %s", t->name, proxy->name);
+
+    if (proxy->notify_msg == NULL) {
+        proxy->notify_msg = create_notify_message();
+
+        if (proxy->notify_msg == NULL)
+            goto fail;
+    }
+
+    if (t->h != MQI_HANDLE_INVALID) {
+        if (!exec_mql(mql_result_rows, &r, "select %s from %s%s%s",
+                      w->mql_columns, t->name,
+                     w->mql_where[0] ? " where " : "", w->mql_where)) {
+            mrp_debug("select from table %s failed", t->name);
+            goto fail;
+        }
+    }
+
+    if (r != NULL) {
+        nrow = mql_result_rows_get_row_count(r);
+        ncol = mql_result_rows_get_row_column_count(r);
+    }
+    else
+        nrow = ncol = 0;
+
+    msg = proxy->notify_msg;
+
+    if (!mrp_msg_append(msg, MRP_PEPMSG_UINT16(TBLID, tid))  ||
+        !mrp_msg_append(msg, MRP_PEPMSG_UINT16(NROW , nrow)) ||
+        !mrp_msg_append(msg, MRP_PEPMSG_UINT16(NCOL , ncol)))
+        goto fail;
+
+    for (i = 0; i < ncol; i++)
+        types[i] = mql_result_rows_get_row_column_type(r, i);
+
+    for (i = 0; i < nrow; i++) {
+        for (j = 0; j < ncol; j++) {
+            switch (types[j]) {
+            case mqi_string:
+                str = mql_result_rows_get_string(r, j, i, NULL, 0);
+                if (!mrp_msg_append(msg, MRP_PEPMSG_STRING(DATA, str)))
+                    goto fail;
+                break;
+            case mqi_integer:
+                s32 = mql_result_rows_get_integer(r, j, i);
+                if (!mrp_msg_append(msg, MRP_PEPMSG_SINT32(DATA, s32)))
+                    goto fail;
+                break;
+            case mqi_unsignd:
+                u32 = mql_result_rows_get_unsigned(r, j, i);
+                if (!mrp_msg_append(msg, MRP_PEPMSG_UINT32(DATA, u32)))
+                    goto fail;
+                break;
+
+            case mqi_floating:
+                dbl = mql_result_rows_get_floating(r, j, i);
+                if (!mrp_msg_append(msg, MRP_PEPMSG_DOUBLE(DATA, dbl)))
+                    goto fail;
+                break;
+
+            default:
+                goto fail;
+            }
+        }
+    }
+
+    if (r != NULL)
+        mql_result_free(r);
+
+    proxy->notify_ncolumn += nrow * ncol;
+    proxy->notify_ntable++;
+
+    return TRUE;
+
+ fail:
+    if (r != NULL)
+        mql_result_free(r);
+    mrp_msg_unref(proxy->notify_msg);
+    proxy->notify_msg = NULL;
+    proxy->notify_fail = TRUE;
+
+    return FALSE;
+}
+
+
+static int send_proxy_notification(pep_proxy_t *proxy)
+{
+    uint16_t nchange, ntotal;
+
+    if (proxy->notify_msg == NULL)
+        return TRUE;
+
+    if (!proxy->notify_fail) {
+        mrp_debug("notifying client %s", proxy->name);
+
+        nchange = proxy->notify_ntable;
+        ntotal  = proxy->notify_ncolumn;
+
+        mrp_msg_set(proxy->notify_msg, MRP_PEPMSG_UINT16(NCHANGE, nchange));
+        mrp_msg_set(proxy->notify_msg, MRP_PEPMSG_UINT16(NTOTAL , ntotal ));
+
+        /*
+          mrp_log_info("Notification message for client %s:", proxy->name);
+          mrp_msg_dump(proxy->notify_msg, stdout);
+        */
+
+        mrp_transport_send(proxy->t, proxy->notify_msg);
+    }
+    else
+        mrp_log_error("Failed to generate/send notification to %s.",
+                      proxy->name);
+
+    mrp_msg_unref(proxy->notify_msg);
+
+    proxy->notify_msg     = NULL;
+    proxy->notify_ntable  = 0;
+    proxy->notify_ncolumn = 0;
+    proxy->notify_fail    = FALSE;
+    proxy->notify_all     = FALSE;
+
+    return TRUE;
+}
+
+
+void notify_table_changes(pdp_t *pdp)
+{
+    mrp_list_hook_t *p, *n, *wp, *wn;
+    pep_proxy_t     *proxy;
+    pep_table_t     *t;
+    pep_watch_t     *w;
+
+    mrp_debug("notifying clients about table changes");
+
+    mrp_list_foreach(&pdp->proxies, p, n) {
+        proxy = mrp_list_entry(p, typeof(*proxy), hook);
+        prepare_proxy_notification(proxy);
+    }
+
+    mrp_list_foreach(&pdp->tables, p, n) {
+        t = mrp_list_entry(p, typeof(*t), hook);
+
+        mrp_list_foreach(&t->watches, wp, wn) {
+            w = mrp_list_entry(wp, typeof(*w), tbl_hook);
+            check_watch_notification(w);
+        }
+
+        t->notify_all = FALSE;
+    }
+
+    mrp_list_foreach(&pdp->proxies, p, n) {
+        proxy = mrp_list_entry(p, typeof(*proxy), hook);
+
+        if (proxy->notify_update || proxy->notify_all) {
+            mrp_list_foreach(&proxy->watches, wp, wn) {
+                w = mrp_list_entry(wp, typeof(*w), pep_hook);
+                if (!collect_watch_notification(w))
+                    break;
+            }
+
+            send_proxy_notification(proxy);
+        }
+    }
+}
diff --git a/src/plugins/domain-control/notify.h b/src/plugins/domain-control/notify.h
new file mode 100644 (file)
index 0000000..8493420
--- /dev/null
@@ -0,0 +1,8 @@
+#ifndef __MURPHY_DOMAIN_CONTROL_NOTIFY_H__
+#define __MURPHY_DOMAIN_CONTROL_NOTIFY_H__
+
+#include "domain-control-types.h"
+
+void notify_table_changes(pdp_t *pdp);
+
+#endif /* __MURPHY_DOMAIN_CONTROL_NOTIFY_H__ */
diff --git a/src/plugins/domain-control/plugin-domain-control.c b/src/plugins/domain-control/plugin-domain-control.c
new file mode 100644 (file)
index 0000000..934edc6
--- /dev/null
@@ -0,0 +1,56 @@
+#include <murphy/common/macros.h>
+
+#include <murphy/core/plugin.h>
+#include <murphy/core/console.h>
+
+#include "domain-control-types.h"
+#include "domain-control.h"
+#include "client.h"
+
+
+static int plugin_init(mrp_plugin_t *plugin)
+{
+    const char *address = MRP_DEFAULT_DOMCTL_ADDRESS;
+
+    plugin->data = create_domain_control(plugin->ctx, address);
+
+    return (plugin->data != NULL);
+}
+
+
+static void plugin_exit(mrp_plugin_t *plugin)
+{
+    pdp_t *pdp = (pdp_t *)plugin->data;
+
+    destroy_domain_control(pdp);
+}
+
+
+static void cmd_cb(mrp_console_t *c, void *user_data, int argc, char **argv)
+{
+    MRP_UNUSED(user_data);
+    MRP_UNUSED(argc);
+    MRP_UNUSED(argv);
+
+    mrp_console_printf(c, "domctrl:%s() called...\n", __FUNCTION__);
+}
+
+
+#define PLUGIN_DESCRIPTION "Murphy domain control plugin."
+#define PLUGIN_VERSION     MRP_VERSION_INT(0, 0, 1)
+#define PLUGIN_HELP        "TODO..."
+#define PLUGIN_AUTHORS     "Krisztian Litkey <krisztian.litkey@intel.com>"
+
+MRP_CONSOLE_GROUP(plugin_commands, "domain-control", NULL, NULL, {
+        MRP_TOKENIZED_CMD("cmd", cmd_cb, TRUE,
+                          "cmd [args]", "a command", "A command..."),
+});
+
+MURPHY_REGISTER_PLUGIN("domain-control",
+                       PLUGIN_VERSION, PLUGIN_DESCRIPTION,
+                       PLUGIN_AUTHORS, PLUGIN_HELP, MRP_SINGLETON,
+                       plugin_init, plugin_exit,
+                       NULL, 0, /* plugin argument table */
+                       NULL, 0, /* exported methods */
+                       NULL, 0, /* imported methods */
+                       &plugin_commands);
diff --git a/src/plugins/domain-control/proxy.c b/src/plugins/domain-control/proxy.c
new file mode 100644 (file)
index 0000000..705757a
--- /dev/null
@@ -0,0 +1,128 @@
+#include <errno.h>
+
+#include <murphy/common/log.h>
+#include <murphy/common/mm.h>
+#include <murphy/common/list.h>
+
+#include "domain-control-types.h"
+#include "table.h"
+#include "proxy.h"
+
+
+int init_proxies(pdp_t *pdp)
+{
+    mrp_list_init(&pdp->proxies);
+
+    return TRUE;
+}
+
+
+void destroy_proxies(pdp_t *pdp)
+{
+    MRP_UNUSED(pdp);
+
+    return;
+}
+
+
+pep_proxy_t *create_proxy(pdp_t *pdp)
+{
+    pep_proxy_t *proxy;
+
+    proxy = mrp_allocz(sizeof(*proxy));
+
+    if (proxy != NULL) {
+        mrp_list_init(&proxy->hook);
+        mrp_list_init(&proxy->watches);
+
+        proxy->pdp = pdp;
+
+        mrp_list_append(&pdp->proxies, &proxy->hook);
+    }
+
+    return proxy;
+}
+
+
+void destroy_proxy(pep_proxy_t *proxy)
+{
+    int i;
+
+    if (proxy != NULL) {
+        mrp_list_delete(&proxy->hook);
+
+        for (i = 0; i < proxy->ntable; i++)
+            destroy_proxy_table(proxy->tables + i);
+
+        destroy_proxy_watches(proxy);
+
+        mrp_free(proxy);
+    }
+}
+
+
+int register_proxy(pep_proxy_t *proxy, char *name,
+                   mrp_domctl_table_t *tables, int ntable,
+                   mrp_domctl_watch_t *watches, int nwatch,
+                   int *error, const char **errmsg)
+{
+    pep_table_t        *t;
+    mrp_domctl_watch_t *w;
+    int                 i;
+
+    proxy->name   = mrp_strdup(name);
+    proxy->tables = mrp_allocz_array(typeof(*proxy->tables) , ntable);
+    proxy->ntable = ntable;
+
+    if (proxy->name == NULL || (ntable && proxy->tables == NULL)) {
+        *error  = ENOMEM;
+        *errmsg = "failed to allocate proxy table";
+
+        return FALSE;
+    }
+
+    for (i = 0, t = proxy->tables; i < ntable; i++, t++) {
+        t->h           = MQI_HANDLE_INVALID;
+        t->name        = mrp_strdup(tables[i].table);
+        t->mql_columns = mrp_strdup(tables[i].mql_columns);
+        t->mql_index   = mrp_strdup(tables[i].mql_index);
+
+        if (t->name == NULL || t->mql_columns == NULL || t->mql_index == NULL) {
+            mrp_log_error("Failed to allocate proxy table %s for %s.",
+                          tables[i].table, name);
+            *error  = ENOMEM;
+            *errmsg = "failed to allocate proxy table";
+
+            return FALSE;
+        }
+
+        if (create_proxy_table(t, error, errmsg))
+            mrp_log_info("Client %s created table %s.", proxy->name,
+                         tables[i].table);
+        else {
+            mrp_log_error("Client %s failed to create table %s (%d: %s).",
+                          proxy->name, tables[i].table, *error, *errmsg);
+            return FALSE;
+        }
+    }
+
+    for (i = 0, w = watches; i < nwatch; i++, w++) {
+        if (create_proxy_watch(proxy, i, w->table, w->mql_columns,
+                               w->mql_where, w->max_rows, error, errmsg))
+            mrp_log_info("Client %s subscribed for table %s.", proxy->name,
+                         w->table);
+        else
+            mrp_log_error("Client %s failed to subscribe for table %s.",
+                          proxy->name, w->table);
+    }
+
+    return TRUE;
+}
+
+
+int unregister_proxy(pep_proxy_t *proxy)
+{
+    destroy_proxy(proxy);
+
+    return TRUE;
+}
diff --git a/src/plugins/domain-control/proxy.h b/src/plugins/domain-control/proxy.h
new file mode 100644 (file)
index 0000000..6308e61
--- /dev/null
@@ -0,0 +1,18 @@
+#ifndef __MURPHY_DOMAIN_CONTROL_PROXY_H__
+#define __MURPHY_DOMAIN_CONTROL_PROXY_H__
+
+#include "domain-control-types.h"
+
+int init_proxies(pdp_t *pdp);
+void destroy_proxies(pdp_t *pdp);
+
+pep_proxy_t *create_proxy(pdp_t *pdp);
+void destroy_proxy(pep_proxy_t *proxy);
+
+int register_proxy(pep_proxy_t *proxy, char *name,
+                   mrp_domctl_table_t *tables, int ntable,
+                   mrp_domctl_watch_t *watches, int nwatch,
+                   int *error, const char **errmsg);
+int unregister_proxy(pep_proxy_t *proxy);
+
+#endif /* __MURPHY_DOMAIN_CONTROL_PROXY_H__ */
diff --git a/src/plugins/domain-control/table-common.c b/src/plugins/domain-control/table-common.c
new file mode 100644 (file)
index 0000000..b0489e6
--- /dev/null
@@ -0,0 +1,10 @@
+#include <errno.h>
+
+#include <murphy/common/debug.h>
+#include <murphy/common/mm.h>
+#include <murphy/common/hashtbl.h>
+#include <murphy/common/utils.h>
+
+#include <murphy-db/mqi.h>
+
+#include "table.h"
diff --git a/src/plugins/domain-control/table.c b/src/plugins/domain-control/table.c
new file mode 100644 (file)
index 0000000..462119b
--- /dev/null
@@ -0,0 +1,501 @@
+#include <errno.h>
+#include <stdarg.h>
+
+#include <murphy/common/debug.h>
+#include <murphy/common/mm.h>
+#include <murphy/common/hashtbl.h>
+#include <murphy/common/utils.h>
+
+#include <murphy-db/mqi.h>
+#include <murphy-db/mql.h>
+
+#include "domain-control.h"
+#include "table.h"
+
+#define FAIL(ec, msg) do {                      \
+        *errcode = ec;                          \
+        *errmsg = msg;                          \
+        goto fail;                              \
+    } while (0)
+
+static pep_table_t *lookup_watch_table(pdp_t *pdp, const char *name);
+
+#include "table-common.c"
+
+/*
+ * proxied and tracked tables
+ */
+
+
+static void table_event_cb(mqi_event_t *e, void *user_data)
+{
+    pdp_t        *pdp  = (pdp_t *)user_data;
+    const char   *name = e->table.table.name;
+    mqi_handle_t  h    = e->table.table.handle;
+    pep_table_t  *t;
+
+    switch (e->event) {
+    case mqi_table_created:
+        mrp_debug("table %s (0x%x) created", name, h);
+        break;
+
+    case mqi_table_dropped:
+        mrp_debug("table %s (0x%x) dropped", name, h);
+        break;
+    default:
+        return;
+    }
+
+    t = lookup_watch_table(pdp, name);
+
+    if (t != NULL) {
+        t->notify_all = TRUE;
+        t->h          = h;
+    }
+
+    schedule_notification(pdp);
+}
+
+
+static void transaction_event_cb(mqi_event_t *e, void *user_data)
+{
+    pdp_t *pdp = (pdp_t *)user_data;
+
+    switch (e->event) {
+    case mqi_transaction_end:
+        mrp_debug("transaction ended");
+        schedule_notification(pdp);
+        break;
+
+    case mqi_transaction_start:
+        mrp_debug("transaction started");
+        break;
+
+    default:
+        break;
+    }
+}
+
+
+static int open_db(pdp_t *pdp)
+{
+    if (mqi_open() == 0) {
+        if (mqi_create_transaction_trigger(transaction_event_cb, pdp) == 0) {
+            if (mqi_create_table_trigger(table_event_cb, pdp) == 0)
+                return TRUE;
+            else
+                mqi_drop_transaction_trigger(transaction_event_cb, pdp);
+        }
+    }
+
+    return FALSE;
+}
+
+
+static void close_db(pdp_t *pdp)
+{
+    mqi_drop_table_trigger(table_event_cb, pdp);
+    mqi_drop_transaction_trigger(transaction_event_cb, pdp);
+}
+
+
+static void purge_watch_table_cb(void *key, void *entry);
+
+
+
+int init_tables(pdp_t *pdp)
+{
+    mrp_htbl_config_t hcfg;
+
+    if (open_db(pdp)) {
+        mrp_list_init(&pdp->tables);
+
+        mrp_clear(&hcfg);
+        hcfg.comp = mrp_string_comp;
+        hcfg.hash = mrp_string_hash;
+        hcfg.free = purge_watch_table_cb;
+
+        pdp->watched = mrp_htbl_create(&hcfg);
+    }
+
+    return (pdp->watched != NULL);
+}
+
+
+void destroy_tables(pdp_t *pdp)
+{
+    close_db(pdp);
+    mrp_htbl_destroy(pdp->watched, TRUE);
+
+    pdp->watched = NULL;
+}
+
+
+int exec_mql(mql_result_type_t type, mql_result_t **resultp,
+             const char *format, ...)
+{
+    mql_result_t *r;
+    char          buf[4096];
+    va_list       ap;
+    int           success, n;
+
+    va_start(ap, format);
+    n = vsnprintf(buf, sizeof(buf), format, ap);
+    va_end(ap);
+
+    if (n < (int)sizeof(buf)) {
+        r       = mql_exec_string(type, buf);
+        success = (r == NULL || mql_result_is_success(r));
+
+        if (resultp != NULL) {
+            *resultp = r;
+            return success;
+        }
+        else {
+            mql_result_free(r);
+            return success;
+        }
+    }
+    else {
+        errno = EOVERFLOW;
+        if (resultp != NULL)
+            *resultp = NULL;
+
+        return FALSE;
+    }
+}
+
+
+static int get_table_description(pep_table_t *t)
+{
+    mqi_column_def_t    columns[MQI_COLUMN_MAX];
+    mrp_domctl_value_t *values = NULL;
+    int                 ncolumn, i;
+
+    if (t->h == MQI_HANDLE_INVALID)
+        t->h = mqi_get_table_handle((char *)t->name);
+
+    if (t->h != MQI_HANDLE_INVALID) {
+        ncolumn = mqi_describe(t->h, columns, MRP_ARRAY_SIZE(columns));
+
+        if (ncolumn > 0) {
+            t->columns = mrp_allocz_array(typeof(*t->columns), ncolumn);
+            t->coldesc = mrp_allocz_array(typeof(*t->coldesc), ncolumn + 1);
+
+            if (t->columns != NULL && t->coldesc != NULL) {
+                memcpy(t->columns, columns, ncolumn * sizeof(*t->columns));
+                t->ncolumn = ncolumn;
+
+                for (i = 0; i < t->ncolumn; i++) {
+                    t->coldesc[i].cindex = i;
+                    t->coldesc[i].offset = (int)(ptrdiff_t)&values[i].str;
+                }
+
+                t->coldesc[i].cindex = -1;
+                t->coldesc[i].offset = 0;
+
+                return TRUE;
+            }
+        }
+    }
+
+    return FALSE;
+}
+
+
+int create_proxy_table(pep_table_t *t, int *errcode, const char **errmsg)
+{
+    mrp_list_init(&t->hook);
+    mrp_list_init(&t->watches);
+
+    if (mqi_get_table_handle((char *)t->name) != MQI_HANDLE_INVALID)
+        FAIL(EEXIST, "DB error: table already exists");
+
+    if (exec_mql(mql_result_dontcare, NULL,
+                 "create temporary table %s (%s)", t->name, t->mql_columns)) {
+        if (t->mql_index && t->mql_index[0]) {
+            if (!exec_mql(mql_result_dontcare, NULL,
+                          "create index on %s (%s)", t->name, t->mql_index))
+                FAIL(EINVAL, "failed to table index");
+        }
+
+        if (!get_table_description(t))
+            FAIL(EINVAL, "DB error: failed to get table description");
+
+        return TRUE;
+    }
+    else
+        FAIL(ENOMEM, "DB error: failed to create table");
+
+ fail:
+    return FALSE;
+}
+
+
+void destroy_proxy_table(pep_table_t *t)
+{
+    mrp_debug("destroying table %s", t->name ? t->name : "<unknown>");
+
+    if (t->h != MQI_HANDLE_INVALID)
+        mqi_drop_table(t->h);
+
+    mrp_free(t->mql_columns);
+    mrp_free(t->mql_index);
+
+    mrp_free(t->columns);
+    mrp_free(t->coldesc);
+    mrp_free(t->name);
+
+    t->name    = NULL;
+    t->h       = MQI_HANDLE_INVALID;
+    t->columns = NULL;
+    t->ncolumn = 0;
+}
+
+
+void destroy_proxy_tables(pep_proxy_t *proxy)
+{
+    mqi_handle_t tx;
+    int          i;
+
+    mrp_debug("destroying tables of client %s", proxy->name);
+
+    tx = mqi_begin_transaction();
+    for (i = 0; i < proxy->ntable; i++)
+        destroy_proxy_table(proxy->tables + i);
+    mqi_commit_transaction(tx);
+
+    proxy->tables = NULL;
+    proxy->ntable = 0;
+}
+
+
+pep_table_t *create_watch_table(pdp_t *pdp, const char *name)
+{
+    pep_table_t *t;
+
+    t = mrp_allocz(sizeof(*t));
+
+    if (t != NULL) {
+        mrp_list_init(&t->hook);
+        mrp_list_init(&t->watches);
+
+        t->h    = MQI_HANDLE_INVALID;
+        t->name = mrp_strdup(name);
+
+        if (t->name == NULL)
+            goto fail;
+
+        get_table_description(t);
+
+        if (!mrp_htbl_insert(pdp->watched, t->name, t))
+            goto fail;
+
+        mrp_list_append(&pdp->tables, &t->hook);
+    }
+
+    return t;
+
+ fail:
+    destroy_watch_table(pdp, t);
+
+    return FALSE;
+}
+
+
+static void destroy_table_watches(pep_table_t *t)
+{
+    pep_watch_t     *w;
+    mrp_list_hook_t *p, *n;
+
+    if (t != NULL) {
+        mrp_list_foreach(&t->watches, p, n) {
+            w = mrp_list_entry(p, typeof(*w), tbl_hook);
+
+            mrp_list_delete(&w->tbl_hook);
+            mrp_list_delete(&w->pep_hook);
+
+            mrp_free(w->mql_columns);
+            mrp_free(w->mql_where);
+            mrp_free(w);
+        }
+    }
+}
+
+
+void destroy_watch_table(pdp_t *pdp, pep_table_t *t)
+{
+    mrp_list_delete(&t->hook);
+    t->h = MQI_HANDLE_INVALID;
+
+    if (pdp != NULL)
+        mrp_htbl_remove(pdp->watched, t->name, FALSE);
+
+    destroy_table_watches(t);
+}
+
+
+static pep_table_t *lookup_watch_table(pdp_t *pdp, const char *name)
+{
+    return mrp_htbl_lookup(pdp->watched, (void *)name);
+}
+
+
+static void purge_watch_table_cb(void *key, void *entry)
+{
+    pep_table_t *t = (pep_table_t *)entry;
+
+    MRP_UNUSED(key);
+
+    destroy_watch_table(NULL, t);
+}
+
+
+int create_proxy_watch(pep_proxy_t *proxy, int id,
+                       const char *table, const char *mql_columns,
+                       const char *mql_where, int max_rows,
+                       int *error, const char **errmsg)
+{
+    pdp_t       *pdp = proxy->pdp;
+    pep_table_t *t;
+    pep_watch_t *w;
+
+    t = lookup_watch_table(pdp, table);
+
+    if (t == NULL) {
+        t = create_watch_table(pdp, table);
+
+        if (t == NULL) {
+            *error  = EINVAL;
+            *errmsg = "failed to watch table";
+        }
+    }
+
+    w = mrp_allocz(sizeof(*w));
+
+    if (w != NULL) {
+        mrp_list_init(&w->tbl_hook);
+        mrp_list_init(&w->pep_hook);
+
+        w->table        = t;
+        w->mql_columns  = mrp_strdup(mql_columns);
+        w->mql_where    = mrp_strdup(mql_where ? mql_where : "");
+        w->max_rows     = max_rows;
+        w->proxy        = proxy;
+        w->id           = id;
+
+        if (w->mql_columns == NULL || w->mql_where == NULL)
+            goto fail;
+
+        mrp_list_append(&t->watches, &w->tbl_hook);
+        mrp_list_append(&proxy->watches, &w->pep_hook);
+
+        return TRUE;
+    }
+    else {
+        *error  = ENOMEM;
+        *errmsg = "failed to allocate table watch";
+    }
+
+ fail:
+    if (w != NULL) {
+        mrp_free(w->mql_columns);
+        mrp_free(w->mql_where);
+        mrp_free(w);
+    }
+
+    return FALSE;
+}
+
+
+void destroy_proxy_watches(pep_proxy_t *proxy)
+{
+    pep_watch_t     *w;
+    mrp_list_hook_t *p, *n;
+
+    if (proxy != NULL) {
+        mrp_list_foreach(&proxy->watches, p, n) {
+            w = mrp_list_entry(p, typeof(*w), pep_hook);
+
+            mrp_list_delete(&w->tbl_hook);
+            mrp_list_delete(&w->pep_hook);
+
+            mrp_free(w);
+        }
+    }
+}
+
+
+static void reset_proxy_tables(pep_proxy_t *proxy)
+{
+    int i;
+
+    for (i = 0; i < proxy->ntable; i++)
+        mqi_delete_from(proxy->tables[i].h, NULL);
+}
+
+
+static int insert_into_table(pep_table_t *t,
+                             mrp_domctl_value_t **rows, int nrow)
+{
+    void *data[2];
+    int   i;
+
+    data[1] = NULL;
+
+    for (i = 0; i < nrow; i++) {
+        data[0] = rows[i];
+        if (mqi_insert_into(t->h, 0, t->coldesc, data) != 1)
+            return FALSE;
+    }
+
+    return TRUE;
+}
+
+
+int set_proxy_tables(pep_proxy_t *proxy, mrp_domctl_data_t *tables, int ntable,
+                     int *error, const char **errmsg)
+{
+    mqi_handle_t    tx;
+    pep_table_t    *t;
+    int             i, id;
+
+    tx = mqi_begin_transaction();
+
+    if (tx != MQI_HANDLE_INVALID) {
+        reset_proxy_tables(proxy);
+
+        for (i = 0; i < ntable; i++) {
+            id = tables[i].id;
+
+            if (id < 0 || id >= proxy->ntable)
+                goto fail;
+
+            t = proxy->tables + id;
+
+            if (tables[i].ncolumn != t->ncolumn)
+                goto fail;
+
+#if 0
+            if (!delete_from_table(t, tables[i].rows, tables[i].nrow))
+                goto fail;
+#endif
+
+            if (!insert_into_table(t, tables[i].rows, tables[i].nrow))
+                goto fail;
+
+
+        }
+
+        mqi_commit_transaction(tx);
+
+        return TRUE;
+
+    fail:
+        *error  = EINVAL;
+        *errmsg = "failed to set tables";
+        mqi_rollback_transaction(tx);
+    }
+
+    return FALSE;
+}
diff --git a/src/plugins/domain-control/table.h b/src/plugins/domain-control/table.h
new file mode 100644 (file)
index 0000000..283928f
--- /dev/null
@@ -0,0 +1,33 @@
+#ifndef __MURPHY_DOMAIN_CONTROL_TABLE_H__
+#define __MURPHY_DOMAIN_CONTROL_TABLE_H__
+
+#include <murphy-db/mql-result.h>
+
+#include "client.h"
+#include "domain-control-types.h"
+
+int init_tables(pdp_t *pdp);
+void destroy_tables(pdp_t *pdp);
+
+int create_proxy_table(pep_table_t *t, int *errcode, const char **errmsg);
+
+int create_proxy_watch(pep_proxy_t *proxy, int id,
+                       const char *table, const char *mql_columns,
+                       const char *mql_where, int max_rows,
+                       int *error, const char **errmsg);
+
+void destroy_watch_table(pdp_t *pdp, pep_table_t *t);
+
+void destroy_proxy_table(pep_table_t *t);
+void destroy_proxy_tables(pep_proxy_t *proxy);
+
+void destroy_proxy_watches(pep_proxy_t *proxy);
+
+int set_proxy_tables(pep_proxy_t *proxy, mrp_domctl_data_t *tables, int ntable,
+                     int *error, const char **errmsg);
+
+int exec_mql(mql_result_type_t type, mql_result_t **resultp,
+             const char *format, ...);
+
+
+#endif /* __MURPHY_DOMAIN_CONTROL_TABLE_H__ */
diff --git a/src/plugins/domain-control/test-client.c b/src/plugins/domain-control/test-client.c
new file mode 100644 (file)
index 0000000..ac9c377
--- /dev/null
@@ -0,0 +1,1135 @@
+#include <stdio.h>
+#include <unistd.h>
+#include <string.h>
+#include <errno.h>
+#include <netdb.h>
+#include <signal.h>
+#include <alloca.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+
+#define _GNU_SOURCE
+#include <getopt.h>
+
+#include <readline/readline.h>
+#include <readline/history.h>
+
+#include <murphy/common.h>
+#include "client.h"
+
+#define DEFAULT_PROMPT "test-controller> "
+
+
+/*
+ * client context
+ */
+
+typedef struct {
+    const char      *addrstr;            /* server address */
+    int              zone;               /* run in zone control mode */
+    int              verbose;            /* verbose mode */
+    mrp_mainloop_t  *ml;                 /* murphy mainloop */
+    void            *dc;                 /* domain controller */
+    int              fd;                 /* fd for terminal input */
+    mrp_io_watch_t  *iow;                /* I/O watch for terminal input */
+} client_t;
+
+
+#define NVALUE 512
+
+
+/*
+ * device and stream definitions
+ */
+
+#define NDEVICE        (MRP_ARRAY_SIZE(devices) - 1)
+#define DEVICE_NCOLUMN 4
+
+typedef struct {
+    const char *name;
+    const char *type;
+    int         public;
+    int         available;
+} device_t;
+
+static device_t devices[] = {
+    { "builtin-speaker" , "speaker"  , TRUE , TRUE  },
+    { "builtin-earpiece", "speaker"  , FALSE, TRUE  },
+    { "usb-speaker"     , "speaker"  , TRUE , FALSE },
+    { "a2dp-speaker"    , "speaker"  , TRUE , FALSE },
+    { "wired-headset"   , "headset"  , FALSE, FALSE },
+    { "usb-headphone"   , "headphone", FALSE, FALSE },
+    { "a2dp-headphone"  , "headphone", FALSE, FALSE },
+    { "sco-headset"     , "headset"  , FALSE, FALSE },
+    { NULL              , NULL       , FALSE, FALSE }
+};
+
+#define NSTREAM        (MRP_ARRAY_SIZE(streams) - 1)
+#define STREAM_NCOLUMN 4
+
+typedef struct {
+    const char *name;
+    const char *role;
+    pid_t       owner;
+    int         playing;
+} stream_t;
+
+static stream_t streams[] = {
+    { "player1", "player"   , 1234, FALSE },
+    { "player2", "player"   , 4321, FALSE },
+    { "navit"  , "navigator", 5432, FALSE },
+    { "phone"  , "call"     , 6666, FALSE },
+    { NULL     , NULL       , 0   , FALSE }
+};
+
+
+/*
+ * device and stream descriptors
+ */
+
+#define DEVICE_COLUMNS          \
+    "name      varchar(32), "   \
+    "type      varchar(32), "   \
+    "public    integer    , "   \
+    "available integer"
+
+#define DEVICE_INDEX "name"
+
+#define DEVICE_SELECT "*"
+
+#define DEVICE_WHERE  NULL
+
+#define STREAM_COLUMNS          \
+    "name      varchar(32),"    \
+    "role      varchar(32),"    \
+    "owner     unsigned   ,"    \
+    "playing   integer"
+
+#define STREAM_INDEX "name"
+
+#define STREAM_SELECT "*"
+#define STREAM_WHERE  NULL
+
+mrp_domctl_table_t media_tables[] = {
+    MRP_DOMCTL_TABLE("devices", DEVICE_COLUMNS, DEVICE_INDEX),
+    MRP_DOMCTL_TABLE("streams", STREAM_COLUMNS, STREAM_INDEX),
+};
+
+mrp_domctl_watch_t media_watches[] = {
+    MRP_DOMCTL_WATCH("devices", DEVICE_SELECT, DEVICE_WHERE, 0),
+    MRP_DOMCTL_WATCH("streams", STREAM_SELECT, STREAM_WHERE, 0),
+};
+
+
+/*
+ * zone and call definitions
+ */
+
+#define NZONE        (MRP_ARRAY_SIZE(zones) - 1)
+#define ZONE_NCOLUMN 3
+
+typedef struct {
+    const char *name;
+    int         occupied;
+    int         active;
+} zone_t;
+
+static zone_t zones[] = {
+    { "driver"     , TRUE , FALSE },
+    { "fearer"     , FALSE, TRUE  },
+    { "back-left"  , TRUE , FALSE },
+    { "back-center", FALSE, FALSE },
+    { "back-right" , TRUE , TRUE  },
+    { NULL         , FALSE, FALSE }
+};
+
+
+#define NCALL        (MRP_ARRAY_SIZE(calls) - 1)
+#define CALL_NCOLUMN 3
+
+typedef struct {
+    int         id;
+    const char *state;
+    const char *modem;
+} call_t;
+
+static call_t calls[] = {
+    { 1, "active"  , "modem1" },
+    { 2, "ringing" , "modem1" },
+    { 3, "held"    , "modem2" },
+    { 4, "alerting", "modem2" },
+    { 0, NULL      , NULL     }
+};
+
+
+/*
+ * zone and call descriptors
+ */
+
+#define ZONE_COLUMNS          \
+    "name      varchar(32), " \
+    "occupied  integer    , " \
+    "active    integer"
+
+#define ZONE_INDEX "name"
+
+#define ZONE_SELECT "*"
+
+#define ZONE_WHERE  NULL
+
+#define CALL_COLUMNS          \
+    "id        integer    , " \
+    "state     varchar(32), " \
+    "modem     varchar(32)"
+
+#define CALL_INDEX "id"
+
+#define CALL_SELECT "*"
+
+#define CALL_WHERE  NULL
+
+mrp_domctl_table_t zone_tables[] = {
+    MRP_DOMCTL_TABLE("zones", ZONE_COLUMNS, ZONE_INDEX),
+    MRP_DOMCTL_TABLE("calls", CALL_COLUMNS, CALL_INDEX),
+};
+
+mrp_domctl_watch_t zone_watches[] = {
+    MRP_DOMCTL_WATCH("zones", ZONE_SELECT, ZONE_WHERE, 0),
+    MRP_DOMCTL_WATCH("calls", CALL_SELECT, CALL_WHERE, 0)
+};
+
+mrp_domctl_table_t *exports;
+int                 nexport;
+mrp_domctl_watch_t *imports;
+int                 nimport;
+
+
+static client_t *client;
+
+
+static void fatal_msg(int error, const char *format, ...);
+static void error_msg(const char *format, ...);
+static void info_msg(const char *format, ...);
+
+static void terminal_input_cb(char *input);
+
+static void export_data(client_t *c);
+
+
+static void plug_device(client_t *c, const char *name, int plug)
+{
+    device_t *d;
+    int       changed;
+
+    if (c->zone) {
+        error_msg("cannot plug/unplug, client is in zone mode");
+        return;
+    }
+
+    changed = FALSE;
+
+    for (d = devices; d->name != NULL; d++) {
+        if (!strcmp(d->name, name)) {
+            changed = plug ^ d->available;
+            d->available = plug;
+            break;
+        }
+    }
+
+    if (changed) {
+        info_msg("device '%s' is now %splugged", d->name, plug ? "" : "un");
+        export_data(c);
+    }
+}
+
+
+static void list_devices(void)
+{
+    device_t *d;
+    int       n;
+
+    for (d = devices, n = 0; d->name != NULL; d++, n++) {
+        info_msg("device '%s': (%s, %s), %s",
+                 d->name, d->type, d->public ? "public" : "private",
+                 d->available ? "available" : "currently unplugged");
+    }
+
+    if (n == 0)
+        info_msg("devices: none");
+}
+
+
+static void play_stream(client_t *c, const char *name, int play)
+{
+    stream_t *s;
+    int       changed;
+
+    if (c->zone) {
+        error_msg("cannot control streams, client is in zone mode");
+        return;
+    }
+
+    changed = FALSE;
+
+    for (s = streams; s->name != NULL; s++) {
+        if (!strcmp(s->name, name)) {
+            changed = play ^ s->playing;
+            s->playing = play;
+            break;
+        }
+    }
+
+    if (changed) {
+        info_msg("stream '%s' is now %s", s->name, play ? "playing":"stopped");
+        export_data(c);
+    }
+}
+
+
+static void list_streams(void)
+{
+    stream_t *s;
+    int       n;
+
+    for (s = streams, n = 0; s->name != NULL; s++, n++) {
+        info_msg("stream '%s': role %s, owner %u, currently %splaying",
+                 s->name, s->role, s->owner, s->playing ? "" : "not ");
+    }
+
+    if (n == 0)
+        info_msg("streams: none");
+}
+
+
+static void set_zone_state(client_t *c, char *config)
+{
+    zone_t *z;
+    int     occupied, active, changed, len;
+    char    name[256], *end;
+
+    if (!c->zone) {
+        error_msg("cannot control zones, client is not in zone mode");
+        return;
+    }
+
+    while (*config == ' ' || *config == '\t')
+        config++;
+
+    end = strchr(config, ' ');
+    if (end == NULL)
+        return;
+
+    len = end - config;
+    strncpy(name, config, len);
+    name[len] = '\0';
+
+    config = end + 1;
+    while (*config == ' ' || *config == '\t')
+        config++;
+
+    occupied = FALSE;
+    active   = FALSE;
+    changed  = FALSE;
+
+    if (strstr(config, "occupied"))
+        occupied = TRUE;
+    if (strstr(config, "active"))
+        active = TRUE;
+
+    for (z = zones; z->name != NULL; z++) {
+        if (!strcmp(z->name, name)) {
+            changed     = (active ^ z->active) | (occupied ^ z->occupied);
+            z->active   = active;
+            z->occupied = occupied;
+            break;
+        }
+    }
+
+    if (changed) {
+        info_msg("zone '%s' is now %s and %s", z->name,
+                 z->occupied ? "occupied" : "free",
+                 z->active   ? "active"   : "idle");
+        export_data(c);
+    }
+}
+
+
+static void list_zones(void)
+{
+    zone_t *z;
+    int     n;
+
+    for (z = zones, n = 0; z->name != NULL; z++, n++) {
+        info_msg("zone '%s' is now %s and %s", z->name,
+                 z->occupied ? "occupied" : "free",
+                 z->active   ? "active"   : "idle");
+    }
+
+    if (n == 0)
+        info_msg("zones: none");
+}
+
+
+static void set_call_state(client_t *c, const char *config)
+{
+    call_t *call;
+    char   idstr[64], *state, *end;
+    int     id, changed, len;
+
+    if (!c->zone) {
+        error_msg("cannot control calls, client is not in zone mode");
+        return;
+    }
+
+    while (*config == ' ' || *config == '\t')
+        config++;
+
+    end = strchr(config, ' ');
+    if (end == NULL)
+        return;
+
+    len = end - config;
+    strncpy(idstr, config, len);
+    idstr[len] = '\0';
+
+    config = end + 1;
+    while (*config == ' ' || *config == '\t')
+        config++;
+    state = (char *)config;
+
+    id = strtoul(idstr, &end, 10);
+
+    if (end && *end) {
+        error_msg("invalid call id '%s'", idstr);
+        return;
+    }
+
+    for (call = calls; call->id > 0; call++) {
+        if (call->id == id) {
+            if (strcmp(call->state, state)) {
+                mrp_free((char *)call->state);
+                call->state = mrp_strdup(state);
+                changed     = TRUE;
+                break;
+            }
+        }
+    }
+
+    if (changed) {
+        info_msg("call #%d is now %s", call->id, call->state);
+        export_data(c);
+    }
+}
+
+
+static void list_calls(void)
+{
+    call_t *c;
+    int     n;
+
+    for (c = calls, n = 0; c->id > 0; c++, n++) {
+        info_msg("call #%d: %s (on modem %s)", c->id, c->state, c->modem);
+    }
+
+    if (n == 0)
+        info_msg("calls: none");
+}
+
+
+static void reset_devices(void)
+{
+    mrp_clear(&devices);
+}
+
+
+void update_devices(mrp_domctl_data_t *data)
+{
+    device_t           *d;
+    mrp_domctl_value_t *v;
+    int                 i;
+
+    if (data->nrow != 0 && data->ncolumn != DEVICE_NCOLUMN) {
+        error_msg("incorrect number of columns (%d) in device update",
+                  data->ncolumn);
+        return;
+    }
+
+    if (data->nrow > (int)NDEVICE) {
+        error_msg("too many rows (%d) in device update", data->nrow);
+        return;
+    }
+
+    if (data->nrow == 0)
+        reset_devices();
+    else {
+        d = devices;
+
+        for (i = 0; i < data->nrow; i++) {
+            mrp_free((char *)d->name);
+            mrp_free((char *)d->type);
+
+            v            = data->rows[i];
+            d->name      = mrp_strdup(v[0].str);
+            d->type      = mrp_strdup(v[1].str);
+            d->public    = v[2].s32;
+            d->available = v[3].s32;
+
+            d += 1;
+        }
+    }
+
+    list_devices();
+}
+
+
+static void reset_streams(void)
+{
+    mrp_clear(&streams);
+}
+
+
+void update_streams(mrp_domctl_data_t *data)
+{
+    stream_t           *s;
+    mrp_domctl_value_t *v;
+    int                 i;
+
+    if (data->nrow != 0 && data->ncolumn != STREAM_NCOLUMN) {
+        error_msg("incorrect number of columns (%d) in stream update",
+                  data->ncolumn);
+        return;
+    }
+
+    if (data->nrow > (int)NSTREAM) {
+        error_msg("too many rows (%d) in stream update", data->nrow);
+        return;
+    }
+
+    if (data->nrow == 0)
+        reset_streams();
+    else {
+        s = streams;
+
+        for (i = 0; i < data->nrow; i++) {
+            mrp_free((char *)s->name);
+            mrp_free((char *)s->role);
+
+            v          = data->rows[i];
+            s->name    = mrp_strdup(v[0].str);
+            s->role    = mrp_strdup(v[1].str);
+            s->owner   = v[2].u32;
+            s->playing = v[3].s32;
+
+            s += 1;
+        }
+    }
+
+    list_streams();
+}
+
+
+static void reset_zones(void)
+{
+    mrp_clear(&zones);
+}
+
+
+void update_zones(mrp_domctl_data_t *data)
+{
+    zone_t             *z;
+    mrp_domctl_value_t *v;
+    int                 i;
+
+    if (data->nrow != 0 && data->ncolumn != ZONE_NCOLUMN) {
+        error_msg("incorrect number of columns (%d) in zone update",
+                  data->ncolumn);
+        return;
+    }
+
+    if (data->nrow > (int)NZONE) {
+        error_msg("too many rows (%d) in zone update", data->nrow);
+        return;
+    }
+
+    if (data->nrow == 0)
+        reset_zones();
+    else {
+        z = zones;
+
+        for (i = 0; i < data->nrow; i++) {
+            mrp_free((char *)z->name);
+
+            v           = data->rows[i];
+            z->name     = mrp_strdup(v[0].str);
+            z->occupied = v[1].s32;
+            z->active   = v[2].s32;
+
+            z += 1;
+        }
+    }
+
+    list_zones();
+}
+
+
+static void reset_calls(void)
+{
+    mrp_clear(&calls);
+}
+
+
+void update_calls(mrp_domctl_data_t *data)
+{
+    call_t             *c;
+    mrp_domctl_value_t *v;
+    int                 i;
+
+    if (data->nrow != 0 && data->ncolumn != CALL_NCOLUMN) {
+        error_msg("incorrect number of columns (%d) in call update.",
+               data->ncolumn);
+        return;
+    }
+
+    if (data->nrow > (int)NCALL) {
+        error_msg("too many rows (%d) in call update", data->nrow);
+        return;
+    }
+
+    if (data->nrow == 0)
+        reset_calls();
+    else {
+        c = calls;
+
+        for (i = 0; i < data->nrow; i++) {
+            mrp_free((char *)c->state);
+            mrp_free((char *)c->modem);
+
+            v        = data->rows[i];
+            c->id    = v[0].s32;
+            c->state = mrp_strdup(v[1].str);
+            c->modem = mrp_strdup(v[2].str);
+
+            c += 1;
+        }
+    }
+
+    list_calls();
+}
+
+
+void update_imports(client_t *c, mrp_domctl_data_t *data, int ntable)
+{
+    int i;
+
+    for (i = 0; i < ntable; i++) {
+        if (c->zone) {
+            if (data[i].id == 0)
+                update_devices(data + i);
+            else
+                update_streams(data + i);
+        }
+        else {
+            if (data[i].id == 0)
+                update_zones(data + i);
+            else
+                update_calls(data + i);
+        }
+    }
+}
+
+
+
+static void terminal_prompt_erase(void)
+{
+    int n = strlen(DEFAULT_PROMPT);
+
+    printf("\r");
+    while (n-- > 0)
+        printf(" ");
+    printf("\r");
+}
+
+
+static void terminal_prompt_display(void)
+{
+    rl_callback_handler_remove();
+    rl_callback_handler_install(DEFAULT_PROMPT, terminal_input_cb);
+}
+
+
+static void show_help(void)
+{
+#define P info_msg
+
+    P("Available commands:");
+    P("  help                                  show this help");
+    P("  list                                  list all data");
+    P("  list {devices|streams|zones|calls}    list the requested data");
+    P("  plug <device>                         update <device> as plugged");
+    P("  unplug <device>                       update <device> as unplugged");
+    P("  play <stream>                         update <stream> as playing");
+    P("  stop <stream>                         update <stream> as stopped");
+    P("  call <call> <state>                   update state of <call>");
+    P("  zone <zone> [occupied,[active]]       update state of <zone>");
+
+#undef P
+}
+
+
+static void terminal_process_input(char *input)
+{
+    int len;
+
+    add_history(input);
+
+    if (input == NULL || !strcmp(input, "exit")) {
+        terminal_prompt_erase();
+        exit(0);
+    }
+    else if (!strcmp(input, "help")) {
+        show_help();
+    }
+    else if (!strcmp(input, "list")) {
+        list_devices();
+        list_streams();
+        list_zones();
+        list_calls();
+    }
+    else if (!strcmp(input, "list devices"))
+        list_devices();
+    else if (!strcmp(input, "list streams"))
+        list_streams();
+    else if (!strcmp(input, "list zones"))
+        list_zones();
+    else if (!strcmp(input, "list calls"))
+        list_calls();
+    else if (!strncmp(input, "plug "  , len=sizeof("plug ")   - 1) ||
+             !strncmp(input, "unplug ", len=sizeof("unplug ") - 1)) {
+        plug_device(client, input + len, *input == 'p');
+    }
+    else if (!strncmp(input, "play "  , len=sizeof("play ")   - 1) ||
+             !strncmp(input, "stop ", len=sizeof("stop ") - 1)) {
+        play_stream(client, input + len, *input == 'p');
+    }
+    else if (!strncmp(input, "call "  , len=sizeof("call ")   - 1)) {
+        set_call_state(client, input + len);
+    }
+    else if (!strncmp(input, "zone "  , len=sizeof("zone ")   - 1)) {
+        set_zone_state(client, input + len);
+    }
+}
+
+
+static void terminal_input_cb(char *input)
+{
+    terminal_process_input(input);
+    free(input);
+}
+
+
+static void terminal_cb(mrp_mainloop_t *ml, mrp_io_watch_t *w, int fd,
+                        mrp_io_event_t events, void *user_data)
+{
+    MRP_UNUSED(w);
+    MRP_UNUSED(fd);
+    MRP_UNUSED(user_data);
+
+    if (events & MRP_IO_EVENT_IN)
+        rl_callback_read_char();
+
+    if (events & MRP_IO_EVENT_HUP)
+        mrp_mainloop_quit(ml, 0);
+}
+
+
+static void terminal_setup(client_t *c)
+{
+    mrp_io_event_t events;
+
+    c->fd  = fileno(stdin);
+    events = MRP_IO_EVENT_IN | MRP_IO_EVENT_HUP;
+    c->iow = mrp_add_io_watch(c->ml, c->fd, events, terminal_cb, c);
+
+    if (c->iow == NULL)
+        fatal_msg(1, "Failed to create terminal input I/O watch.");
+    else
+        terminal_prompt_display();
+}
+
+
+static void terminal_cleanup(client_t *c)
+{
+    mrp_del_io_watch(c->iow);
+    c->iow = NULL;
+
+    rl_callback_handler_remove();
+}
+
+
+static void fatal_msg(int error, const char *format, ...)
+{
+    va_list ap;
+
+    terminal_prompt_erase();
+
+    fprintf(stderr, "fatal error: ");
+    va_start(ap, format);
+    vfprintf(stderr, format, ap);
+    va_end(ap);
+    fprintf(stderr, "\n");
+    fflush(stderr);
+
+    exit(error);
+}
+
+
+static void error_msg(const char *format, ...)
+{
+    va_list ap;
+
+    terminal_prompt_erase();
+
+    fprintf(stderr, "error: ");
+    va_start(ap, format);
+    vfprintf(stderr, format, ap);
+    va_end(ap);
+    fprintf(stderr, "\n");
+    fflush(stderr);
+
+    terminal_prompt_display();
+}
+
+
+static void info_msg(const char *format, ...)
+{
+    va_list ap;
+
+    terminal_prompt_erase();
+
+    va_start(ap, format);
+    vfprintf(stdout, format, ap);
+    va_end(ap);
+    fprintf(stdout, "\n");
+    fflush(stdout);
+
+    terminal_prompt_display();
+}
+
+
+static void signal_handler(mrp_mainloop_t *ml, mrp_sighandler_t *h,
+                           int signum, void *user_data)
+{
+    MRP_UNUSED(h);
+    MRP_UNUSED(user_data);
+
+    switch (signum) {
+    case SIGINT:
+        info_msg("Got SIGINT, stopping...");
+        mrp_mainloop_quit(ml, 0);
+        break;
+    }
+}
+
+
+static void connect_notify(mrp_domctl_t *dc, int connected, int errcode,
+                           const char *errmsg, void *user_data)
+{
+    MRP_UNUSED(dc);
+    MRP_UNUSED(user_data);
+
+    if (connected) {
+        info_msg("Successfully registered to server.");
+        export_data(client);
+    }
+    else
+        error_msg("No connection to server (%d: %s).", errcode, errmsg);
+}
+
+
+static void data_notify(mrp_domctl_t *dc, mrp_domctl_data_t *tables,
+                        int ntable, void *user_data)
+{
+    client_t *client = (client_t *)user_data;
+
+    MRP_UNUSED(dc);
+
+    update_imports(client, tables, ntable);
+}
+
+
+static void export_notify(mrp_domctl_t *dc, int errcode, const char *errmsg,
+                          void *user_data)
+{
+    MRP_UNUSED(dc);
+    MRP_UNUSED(user_data);
+
+    if (errcode != 0) {
+        error_msg("Data set request failed (%d: %s).", errcode, errmsg);
+    }
+    else
+        info_msg("Sucessfully set data.");
+}
+
+
+static void export_data(client_t *c)
+{
+    mrp_domctl_data_t  *tables;
+    int                 ntable = 2;
+    mrp_domctl_value_t *values, *v;
+    int                 i;
+
+    tables = alloca(sizeof(*tables) * ntable);
+    values = alloca(sizeof(*values) * NVALUE);
+    v      = values;
+
+    if (!c->zone) {
+        tables[0].id      = 0;
+        tables[0].ncolumn = 4;
+        tables[0].nrow    = NDEVICE;
+        tables[0].rows    = alloca(sizeof(*tables[0].rows) * tables[0].nrow);
+
+        for (i = 0; i < (int)NDEVICE; i++) {
+            tables[0].rows[i] = v;
+            v[0].type = MRP_DOMCTL_STRING ; v[0].str = devices[i].name;
+            v[1].type = MRP_DOMCTL_STRING ; v[1].str = devices[i].type;
+            v[2].type = MRP_DOMCTL_INTEGER; v[2].s32 = devices[i].public;
+            v[3].type = MRP_DOMCTL_INTEGER; v[3].s32 = devices[i].available;
+            v += 4;
+        }
+
+        tables[1].id      = 1;
+        tables[1].ncolumn = 4;
+        tables[1].nrow    = NSTREAM;
+        tables[1].rows    = alloca(sizeof(*tables[1].rows) * tables[1].nrow);
+
+        for (i = 0; i < (int)NSTREAM; i++) {
+            tables[1].rows[i] = v;
+            v[0].type = MRP_DOMCTL_STRING  ; v[0].str = streams[i].name;
+            v[1].type = MRP_DOMCTL_STRING  ; v[1].str = streams[i].role;
+            v[2].type = MRP_DOMCTL_UNSIGNED; v[2].s32 = streams[i].owner;
+            v[3].type = MRP_DOMCTL_INTEGER ; v[3].u32 = streams[i].playing;
+            v += 4;
+        }
+    }
+    else {
+        tables[0].id      = 0;
+        tables[0].ncolumn = 3;
+        tables[0].nrow    = NZONE;
+        tables[0].rows    = alloca(sizeof(*tables[0].rows) * tables[0].nrow);
+
+        for (i = 0; i < (int)NZONE; i++) {
+            tables[0].rows[i] = v;
+            v[0].type = MRP_DOMCTL_STRING ; v[0].str = zones[i].name;
+            v[1].type = MRP_DOMCTL_INTEGER; v[1].s32 = zones[i].occupied;
+            v[2].type = MRP_DOMCTL_INTEGER; v[2].s32 = zones[i].active;
+            v += 3;
+        }
+
+        tables[1].id      = 1;
+        tables[1].ncolumn = 3;
+        tables[1].nrow    = NCALL;
+        tables[1].rows    = alloca(sizeof(*tables[0].rows) * tables[1].nrow);
+
+        for (i = 0; i < (int)NCALL; i++) {
+            tables[1].rows[i] = v;
+            v[0].type = MRP_DOMCTL_INTEGER; v[0].s32 = calls[i].id;
+            v[1].type = MRP_DOMCTL_STRING ; v[1].str = calls[i].state;
+            v[2].type = MRP_DOMCTL_STRING ; v[2].str = calls[i].modem;
+            v += 3;
+        }
+    }
+
+    if (!mrp_domctl_set_data(c->dc, tables, ntable, export_notify, c))
+        error_msg("Failed to send data set request to server.");
+}
+
+
+static void client_setup(client_t *c)
+{
+    mrp_mainloop_t *ml;
+    mrp_domctl_t   *dc;
+
+    ml = mrp_mainloop_create();
+
+    if (ml != NULL) {
+        if (!c->zone) {
+            exports = media_tables;
+            nexport = MRP_ARRAY_SIZE(media_tables);
+            imports = zone_watches;
+            nimport = MRP_ARRAY_SIZE(zone_watches);
+        }
+        else {
+            exports = zone_tables;
+            nexport = MRP_ARRAY_SIZE(zone_tables);
+            imports = media_watches;
+            nimport = MRP_ARRAY_SIZE(media_watches);
+        }
+
+        dc = mrp_domctl_create(c->zone ? "zone-ctrl" : "media-ctrl", ml,
+                               exports, nexport, imports, nimport,
+                               connect_notify, data_notify, c);
+
+        if (dc != NULL) {
+            c->ml = ml;
+            c->dc = dc;
+
+            mrp_add_sighandler(ml, SIGINT, signal_handler, c);
+
+            if (c->zone) {
+                zone_t *z;
+                call_t *call;
+
+                for (z = zones; z->name != NULL; z++) {
+                    z->name = mrp_strdup(z->name);
+                }
+
+                for (call = calls; call->id > 0; call++) {
+                    call->state = mrp_strdup(call->state);
+                    call->modem = mrp_strdup(call->modem);
+                }
+
+                reset_devices();
+                reset_streams();
+            }
+            else {
+                device_t *d;
+                stream_t *s;
+
+                for (d = devices; d->name != NULL; d++) {
+                    d->name = mrp_strdup(d->name);
+                    d->type = mrp_strdup(d->type);
+                }
+
+                for (s = streams; s->name != NULL; s++) {
+                    s->name = mrp_strdup(s->name);
+                    s->role = mrp_strdup(s->role);
+                }
+
+                reset_zones();
+                reset_calls();
+            }
+        }
+        else
+            fatal_msg(1, "Failed to create enforcement point.");
+    }
+    else
+        fatal_msg(1, "Failed to create mainloop.");
+}
+
+
+static void client_cleanup(client_t *c)
+{
+    mrp_mainloop_destroy(c->ml);
+    mrp_domctl_destroy(c->dc);
+
+    c->ml = NULL;
+    c->dc = NULL;
+}
+
+
+static void client_run(client_t *c)
+{
+    if (mrp_domctl_connect(c->dc, c->addrstr))
+        info_msg("Connected to server at %s.", c->addrstr);
+    else
+        error_msg("Failed to connect to server at %s.", c->addrstr);
+
+    mrp_mainloop_run(c->ml);
+}
+
+
+static void print_usage(const char *argv0, int exit_code, const char *fmt, ...)
+{
+    va_list ap;
+
+    if (fmt && *fmt) {
+        va_start(ap, fmt);
+        vprintf(fmt, ap);
+        va_end(ap);
+    }
+
+    printf("usage: %s [options]\n\n"
+           "The possible options are:\n"
+           "  -s, --server <address>     connect to murphy at given address\n"
+           "  -z, --zone                 run as zone controller\n"
+           "  -v, --verbose              run in verbose mode\n"
+           "  -h, --help                 show this help on usage\n",
+           argv0);
+
+    if (exit_code < 0)
+        return;
+    else
+        exit(exit_code);
+}
+
+
+static void client_set_defaults(client_t *c)
+{
+    mrp_clear(c);
+    c->addrstr = MRP_DEFAULT_DOMCTL_ADDRESS;
+    c->zone    = FALSE;
+    c->verbose = FALSE;
+}
+
+
+int parse_cmdline(client_t *c, int argc, char **argv)
+{
+#   define OPTIONS "vzhs:"
+    struct option options[] = {
+        { "server"    , required_argument, NULL, 's' },
+        { "zone"      , no_argument      , NULL, 'z' },
+        { "verbose"   , optional_argument, NULL, 'v' },
+        { "help"      , no_argument      , NULL, 'h' },
+        { NULL, 0, NULL, 0 }
+    };
+
+    int opt;
+
+    client_set_defaults(c);
+
+    while ((opt = getopt_long(argc, argv, OPTIONS, options, NULL)) != -1) {
+        switch (opt) {
+        case 'z':
+            c->zone = TRUE;
+            break;
+
+        case 'v':
+            c->verbose = TRUE;
+            break;
+
+        case 'a':
+            c->addrstr = optarg;
+            break;
+
+        case 'h':
+            print_usage(argv[0], -1, "");
+            exit(0);
+            break;
+
+        default:
+            print_usage(argv[0], EINVAL, "invalid option '%c'", opt);
+        }
+    }
+
+    return TRUE;
+}
+
+
+int main(int argc, char *argv[])
+{
+    client_t  c;
+
+    client_set_defaults(&c);
+    parse_cmdline(&c, argc, argv);
+
+    client_setup(&c);
+    terminal_setup(&c);
+
+    client = &c;
+    client_run(&c);
+
+    terminal_cleanup(&c);
+    client_cleanup(&c);
+
+    return 0;
+}