AM_CONDITIONAL(DISABLED_PLUGIN_SIGNALLING, [check_if_disabled signalling])
AM_CONDITIONAL(DISABLED_PLUGIN_DECISION, [check_if_disabled decision])
AM_CONDITIONAL(DISABLED_PLUGIN_RESOURCE_DBUS, [check_if_disabled resource-dbus])
+AM_CONDITIONAL(DISABLED_PLUGIN_DOMAIN_CONTROL,
+ [check_if_disabled domain-control])
AM_CONDITIONAL(BUILTIN_PLUGIN_TEST, [check_if_internal test])
AM_CONDITIONAL(BUILTIN_PLUGIN_DBUS, [check_if_internal dbus])
AM_CONDITIONAL(BUILTIN_PLUGIN_SIGNALLING, [check_if_internal signalling])
AM_CONDITIONAL(BUILTIN_PLUGIN_DECISION, [check_if_internal decision])
AM_CONDITIONAL(BUILTIN_PLUGIN_RESOURCE_DBUS, [check_if_internal resource-dbus])
+AM_CONDITIONAL(BUILTIN_PLUGIN_DOMAIN_CONTROL,
+ [check_if_internal domain-control])
# Check for Check (unit test framework).
PKG_CHECK_MODULES(CHECK,
-lreadline
endif
+# domain control plugin
+DOMAIN_CONTROL_PLUGIN_SOURCES = plugins/domain-control/plugin-domain-control.c \
+ plugins/domain-control/domain-control.c \
+ plugins/domain-control/proxy.c \
+ plugins/domain-control/table.c \
+ plugins/domain-control/message.c \
+ plugins/domain-control/notify.c
+
+DOMAIN_CONTROL_PLUGIN_CFLAGS =
+DOMAIN_CONTROL_PLUGIN_LIBS =
+
+if !DISABLED_PLUGIN_DOMAIN_CONTROL
+if BUILTIN_PLUGIN_DOMAIN_CONTROL
+LINKEDIN_PLUGINS += libmurphy-plugin-domain-control.la
+lib_LTLIBRARIES += libmurphy-plugin-domain-control.la
+DOMAIN_CONTROL_PLUGIN_LOADER = linkedin-domain-control-loader.c
+DOMAIN_CONTROL_PLUGIN_CFLAGS += $(BUILTIN_CFLAGS)
+
+
+libmurphy_plugin_domain_control_ladir = \
+ $(includedir)/murphy/domain-control
+
+libmurphy_plugin_domain_control_la_SOURCES = \
+ $(DOMAIN_CONTROL_PLUGIN_SOURCES) \
+ $(DOMAIN_CONTROL_PLUGIN_LOADER)
+
+libmurphy_plugin_domain_control_la_CFLAGS = \
+ $(DOMAIN_CONTROL_PLUGIN_CFLAGS) \
+ $(AM_CFLAGS)
+
+libmurphy_plugin_domain_control_la_LDFLAGS = \
+ -Wl,-version-script=linker-script.domain-control \
+ -version-info @MURPHY_VERSION_INFO@
+
+libmurphy_plugin_domain_control_la_LIBADD = \
+ libmurphy-core.la \
+ libmurphy-common.la \
+ murphy-db/mql/libmql.la \
+ murphy-db/mqi/libmqi.la \
+ murphy-db/mdb/libmdb.la
+
+libmurphy_plugin_domain_control_la_DEPENDENCIES = \
+ linker-script.domain-control \
+ libmurphy-core.la \
+ libmurphy-common.la \
+ murphy-db/mql/libmql.la \
+ murphy-db/mqi/libmqi.la \
+ murphy-db/mdb/libmdb.la
+
+# linkedin domain control plugin linker script generation
+linker-script.domain-control: $(DOMAIN_CONTROL_PLUGIN_LOADER:%.c=%.h)
+ $(QUIET_GEN)$(top_builddir)/build-aux/gen-linker-script -q -o $@ $^
+
+clean-linker-script::
+ -rm -f linker-script.domain-control
+else
+plugin_domain_control_la_SOURCES = $(DOMAIN_CONTROL_PLUGIN_SOURCES)
+plugin_domain_control_la_CFLAGS = $(DOMAIN_CONTROL_PLUGIN_CFLAGS) \
+ $(MURPHY_CFLAGS) $(AM_CFLAGS)
+plugin_domain_control_la_LDFLAGS = -module -avoid-version
+plugin_domain_control_la_LIBADD = $(DOMAIN_CONTROL_PLUGIN_LIBS)
+plugin_LTLIBRARIES += plugin-domain-control.la
+endif
+
+# domain controller client library
+lib_LTLIBRARIES += libmurphy-domain-controller.la
+libmurphy_domain_controller_la_SOURCES = plugins/domain-control/client.c \
+ plugins/domain-control/table-common.c\
+ plugins/domain-control/message.c
+libmurphy_domain_controller_la_CFLAGS =
+libmurphy_domain_controller_la_LIBADD = libmurphy-common.la \
+ murphy-db/mql/libmql.la \
+ murphy-db/mqi/libmqi.la \
+ murphy-db/mdb/libmdb.la
+
+# test domain controller
+bin_PROGRAMS += test-domain-controller
+
+test_domain_controller_SOURCES = plugins/domain-control/test-client.c
+test_domain_controller_CFLAGS = $(AM_CFLAGS)
+test_domain_controller_LDADD = libmurphy-domain-controller.la \
+ libmurphy-common.la \
+ -lreadline
+endif
+
###################################
# murphy daemon
#
end
# load the native resource plugin if it exists
-if plugin-exists resource-native
- load-plugin resource-native
-else
- info "Could not find resource-native plugin"
-end
+#if plugin-exists resource-native
+# load-plugin resource-native
+#else
+# info "Could not find resource-native plugin"
+#end
# load the enforcement point interface prototype plugin if it exists
-if plugin-exists decision-proto
- load-plugin decision-proto
-else
- info "There is no decision-proto plugin available..."
-end
+#if plugin-exists decision-proto
+# load-plugin decision-proto
+#else
+# info "There is no decision-proto plugin available..."
+#end
-if plugin-exists resource-dbus
- load-plugin resource-dbus
+#if plugin-exists resource-dbus
+# try-load-plugin resource-dbus
+#end
+
+if plugin-exists domain-control
+ load-plugin domain-control
+else
+ info "No domain-control plugin found..."
end
#set resolver-ruleset '/u/src/work/murphy/src/resolver/test-input'
--- /dev/null
+ifneq ($(strip $(MAKECMDGOALS)),)
+%:
+ $(MAKE) -C .. $(MAKECMDGOALS)
+else
+all:
+ $(MAKE) -C .. all
+endif
--- /dev/null
+#include <errno.h>
+#include <alloca.h>
+
+#include <murphy/common/mm.h>
+#include <murphy/common/log.h>
+#include <murphy/common/mainloop.h>
+#include <murphy/common/transport.h>
+
+#include "domain-control-types.h"
+#include "table.h"
+#include "message.h"
+#include "client.h"
+
+
+/*
+ * mark an enforcement point busy (typically while executing a callback)
+ */
+
+#define DOMCTL_MARK_BUSY(dc, ...) do { \
+ (dc)->busy++; \
+ __VA_ARGS__ \
+ (dc)->busy--; \
+ check_destroyed(dc); \
+ } while (0)
+
+
+/*
+ * a pending request
+ */
+
+typedef struct {
+ mrp_list_hook_t hook; /* hook to pending request queue */
+ uint32_t seqno; /* sequence number/request id */
+ mrp_domctl_status_cb_t cb; /* callback to call upon completion */
+ void *user_data; /* opaque callback data */
+} pending_request_t;
+
+
+static void recv_cb(mrp_transport_t *t, mrp_msg_t *msg, void *user_data);
+static void recvfrom_cb(mrp_transport_t *t, mrp_msg_t *msg,
+ mrp_sockaddr_t *addr, socklen_t addrlen,
+ void *user_data);
+static void closed_cb(mrp_transport_t *t, int error, void *user_data);
+
+
+static int queue_pending(mrp_domctl_t *dc, uint32_t seq,
+ mrp_domctl_status_cb_t cb, void *user_data);
+static int notify_pending(mrp_domctl_t *dc, uint32_t seq, int error,
+ const char *msg);
+static void purge_pending(mrp_domctl_t *dc);
+
+
+
+
+mrp_domctl_t *mrp_domctl_create(const char *name, mrp_mainloop_t *ml,
+ mrp_domctl_table_t *tables, int ntable,
+ mrp_domctl_watch_t *watches, int nwatch,
+ mrp_domctl_connect_cb_t connect_cb,
+ mrp_domctl_watch_cb_t watch_cb, void *user_data)
+{
+ mrp_domctl_t *dc;
+ mrp_domctl_table_t *st, *dt;
+ mrp_domctl_watch_t *sw, *dw;
+ int i;
+
+ dc = mrp_allocz(sizeof(*dc));
+
+ if (dc != NULL) {
+ mrp_list_init(&dc->pending);
+ dc->ml = ml;
+
+ dc->name = mrp_strdup(name);
+ dc->tables = mrp_allocz_array(typeof(*dc->tables) , ntable);
+ dc->watches = mrp_allocz_array(typeof(*dc->watches), nwatch);
+
+ if (dc->name != NULL && dc->tables != NULL && dc->watches != NULL) {
+ for (i = 0; i < ntable; i++) {
+ st = tables + i;
+ dt = dc->tables + i;
+
+ dt->table = mrp_strdup(st->table);
+ dt->mql_columns = mrp_strdup(st->mql_columns);
+ dt->mql_index = mrp_strdup(st->mql_index ? st->mql_index:"");
+
+ if (!dt->table || !dt->mql_columns || !dt->mql_index)
+ break;
+
+ dc->ntable++;
+ }
+
+ for (i = 0; i < nwatch; i++) {
+ sw = watches + i;
+ dw = dc->watches + i;
+
+ dw->table = mrp_strdup(sw->table);
+ dw->mql_columns = mrp_strdup(sw->mql_columns);
+ dw->mql_where = mrp_strdup(sw->mql_where ? sw->mql_where:"");
+ dw->max_rows = sw->max_rows;
+
+ if (!dw->table || !dw->mql_columns || !dw->mql_where)
+ break;
+
+ dc->nwatch++;
+ }
+
+ dc->connect_cb = connect_cb;
+ dc->watch_cb = watch_cb;
+ dc->user_data = user_data;
+ dc->seqno = 1;
+
+ return dc;
+ }
+
+ mrp_domctl_destroy(dc);
+ }
+
+ return NULL;
+}
+
+
+static void destroy_domctl(mrp_domctl_t *dc)
+{
+ int i;
+
+ mrp_free(dc->name);
+
+ for (i = 0; i < dc->ntable; i++) {
+ mrp_free((char *)dc->tables[i].table);
+ mrp_free((char *)dc->tables[i].mql_columns);
+ mrp_free((char *)dc->tables[i].mql_index);
+ }
+ mrp_free(dc->tables);
+
+ for (i = 0; i < dc->nwatch; i++) {
+ mrp_free((char *)dc->watches[i].table);
+ mrp_free((char *)dc->watches[i].mql_columns);
+ mrp_free((char *)dc->watches[i].mql_where);
+ }
+ mrp_free(dc->watches);
+
+ mrp_free(dc);
+}
+
+
+static inline void check_destroyed(mrp_domctl_t *dc)
+{
+ if (dc->destroyed && dc->busy <= 0) {
+ destroy_domctl(dc);
+ }
+}
+
+
+void mrp_domctl_destroy(mrp_domctl_t *dc)
+{
+ if (dc != NULL) {
+ mrp_domctl_disconnect(dc);
+
+ if (dc->busy <= 0)
+ destroy_domctl(dc);
+ else
+ dc->destroyed = TRUE;
+ }
+}
+
+
+static void notify_disconnect(mrp_domctl_t *dc, uint32_t errcode,
+ const char *errmsg)
+{
+ DOMCTL_MARK_BUSY(dc, {
+ dc->connected = FALSE;
+ dc->connect_cb(dc, FALSE, errcode, errmsg, dc->user_data);
+ });
+}
+
+
+static void notify_connect(mrp_domctl_t *dc)
+{
+ DOMCTL_MARK_BUSY(dc, {
+ dc->connected = TRUE;
+ dc->connect_cb(dc, TRUE, 0, NULL, dc->user_data);
+ });
+}
+
+
+static int domctl_register(mrp_domctl_t *dc)
+{
+ mrp_msg_t *msg;
+ int success;
+
+ msg = create_register_message(dc);
+
+ if (msg != NULL) {
+ success = mrp_transport_send(dc->t, msg);
+ mrp_msg_unref(msg);
+ }
+ else
+ success = FALSE;
+
+ return success;
+}
+
+
+int mrp_domctl_connect(mrp_domctl_t *dc, const char *address)
+{
+ static mrp_transport_evt_t evt = {
+ .closed = closed_cb,
+ .recvmsg = recv_cb,
+ .recvmsgfrom = recvfrom_cb,
+ };
+
+ mrp_sockaddr_t addr;
+ socklen_t addrlen;
+ const char *type;
+
+ if (dc == NULL)
+ return FALSE;
+
+ addrlen = mrp_transport_resolve(NULL, address, &addr, sizeof(addr), &type);
+
+ if (addrlen > 0) {
+ dc->t = mrp_transport_create(dc->ml, type, &evt, dc, 0);
+
+ if (dc->t != NULL) {
+ if (mrp_transport_connect(dc->t, &addr, addrlen))
+ if (domctl_register(dc))
+ return TRUE;
+
+ mrp_transport_destroy(dc->t);
+ dc->t = NULL;
+ }
+ }
+
+ return FALSE;
+}
+
+
+void mrp_domctl_disconnect(mrp_domctl_t *dc)
+{
+ if (dc->t != NULL) {
+ mrp_transport_destroy(dc->t);
+ dc->t = NULL;
+ dc->connected = FALSE;
+ }
+}
+
+
+int mrp_domctl_set_data(mrp_domctl_t *dc, mrp_domctl_data_t *tables, int ntable,
+ mrp_domctl_status_cb_t cb, void *user_data)
+{
+ mrp_msg_t *msg;
+ uint32_t seq = dc->seqno++;
+ int success, i;
+
+ if (!dc->connected)
+ return FALSE;
+
+ for (i = 0; i < ntable; i++) {
+ if (tables[i].id < 0 || tables[i].id >= dc->ntable)
+ return FALSE;
+ }
+
+ msg = create_set_message(seq, tables, ntable);
+
+ if (msg != NULL) {
+ /*
+ mrp_log_info("set data message message:");
+ mrp_msg_dump(msg, stdout);
+ */
+
+ success = mrp_transport_send(dc->t, msg);
+ mrp_msg_unref(msg);
+
+ if (success)
+ queue_pending(dc, seq, cb, user_data);
+
+ return success;
+ }
+ else
+ return FALSE;
+}
+
+
+static void process_ack(mrp_domctl_t *dc, uint32_t seq)
+{
+ if (seq != 0)
+ notify_pending(dc, seq, 0, NULL);
+ else
+ notify_connect(dc);
+}
+
+
+static void process_nak(mrp_domctl_t *dc, uint32_t seq, int32_t err,
+ const char *msg)
+{
+ if (seq != 0)
+ notify_pending(dc, seq, err, msg);
+ else
+ notify_disconnect(dc, err, msg);
+}
+
+
+static void process_notify(mrp_domctl_t *dc, mrp_msg_t *msg, uint32_t seq)
+{
+ mrp_domctl_data_t *data, *d;
+ mrp_domctl_value_t *values, *v;
+ void *it;
+ uint16_t ntable, ntotal, nrow, ncol;
+ uint16_t tblid;
+ int t, r, c;
+ uint16_t type;
+ mrp_msg_value_t value;
+
+ if (!mrp_msg_get(msg,
+ MRP_PEPMSG_UINT16(NCHANGE, &ntable),
+ MRP_PEPMSG_UINT16(NTOTAL , &ntotal),
+ MRP_MSG_END))
+ return;
+
+ data = alloca(sizeof(*data) * ntable);
+ values = alloca(sizeof(*values) * ntotal);
+
+ it = NULL;
+ d = data;
+ v = values;
+
+ for (t = 0; t < ntable; t++) {
+ if (!mrp_msg_iterate_get(msg, &it,
+ MRP_PEPMSG_UINT16(TBLID, &tblid),
+ MRP_PEPMSG_UINT16(NROW , &nrow ),
+ MRP_PEPMSG_UINT16(NCOL , &ncol ),
+ MRP_MSG_END))
+ return;
+
+ if (tblid >= dc->nwatch)
+ return;
+
+ d->id = tblid;
+ d->ncolumn = ncol;
+ d->nrow = nrow;
+ d->rows = alloca(sizeof(*d->rows) * nrow);
+
+ for (r = 0; r < nrow; r++) {
+ d->rows[r] = v;
+
+ for (c = 0; c < ncol; c++) {
+ if (!mrp_msg_iterate_get(msg, &it,
+ MRP_PEPMSG_ANY(DATA, &type, &value),
+ MRP_MSG_END))
+ return;
+
+ switch (type) {
+ case MRP_MSG_FIELD_STRING:
+ v->type = MRP_DOMCTL_STRING;
+ v->str = value.str;
+ break;
+ case MRP_MSG_FIELD_SINT32:
+ v->type = MRP_DOMCTL_INTEGER;
+ v->s32 = value.s32;
+ break;
+ case MRP_MSG_FIELD_UINT32:
+ v->type = MRP_DOMCTL_UNSIGNED;
+ v->u32 = value.u32;
+ break;
+ case MRP_MSG_FIELD_DOUBLE:
+ v->type = MRP_DOMCTL_DOUBLE;
+ v->dbl = value.dbl;
+ break;
+ default:
+ return;
+ }
+
+ v++;
+ }
+ }
+
+ d++;
+ }
+
+ dc->watch_cb(dc, data, ntable, dc->user_data);
+}
+
+
+static void recv_cb(mrp_transport_t *t, mrp_msg_t *msg, void *user_data)
+{
+ mrp_domctl_t *dc = (mrp_domctl_t *)user_data;
+ uint16_t type, nchange, ntotal;
+ uint32_t seq;
+ int ntable, ncolumn;
+ int32_t error;
+ const char *errmsg;
+
+ MRP_UNUSED(t);
+
+ /*
+ mrp_log_info("Received message:");
+ mrp_msg_dump(msg, stdout);
+ */
+
+ if (!mrp_msg_get(msg,
+ MRP_PEPMSG_UINT16(MSGTYPE, &type),
+ MRP_PEPMSG_UINT32(MSGSEQ , &seq ),
+ MRP_MSG_END)) {
+ mrp_domctl_disconnect(dc);
+ notify_disconnect(dc, EINVAL, "malformed message from client");
+ return;
+ }
+
+ switch (type) {
+ case MRP_PEPMSG_ACK:
+ process_ack(dc, seq);
+ break;
+
+ case MRP_PEPMSG_NAK:
+ error = EINVAL;
+ errmsg = "request failed, unknown error";
+
+ mrp_msg_get(msg,
+ MRP_PEPMSG_SINT32(ERRCODE, &error),
+ MRP_PEPMSG_STRING(ERRMSG , &errmsg),
+ MRP_MSG_END);
+
+ process_nak(dc, seq, error, errmsg);
+ break;
+
+ case MRP_PEPMSG_NOTIFY:
+ if (mrp_msg_get(msg,
+ MRP_PEPMSG_UINT16(NCHANGE, &nchange),
+ MRP_PEPMSG_UINT16(NTOTAL , &ntotal),
+ MRP_MSG_END)) {
+ ntable = nchange;
+ ncolumn = ntotal;
+
+ process_notify(dc, msg, seq);
+ }
+ break;
+
+ default:
+ break;
+ }
+
+}
+
+
+static void recvfrom_cb(mrp_transport_t *t, mrp_msg_t *msg,
+ mrp_sockaddr_t *addr, socklen_t addrlen,
+ void *user_data)
+{
+ MRP_UNUSED(addr);
+ MRP_UNUSED(addrlen);
+
+ /* XXX TODO:
+ * This should neither be called nor be necessary to specify.
+ * However, currently the transport layer mandates having to
+ * give both recv and recvfrom event callbacks if no connection
+ * event callback is given. However this is not correct because
+ * on a client side one wants to be able to create a connection-
+ * oriented transport without both connection and recvfrom event
+ * callbacks. This needs to be fixed in transport by moving the
+ * appropriate callback checks lower in the stack to the actual
+ * transport backends.
+ */
+
+ mrp_log_error("Whoa... recvfrom called for a connected transport.");
+ exit(1);
+}
+
+
+static void closed_cb(mrp_transport_t *t, int error, void *user_data)
+{
+ mrp_domctl_t *dc = (mrp_domctl_t *)user_data;
+
+ MRP_UNUSED(t);
+ MRP_UNUSED(dc);
+
+ if (error)
+ notify_disconnect(dc, error, strerror(error));
+ else
+ notify_disconnect(dc, ECONNRESET, "server has closed the connection");
+}
+
+
+static int queue_pending(mrp_domctl_t *dc, uint32_t seq,
+ mrp_domctl_status_cb_t cb, void *user_data)
+{
+ pending_request_t *pending;
+
+ pending = mrp_allocz(sizeof(*pending));
+
+ if (pending != NULL) {
+ mrp_list_init(&pending->hook);
+
+ pending->seqno = seq;
+ pending->cb = cb;
+ pending->user_data = user_data;
+
+ mrp_list_append(&dc->pending, &pending->hook);
+
+ return TRUE;
+ }
+ else
+ return FALSE;
+}
+
+
+static int notify_pending(mrp_domctl_t *dc, uint32_t seq, int error,
+ const char *msg)
+{
+ mrp_list_hook_t *p, *n;
+ pending_request_t *pending;
+
+ mrp_list_foreach(&dc->pending, p, n) {
+ pending = mrp_list_entry(p, typeof(*pending), hook);
+
+ if (pending->seqno == seq) {
+ DOMCTL_MARK_BUSY(dc, {
+ pending->cb(dc, error, msg, pending->user_data);
+ mrp_list_delete(&pending->hook);
+ mrp_free(pending);
+ });
+
+ return TRUE;
+ }
+ }
+
+ return FALSE;
+}
+
+
+static void purge_pending(mrp_domctl_t *dc)
+{
+ mrp_list_hook_t *p, *n;
+ pending_request_t *pending;
+
+ mrp_list_foreach(&dc->pending, p, n) {
+ pending = mrp_list_entry(p, typeof(*pending), hook);
+
+ mrp_list_delete(&pending->hook);
+ mrp_free(pending);
+ }
+}
--- /dev/null
+#ifndef __MURPHY_DOMAIN_CONTROL_CLIENT_H__
+#define __MURPHY_DOMAIN_CONTROL_CLIENT_H__
+
+#include <murphy-db/mqi.h>
+#include <murphy/common/mainloop.h>
+
+#define MRP_DEFAULT_DOMCTL_ADDRESS "unxs:@murphy-domctrl"
+
+
+/*
+ * a table owned by a domain controller
+ */
+
+typedef struct {
+ const char *table; /* table name */
+ const char *mql_columns; /* column definition scriptlet */
+ const char *mql_index; /* index column list */
+} mrp_domctl_table_t;
+
+#define MRP_DOMCTL_TABLE(_table, _columns, _index) \
+ { .table = _table, .mql_columns = _columns, .mql_index = _index }
+
+
+/*
+ * a table tracked by a domain controller
+ */
+
+typedef struct {
+ const char *table; /* table name */
+ const char *mql_columns; /* column list for select */
+ const char *mql_where; /* where clause for select */
+ int max_rows; /* max number of rows to select */
+} mrp_domctl_watch_t;
+
+#define MRP_DOMCTL_WATCH(_table, _columns, _where, _max_rows) { \
+ .table = _table , \
+ .mql_columns = _columns ? _columns : "", \
+ .mql_where = _where ? _where : "", \
+ .max_rows = _max_rows , \
+ }
+
+
+/*
+ * table column types and values
+ */
+
+typedef enum {
+ MRP_DOMCTL_STRING = mqi_varchar,
+ MRP_DOMCTL_INTEGER = mqi_integer,
+ MRP_DOMCTL_UNSIGNED = mqi_unsignd,
+ MRP_DOMCTL_DOUBLE = mqi_floating
+} mrp_domctl_type_t;
+
+typedef struct {
+ mrp_domctl_type_t type; /* data type */
+ union {
+ const char *str; /* MRP_DOMCTL_STRING */
+ uint32_t u32; /* MRP_DOMCTL_UNSIGNED */
+ int32_t s32; /* MRP_DOMCTL_INTEGER */
+ double dbl; /* MRP_DOMCTL_DOUBLE */
+ };
+} mrp_domctl_value_t;
+
+
+/*
+ * table data
+ */
+
+typedef struct {
+ int id; /* table id */
+ mqi_column_def_t *coldefs; /* column definitions */
+ int ncolumn; /* columns per row */
+ mrp_domctl_value_t **rows; /* row data */
+ int nrow; /* number of rows */
+} mrp_domctl_data_t;
+
+
+
+/** Opaque policy domain controller type. */
+typedef struct mrp_domctl_s mrp_domctl_t;
+
+/** Callback type for connection state notifications. */
+typedef void (*mrp_domctl_connect_cb_t)(mrp_domctl_t *dc, int connection,
+ int errcode, const char *errmsg,
+ void *user_data);
+
+/** Callback type for request status notifications. */
+typedef void (*mrp_domctl_status_cb_t)(mrp_domctl_t *dc, int errcode,
+ const char *errmsg, void *user_data);
+
+/** Callback type for data change notifications. */
+typedef void (*mrp_domctl_watch_cb_t)(mrp_domctl_t *dc,
+ mrp_domctl_data_t *tables, int ntable,
+ void *user_data);
+
+/** Create a new policy domain controller. */
+mrp_domctl_t *mrp_domctl_create(const char *name, mrp_mainloop_t *ml,
+ mrp_domctl_table_t *tables, int ntable,
+ mrp_domctl_watch_t *watches, int nwatch,
+ mrp_domctl_connect_cb_t connect_cb,
+ mrp_domctl_watch_cb_t watch_cb,
+ void *user_data);
+
+/** Destroy the given policy domain controller. */
+void mrp_domctl_destroy(mrp_domctl_t *dc);
+
+/** Connect and register the given controller to the server. */
+int mrp_domctl_connect(mrp_domctl_t *dc, const char *address);
+
+/** Close the connection to the server. */
+void mrp_domctl_disconnect(mrp_domctl_t *dc);
+
+/** Set the content of the given tables to the provided data. */
+int mrp_domctl_set_data(mrp_domctl_t *dc, mrp_domctl_data_t *tables, int ntable,
+ mrp_domctl_status_cb_t status_cb, void *user_data);
+
+#endif /* __MURPHY_DOMAIN_CONTROL_CLIENT_H__ */
--- /dev/null
+#ifndef __MURPHY_DOMAIN_CONTROL_TYPES_H__
+#define __MURPHY_DOMAIN_CONTROL_TYPES_H__
+
+#include <murphy/common/list.h>
+#include <murphy/common/mainloop.h>
+#include <murphy/common/transport.h>
+#include <murphy/common/hashtbl.h>
+#include <murphy/core/context.h>
+
+#include "client.h"
+
+typedef struct pep_proxy_s pep_proxy_t;
+typedef struct pep_table_s pep_table_t;
+typedef struct pep_watch_s pep_watch_t;
+typedef struct pdp_s pdp_t;
+
+
+/*
+ * a domain controller (on the client side)
+ */
+
+struct mrp_domctl_s {
+ char *name; /* enforcment point name */
+ mrp_mainloop_t *ml; /* main loop */
+ mrp_transport_t *t; /* transport towards murphy */
+ int connected; /* transport is up */
+ mrp_domctl_table_t *tables; /* owned tables */
+ int ntable; /* number of owned tables */
+ mrp_domctl_watch_t *watches; /* watched tables */
+ int nwatch; /* number of watched tables */
+ mrp_domctl_connect_cb_t connect_cb; /* connection state change callback */
+ mrp_domctl_watch_cb_t watch_cb; /* watched table change callback */
+ void *user_data; /* opqaue user data for callbacks */
+ int busy; /* non-zero if a callback is active */
+ int destroyed:1;/* non-zero if destroy pending */
+ uint32_t seqno; /* request sequence number */
+ mrp_list_hook_t pending; /* queue of outstanding requests */
+};
+
+
+/*
+ * a table associated with or tracked by an enforcement point
+ */
+
+struct pep_table_s {
+ char *name; /* table name */
+ char *mql_columns; /* column definition clause */
+ char *mql_index; /* index column list */
+ mrp_list_hook_t hook; /* to list of tables */
+ mqi_handle_t h; /* table handle */
+ mqi_column_def_t *columns; /* column definitions */
+ mqi_column_desc_t *coldesc; /* column descriptors */
+ int ncolumn; /* number of columns */
+ int idx_col; /* column index of index column */
+ mrp_list_hook_t watches; /* watches for this table */
+ int notify_all : 1; /* notify all watches */
+};
+
+
+/*
+ * a table watch
+ */
+
+struct pep_watch_s {
+ pep_table_t *table; /* table being watched */
+ char *mql_columns; /* column list to select */
+ char *mql_where; /* where clause for select */
+ int max_rows; /* max number of rows to select */
+ pep_proxy_t *proxy; /* enforcement point */
+ int id; /* table id within proxy */
+ uint32_t stamp; /* last notified update stamp */
+ mrp_list_hook_t tbl_hook; /* hook to table watch list */
+ mrp_list_hook_t pep_hook; /* hook to proxy watch list */
+};
+
+
+/*
+ * a policy enforcement point (on the server side)
+ */
+
+struct pep_proxy_s {
+ char *name; /* enforcement point name */
+ pdp_t *pdp; /* domain controller context */
+ mrp_transport_t *t; /* associated transport */
+ mrp_list_hook_t hook; /* to list of all enforcement points */
+ pep_table_t *tables; /* tables owned by this */
+ int ntable; /* number of tables */
+ mrp_list_hook_t watches; /* tables watched by this */
+ int notify_update; /* whether needs notification */
+ mrp_msg_t *notify_msg; /* notification being built */
+ int notify_ntable; /* number of changed tables */
+ int notify_ncolumn; /* total columns in notification */
+ int notify_fail : 1; /* notification failure */
+ int notify_all : 1; /* notify all watches */
+};
+
+
+/*
+ * policy domain controller context
+ */
+
+struct pdp_s {
+ mrp_context_t *ctx; /* murphy context */
+ const char *address; /* external transport address */
+ mrp_transport_t *ext; /* external transport */
+ mrp_list_hook_t proxies; /* list of enforcement points */
+ mrp_list_hook_t tables; /* list of tables we track */
+ mrp_htbl_t *watched; /* tracked tables by name */
+ mrp_deferred_t *notify; /* deferred notification */
+ int notify_scheduled; /* is notification scheduled? */
+};
+
+
+
+#endif /* __MURPHY_DOMAIN_CONTROL_TYPES_H__ */
--- /dev/null
+#include <errno.h>
+
+#include <murphy/common/mm.h>
+#include <murphy/common/log.h>
+
+#include "proxy.h"
+#include "table.h"
+#include "message.h"
+#include "notify.h"
+#include "domain-control.h"
+
+static int create_transports(pdp_t *pdp);
+static void destroy_transports(pdp_t *pdp);
+
+pdp_t *create_domain_control(mrp_context_t *ctx, const char *address)
+{
+ pdp_t *pdp;
+
+ pdp = mrp_allocz(sizeof(*pdp));
+
+ if (pdp != NULL) {
+ pdp->ctx = ctx;
+ pdp->address = address;
+
+ if (init_proxies(pdp) && init_tables(pdp) && create_transports(pdp))
+ return pdp;
+ else
+ destroy_domain_control(pdp);
+ }
+
+ return NULL;
+}
+
+
+void destroy_domain_control(pdp_t *pdp)
+{
+ if (pdp != NULL) {
+ destroy_proxies(pdp);
+ destroy_tables(pdp);
+ destroy_transports(pdp);
+
+ mrp_free(pdp);
+ }
+}
+
+
+static void notify_cb(mrp_mainloop_t *ml, mrp_deferred_t *d, void *user_data)
+{
+ pdp_t *pdp = (pdp_t *)user_data;
+
+ MRP_UNUSED(ml);
+
+ mrp_disable_deferred(d);
+ pdp->notify_scheduled = FALSE;
+ notify_table_changes(pdp);
+}
+
+
+void schedule_notification(pdp_t *pdp)
+{
+
+ if (pdp->notify == NULL)
+ pdp->notify = mrp_add_deferred(pdp->ctx->ml, notify_cb, pdp);
+
+ if (!pdp->notify_scheduled) {
+ mrp_debug("scheduling client notification");
+ mrp_enable_deferred(pdp->notify);
+ }
+}
+
+
+static void send_ack_reply(mrp_transport_t *t, uint32_t seq)
+{
+ mrp_msg_t *msg;
+
+ msg = create_ack_message(seq);
+
+ if (msg != NULL) {
+ mrp_transport_send(t, msg);
+ mrp_msg_unref(msg);
+ }
+}
+
+
+static void send_nak_reply(mrp_transport_t *t, uint32_t seq, int error,
+ const char *errmsg)
+{
+ mrp_msg_t *msg;
+
+ msg = create_nak_message(seq, error, errmsg);
+
+ if (msg != NULL) {
+ mrp_transport_send(t, msg);
+ mrp_msg_unref(msg);
+ }
+}
+
+
+static int process_register_request(pep_proxy_t *proxy, mrp_msg_t *req,
+ uint32_t seq)
+{
+ mrp_transport_t *t = proxy->t;
+ char *name;
+ mrp_domctl_table_t *tables;
+ mrp_domctl_watch_t *watches;
+ uint16_t utable, uwatch;
+ int ntable, nwatch;
+ int error;
+ const char *errmsg;
+
+ if (mrp_msg_get(req,
+ MRP_PEPMSG_STRING(NAME , &name ),
+ MRP_PEPMSG_UINT16(NTABLE , &utable),
+ MRP_PEPMSG_UINT16(NWATCH , &uwatch),
+ MRP_MSG_END)) {
+ ntable = utable;
+ nwatch = uwatch;
+ tables = alloca(ntable * sizeof(*tables));
+ watches = alloca(nwatch * sizeof(*watches));
+
+ if (decode_register_message(req, tables, ntable, watches, nwatch)) {
+ if (register_proxy(proxy, name, tables, ntable, watches, nwatch,
+ &error, &errmsg)) {
+ send_ack_reply(t, seq);
+ proxy->notify_all = TRUE;
+ schedule_notification(proxy->pdp);
+
+ return TRUE;
+ }
+ }
+ else
+ goto malformed;
+ }
+ else {
+ malformed:
+ error = EINVAL;
+ errmsg = "malformed register message";
+ }
+
+ send_nak_reply(t, seq, error, errmsg);
+
+ return FALSE;
+}
+
+
+static void process_unregister_request(pep_proxy_t *proxy, uint32_t seq)
+{
+ send_ack_reply(proxy->t, seq);
+ unregister_proxy(proxy);
+}
+
+
+static void process_set_request(pep_proxy_t *proxy, mrp_msg_t *msg,
+ uint32_t seq)
+{
+ mrp_domctl_data_t *data, *d;
+ mrp_domctl_value_t *values, *v;
+ void *it;
+ uint16_t ntable, ntotal, nrow, ncol;
+ uint16_t tblid;
+ int t, r, c;
+ uint16_t type;
+ mrp_msg_value_t value;
+ int error;
+ const char *errmsg;
+
+ if (!mrp_msg_get(msg,
+ MRP_PEPMSG_UINT16(NCHANGE, &ntable),
+ MRP_PEPMSG_UINT16(NTOTAL , &ntotal),
+ MRP_MSG_END))
+ return;
+
+ data = alloca(sizeof(*data) * ntable);
+ values = alloca(sizeof(*values) * ntotal);
+
+ it = NULL;
+ d = data;
+ v = values;
+
+ for (t = 0; t < ntable; t++) {
+ if (!mrp_msg_iterate_get(msg, &it,
+ MRP_PEPMSG_UINT16(TBLID, &tblid),
+ MRP_PEPMSG_UINT16(NROW , &nrow ),
+ MRP_PEPMSG_UINT16(NCOL , &ncol ),
+ MRP_MSG_END))
+ goto reply_nak;
+
+ if (tblid >= proxy->ntable)
+ goto reply_nak;
+
+ d->id = tblid;
+ d->ncolumn = ncol;
+ d->nrow = nrow;
+ d->rows = alloca(sizeof(*d->rows) * nrow);
+
+ for (r = 0; r < nrow; r++) {
+ d->rows[r] = v;
+
+ for (c = 0; c < ncol; c++) {
+ if (!mrp_msg_iterate_get(msg, &it,
+ MRP_PEPMSG_ANY(DATA, &type, &value),
+ MRP_MSG_END))
+ goto reply_nak;
+
+ switch (type) {
+ case MRP_MSG_FIELD_STRING:
+ v->type = MRP_DOMCTL_STRING;
+ v->str = value.str;
+ break;
+ case MRP_MSG_FIELD_SINT32:
+ v->type = MRP_DOMCTL_INTEGER;
+ v->s32 = value.s32;
+ break;
+ case MRP_MSG_FIELD_UINT32:
+ v->type = MRP_DOMCTL_UNSIGNED;
+ v->u32 = value.u32;
+ break;
+ case MRP_MSG_FIELD_DOUBLE:
+ v->type = MRP_DOMCTL_DOUBLE;
+ v->dbl = value.dbl;
+ break;
+ default:
+ goto reply_nak;
+ }
+
+ v++;
+ }
+ }
+
+ d++;
+ }
+
+ if (set_proxy_tables(proxy, data, ntable, &error, &errmsg)) {
+ send_ack_reply(proxy->t, seq);
+ }
+ else {
+ reply_nak:
+ send_nak_reply(proxy->t, seq, error, errmsg);
+ }
+}
+
+
+static void recv_cb(mrp_transport_t *t, mrp_msg_t *msg, void *user_data)
+{
+ pep_proxy_t *proxy = (pep_proxy_t *)user_data;
+ char *name = proxy && proxy->name ? proxy->name : "<unknown>";
+ uint16_t type;
+ uint32_t seq;
+
+ /*
+ mrp_log_info("Message from client %p:", proxy);
+ mrp_msg_dump(msg, stdout);
+ */
+
+ if (!mrp_msg_get(msg,
+ MRP_PEPMSG_UINT16(MSGTYPE, &type),
+ MRP_PEPMSG_UINT32(MSGSEQ , &seq ),
+ MRP_MSG_END)) {
+ mrp_log_error("Malformed message from client %s.", name);
+ send_nak_reply(t, 0, EINVAL, "malformed message");
+ }
+ else {
+ switch (type) {
+ case MRP_PEPMSG_REGISTER:
+ if (!process_register_request(proxy, msg, seq))
+ destroy_proxy(proxy);
+ break;
+
+ case MRP_PEPMSG_UNREGISTER:
+ process_unregister_request(proxy, seq);
+ break;
+
+ case MRP_PEPMSG_SET:
+ process_set_request(proxy, msg, seq);
+ break;
+
+ default:
+ break;
+ }
+ }
+}
+
+
+static void connect_cb(mrp_transport_t *ext, void *user_data)
+{
+ pdp_t *pdp = (pdp_t *)user_data;
+ pep_proxy_t *proxy;
+ int flags;
+
+ proxy = create_proxy(pdp);
+
+ if (proxy != NULL) {
+ flags = MRP_TRANSPORT_REUSEADDR | MRP_TRANSPORT_NONBLOCK;
+ proxy->t = mrp_transport_accept(ext, proxy, flags);
+
+ if (proxy->t != NULL)
+ mrp_log_info("Accepted new client connection.");
+ else {
+ mrp_log_error("Failed to accept new client connection.");
+ destroy_proxy(proxy);
+ }
+ }
+}
+
+
+static void closed_cb(mrp_transport_t *t, int error, void *user_data)
+{
+ pep_proxy_t *proxy = (pep_proxy_t *)user_data;
+ char *name = proxy && proxy->name ? proxy->name : "<unknown>";
+
+ MRP_UNUSED(t);
+
+ if (error)
+ mrp_log_error("Transport to client %s closed (%d: %s).",
+ name, error, strerror(error));
+ else
+ mrp_log_info("Transport to client %s closed.", name);
+
+ mrp_log_info("Destroying client %s.", name);
+ destroy_proxy(proxy);
+}
+
+
+static int create_ext_transport(pdp_t *pdp)
+{
+ static mrp_transport_evt_t evt = {
+ .closed = closed_cb,
+ .recvmsg = recv_cb,
+ .recvmsgfrom = NULL,
+ .connection = connect_cb,
+ };
+
+ mrp_transport_t *t;
+ mrp_sockaddr_t addr;
+ socklen_t addrlen;
+ int flags;
+ const char *type;
+
+ t = NULL;
+ addrlen = mrp_transport_resolve(NULL, pdp->address,
+ &addr, sizeof(addr), &type);
+
+ if (addrlen > 0) {
+ flags = MRP_TRANSPORT_REUSEADDR;
+ t = mrp_transport_create(pdp->ctx->ml, type, &evt, pdp, flags);
+
+ if (t != NULL) {
+ if (mrp_transport_bind(t, &addr, addrlen) &&
+ mrp_transport_listen(t, 4)) {
+ mrp_log_info("Listening on transport %s...", pdp->address);
+ pdp->ext = t;
+
+ return TRUE;
+ }
+ else
+ mrp_log_error("Failed to bind transport to %s.", pdp->address);
+ }
+ else
+ mrp_log_error("Failed to create transport for %s.", pdp->address);
+ }
+ else
+ mrp_log_error("Invalid transport address %s.", pdp->address);
+
+ return FALSE;
+}
+
+
+static void destroy_ext_transport(pdp_t *pdp)
+{
+ if (pdp != NULL) {
+ mrp_transport_destroy(pdp->ext);
+ pdp->ext = NULL;
+ }
+}
+
+
+static int create_transports(pdp_t *pdp)
+{
+ return create_ext_transport(pdp);
+}
+
+
+static void destroy_transports(pdp_t *pdp)
+{
+ destroy_ext_transport(pdp);
+}
--- /dev/null
+#ifndef __MURPHY_DOMAIN_CONTROL_H__
+#define __MURPHY_DOMAIN_CONTROL_H__
+
+#include "domain-control-types.h"
+
+pdp_t *create_domain_control(mrp_context_t *ctx, const char *address);
+void destroy_domain_control(pdp_t *pdp);
+
+void schedule_notification(pdp_t *pdp);
+
+#endif /* __MURPHY_DOMAIN_CONTROL_H__ */
--- /dev/null
+#include "message.h"
+
+static int append_one_row(mrp_msg_t *msg, uint16_t tag, mqi_column_def_t *col,
+ int ncolumn, mrp_domctl_value_t *data);
+
+mrp_msg_t *create_register_message(mrp_domctl_t *dc)
+{
+ mrp_msg_t *msg;
+ mrp_domctl_table_t *t;
+ mrp_domctl_watch_t *w;
+ int i;
+
+ msg = mrp_msg_create(MRP_PEPMSG_UINT16(MSGTYPE, MRP_PEPMSG_REGISTER),
+ MRP_PEPMSG_UINT32(MSGSEQ , 0),
+ MRP_PEPMSG_STRING(NAME , dc->name),
+ MRP_PEPMSG_UINT16(NTABLE , dc->ntable),
+ MRP_PEPMSG_UINT16(NWATCH , dc->nwatch),
+ MRP_MSG_END);
+
+ for (i = 0, t = dc->tables; i < dc->ntable; i++, t++) {
+ mrp_msg_append(msg, MRP_PEPMSG_STRING(TBLNAME, t->table));
+ mrp_msg_append(msg, MRP_PEPMSG_STRING(COLUMNS, t->mql_columns));
+ mrp_msg_append(msg, MRP_PEPMSG_STRING(INDEX , t->mql_index));
+ }
+
+ for (i = 0, w = dc->watches; i < dc->nwatch; i++, w++) {
+ mrp_msg_append(msg, MRP_PEPMSG_STRING(TBLNAME, w->table));
+ mrp_msg_append(msg, MRP_PEPMSG_STRING(COLUMNS, w->mql_columns));
+ mrp_msg_append(msg, MRP_PEPMSG_STRING(WHERE , w->mql_where));
+ mrp_msg_append(msg, MRP_PEPMSG_UINT16(MAXROWS, w->max_rows));
+ }
+
+
+ return msg;
+}
+
+
+int decode_register_message(mrp_msg_t *msg,
+ mrp_domctl_table_t *tables, int ntable,
+ mrp_domctl_watch_t *watches, int nwatch)
+{
+ mrp_domctl_table_t *t;
+ mrp_domctl_watch_t *w;
+ void *it;
+ char *table, *columns, *index, *where;
+ uint16_t ntbl, nwch, max_rows;
+ int i;
+
+ it = NULL;
+
+ if (!mrp_msg_iterate_get(msg, &it,
+ MRP_PEPMSG_UINT16(NTABLE , &ntbl),
+ MRP_PEPMSG_UINT16(NWATCH , &nwch),
+ MRP_MSG_END))
+ return FALSE;
+
+ if (ntbl > ntable || nwch > nwatch)
+ return FALSE;
+
+ for (i = 0, t = tables; i < ntable; i++, t++) {
+ if (mrp_msg_iterate_get(msg, &it,
+ MRP_PEPMSG_STRING(TBLNAME, &table),
+ MRP_PEPMSG_STRING(COLUMNS, &columns),
+ MRP_PEPMSG_STRING(INDEX , &index),
+ MRP_MSG_END)) {
+ t->table = table;
+ t->mql_columns = columns;
+ t->mql_index = index;
+ }
+ else
+ return FALSE;
+ }
+
+ for (i = 0, w = watches; i < nwatch; i++, w++) {
+ if (mrp_msg_iterate_get(msg, &it,
+ MRP_PEPMSG_STRING(TBLNAME, &table),
+ MRP_PEPMSG_STRING(COLUMNS, &columns),
+ MRP_PEPMSG_STRING(WHERE , &where),
+ MRP_PEPMSG_UINT16(MAXROWS, &max_rows),
+ MRP_MSG_END)) {
+ w->table = table;
+ w->mql_columns = columns;
+ w->mql_where = where;
+ w->max_rows = max_rows;
+ }
+ else
+ return FALSE;
+ }
+
+ return TRUE;
+}
+
+
+mrp_msg_t *create_ack_message(uint32_t seq)
+{
+ return mrp_msg_create(MRP_PEPMSG_UINT16(MSGTYPE, MRP_PEPMSG_ACK),
+ MRP_PEPMSG_UINT32(MSGSEQ , seq),
+ MRP_MSG_END);
+}
+
+
+mrp_msg_t *create_nak_message(uint32_t seq, int error, const char *errmsg)
+{
+ return mrp_msg_create(MRP_PEPMSG_UINT16(MSGTYPE, MRP_PEPMSG_NAK),
+ MRP_PEPMSG_UINT32(MSGSEQ , seq),
+ MRP_PEPMSG_SINT32(ERRCODE, error),
+ MRP_PEPMSG_STRING(ERRMSG , errmsg),
+ MRP_MSG_END);
+}
+
+
+mrp_msg_t *create_notify_message(void)
+{
+ return mrp_msg_create(MRP_PEPMSG_UINT16(MSGTYPE, MRP_PEPMSG_NOTIFY),
+ MRP_PEPMSG_UINT32(MSGSEQ , 0),
+ MRP_PEPMSG_UINT16(NCHANGE, 0),
+ MRP_PEPMSG_UINT16(NTOTAL , 0),
+ MRP_MSG_END);
+}
+
+
+int update_notify_message(mrp_msg_t *msg, int id, mqi_column_def_t *columns,
+ int ncolumn, mrp_domctl_value_t *data, int nrow)
+{
+ mrp_domctl_value_t *v;
+ uint16_t tid, nr;
+ int i;
+
+ nr = nrow;
+ tid = id;
+
+ if (!mrp_msg_append(msg, MRP_PEPMSG_UINT16(TBLID, tid)) ||
+ !mrp_msg_append(msg, MRP_PEPMSG_UINT16(NROW , nr )))
+ return FALSE;
+
+ for (i = 0, v = data; i < nrow; i++, v += ncolumn) {
+ if (!append_one_row(msg, MRP_PEPTAG_DATA, columns, ncolumn, v))
+ return FALSE;
+ }
+
+ return TRUE;
+}
+
+
+mrp_msg_t *create_set_message(uint32_t seq, mrp_domctl_data_t *tables,
+ int ntable)
+{
+ mrp_msg_t *msg;
+ mrp_domctl_value_t *rows, *col;
+ uint16_t utable, utotal, tid, ncol, nrow;
+ int i, r, c;
+
+ utable = ntable;
+ utotal = 0;
+
+ msg = mrp_msg_create(MRP_PEPMSG_UINT16(MSGTYPE, MRP_PEPMSG_SET),
+ MRP_PEPMSG_UINT32(MSGSEQ , seq),
+ MRP_PEPMSG_UINT16(NCHANGE, utable),
+ MRP_PEPMSG_UINT16(NTOTAL , 0),
+ MRP_MSG_END);
+
+ if (msg != NULL) {
+ for (i = 0; i < ntable; i++) {
+ tid = tables[i].id;
+ ncol = tables[i].ncolumn;
+ nrow = tables[i].nrow;
+
+ if (!mrp_msg_append(msg, MRP_PEPMSG_UINT16(TBLID, tid)) ||
+ !mrp_msg_append(msg, MRP_PEPMSG_UINT16(NROW , nrow)) ||
+ !mrp_msg_append(msg, MRP_PEPMSG_UINT16(NCOL , ncol)))
+ goto fail;
+
+ for (r = 0; r < nrow; r++) {
+ rows = tables[i].rows[r];
+
+ for (c = 0; c < ncol; c++) {
+ col = rows + c;
+#define HANDLE_TYPE(pt, t, m) \
+ case MRP_DOMCTL_##pt: \
+ if (!mrp_msg_append(msg, MRP_PEPMSG_##t(DATA,col->m))) \
+ goto fail; \
+ break;
+
+ switch (col->type) {
+ HANDLE_TYPE(STRING , STRING, str);
+ HANDLE_TYPE(INTEGER , SINT32, s32);
+ HANDLE_TYPE(UNSIGNED, UINT32, u32);
+ HANDLE_TYPE(DOUBLE , DOUBLE, dbl);
+ default:
+ goto fail;
+ }
+#undef HANDLE_TYPE
+ }
+ }
+ utotal += nrow * ncol;
+ }
+
+ mrp_msg_set(msg, MRP_PEPMSG_UINT16(NTOTAL, utotal));
+
+ return msg;
+ }
+
+ fail:
+ mrp_msg_unref(msg);
+ return NULL;
+}
+
+
+static int append_one_row(mrp_msg_t *msg, uint16_t tag, mqi_column_def_t *col,
+ int ncolumn, mrp_domctl_value_t *data)
+{
+#define HANDLE_TYPE(dbtype, type, member) \
+ case mqi_##dbtype: \
+ if (!mrp_msg_append(msg, MRP_MSG_TAG_##type(tag, data->member))) \
+ return FALSE; \
+ break
+
+ int i;
+
+ for (i = 0; i < ncolumn; i++, data++, col++) {
+ switch (col->type) {
+ HANDLE_TYPE(integer , SINT32, s32);
+ HANDLE_TYPE(unsignd , UINT32, u32);
+ HANDLE_TYPE(floating, DOUBLE, dbl);
+ HANDLE_TYPE(string , STRING, str);
+ case mqi_blob:
+ default:
+ return FALSE;
+ }
+ }
+
+ return TRUE;
+
+#undef HANDLE_TYPE
+}
--- /dev/null
+#ifndef __MURPHY_DOMAIN_CONTROL_MESSAGE_H__
+#define __MURPHY_DOMAIN_CONTROL_MESSAGE_H__
+
+#include <murphy/common/msg.h>
+
+#include "domain-control-types.h"
+#include "client.h"
+
+
+#define MRP_PEPMSG_UINT16(tag, value) \
+ MRP_MSG_TAG_UINT16(MRP_PEPTAG_##tag, value)
+
+#define MRP_PEPMSG_SINT16(tag, value) \
+ MRP_MSG_TAG_SINT16(MRP_PEPTAG_##tag, value)
+
+#define MRP_PEPMSG_UINT32(tag, value) \
+ MRP_MSG_TAG_UINT32(MRP_PEPTAG_##tag, value)
+
+#define MRP_PEPMSG_SINT32(tag, value) \
+ MRP_MSG_TAG_SINT32(MRP_PEPTAG_##tag, value)
+
+#define MRP_PEPMSG_DOUBLE(tag, value) \
+ MRP_MSG_TAG_DOUBLE(MRP_PEPTAG_##tag, value)
+
+#define MRP_PEPMSG_STRING(tag, value) \
+ MRP_MSG_TAG_STRING(MRP_PEPTAG_##tag, value)
+
+#define MRP_PEPMSG_ANY(tag, typep, valuep) \
+ MRP_MSG_TAG_ANY(MRP_PEPTAG_##tag, typep, valuep)
+
+/*
+ * message types
+ */
+
+typedef enum {
+ MRP_PEPMSG_REGISTER = 0x1, /* client: register me */
+ MRP_PEPMSG_UNREGISTER = 0x2, /* client: unregister me */
+ MRP_PEPMSG_SET = 0x3, /* client: set table data */
+ MRP_PEPMSG_NOTIFY = 0x4, /* server: table changes */
+ MRP_PEPMSG_ACK = 0x5, /* server: ok */
+ MRP_PEPMSG_NAK = 0x6, /* server: request failed */
+} mrp_pepmsg_type_t;
+
+
+/*
+ * message-specific tags
+ */
+
+typedef enum {
+ /*
+ * fixed common tags
+ */
+ MRP_PEPTAG_MSGTYPE = 0x1, /* message type */
+ MRP_PEPTAG_MSGSEQ = 0x2, /* sequence number */
+
+ /*
+ * fixed tags in registration messages
+ */
+ MRP_PEPTAG_NAME = 0x3, /* enforcement point name */
+ MRP_PEPTAG_NTABLE = 0x4, /* number of owned tables */
+ MRP_PEPTAG_NWATCH = 0x5, /* number of watched tables */
+ MRP_PEPTAG_TBLNAME = 0x6, /* table name */
+ MRP_PEPTAG_COLUMNS = 0x8, /* column definitions/list */
+ MRP_PEPTAG_INDEX = 0x9, /* index definition */
+ MRP_PEPTAG_WHERE = 0xa, /* where clause for select */
+ MRP_PEPTAG_MAXROWS = 0xb, /* max number of rows to select */
+
+ /*
+ * fixed tags in NAKs
+ */
+ MRP_PEPTAG_ERRCODE = 0x3, /* error code */
+ MRP_PEPTAG_ERRMSG = 0x4, /* error message */
+
+ /*
+ * fixed tags in data notification messages
+ */
+ MRP_PEPTAG_NCHANGE = 0x3, /* number of tables in notification */
+ MRP_PEPTAG_NTOTAL = 0x4, /* total columns in notification */
+ MRP_PEPTAG_TBLID = 0x5, /* table id */
+ MRP_PEPTAG_NROW = 0x6, /* number of table rows */
+ MRP_PEPTAG_NCOL = 0x7, /* number of columns in a row */
+ MRP_PEPTAG_DATA = 0x8, /* a data column */
+} mrp_pepmsg_tag_t;
+
+
+mrp_msg_t *create_register_message(mrp_domctl_t *dc);
+int decode_register_message(mrp_msg_t *msg,
+ mrp_domctl_table_t *tables, int ntable,
+ mrp_domctl_watch_t *watches, int nwatch);
+
+mrp_msg_t *create_ack_message(uint32_t seq);
+mrp_msg_t *create_nak_message(uint32_t seq, int error, const char *errmsg);
+mrp_msg_t *create_notify_message(void);
+int update_notify_message(mrp_msg_t *msg, int id, mqi_column_def_t *columns,
+ int ncolumn, mrp_domctl_value_t *data, int nrow);
+
+mrp_msg_t *create_set_message(uint32_t seq, mrp_domctl_data_t *tables,
+ int ntable);
+
+#endif /* __MURPHY_DOMAIN_CONTROL_MESSAGE_H__ */
--- /dev/null
+#include <murphy/common/mm.h>
+#include <murphy/common/log.h>
+
+#include <murphy-db/mql-result.h>
+
+#include "domain-control-types.h"
+#include "message.h"
+#include "table.h"
+#include "notify.h"
+
+
+static void prepare_proxy_notification(pep_proxy_t *proxy)
+{
+ proxy->notify_update = FALSE;
+ proxy->notify_ntable = 0;
+ proxy->notify_ncolumn = 0;
+ proxy->notify_fail = FALSE;
+}
+
+
+static void check_watch_notification(pep_watch_t *w)
+{
+ pep_proxy_t *proxy = w->proxy;
+ pep_table_t *t = w->table;
+ int update;
+
+ if (t->notify_all) {
+ t->h = mqi_get_table_handle(t->name);
+ update = TRUE;
+ }
+ else {
+ if (t->h != MQI_HANDLE_INVALID)
+ update = (w->stamp < mqi_get_table_stamp(t->h));
+ else
+ update = FALSE;
+ }
+
+ proxy->notify_update |= update;
+}
+
+
+static int collect_watch_notification(pep_watch_t *w)
+{
+ pep_proxy_t *proxy = w->proxy;
+ pep_table_t *t = w->table;
+ mql_result_t *r = NULL;
+ uint16_t tid = w->id;
+ uint16_t nrow, ncol;
+ mrp_msg_t *msg;
+ int i, j;
+ int types[MQI_COLUMN_MAX];
+ const char *str;
+ uint32_t u32;
+ int32_t s32;
+ double dbl;
+
+ mrp_debug("updating %s watch for %s", t->name, proxy->name);
+
+ if (proxy->notify_msg == NULL) {
+ proxy->notify_msg = create_notify_message();
+
+ if (proxy->notify_msg == NULL)
+ goto fail;
+ }
+
+ if (t->h != MQI_HANDLE_INVALID) {
+ if (!exec_mql(mql_result_rows, &r, "select %s from %s%s%s",
+ w->mql_columns, t->name,
+ w->mql_where[0] ? " where " : "", w->mql_where)) {
+ mrp_debug("select from table %s failed", t->name);
+ goto fail;
+ }
+ }
+
+ if (r != NULL) {
+ nrow = mql_result_rows_get_row_count(r);
+ ncol = mql_result_rows_get_row_column_count(r);
+ }
+ else
+ nrow = ncol = 0;
+
+ msg = proxy->notify_msg;
+
+ if (!mrp_msg_append(msg, MRP_PEPMSG_UINT16(TBLID, tid)) ||
+ !mrp_msg_append(msg, MRP_PEPMSG_UINT16(NROW , nrow)) ||
+ !mrp_msg_append(msg, MRP_PEPMSG_UINT16(NCOL , ncol)))
+ goto fail;
+
+ for (i = 0; i < ncol; i++)
+ types[i] = mql_result_rows_get_row_column_type(r, i);
+
+ for (i = 0; i < nrow; i++) {
+ for (j = 0; j < ncol; j++) {
+ switch (types[j]) {
+ case mqi_string:
+ str = mql_result_rows_get_string(r, j, i, NULL, 0);
+ if (!mrp_msg_append(msg, MRP_PEPMSG_STRING(DATA, str)))
+ goto fail;
+ break;
+ case mqi_integer:
+ s32 = mql_result_rows_get_integer(r, j, i);
+ if (!mrp_msg_append(msg, MRP_PEPMSG_SINT32(DATA, s32)))
+ goto fail;
+ break;
+ case mqi_unsignd:
+ u32 = mql_result_rows_get_unsigned(r, j, i);
+ if (!mrp_msg_append(msg, MRP_PEPMSG_UINT32(DATA, u32)))
+ goto fail;
+ break;
+
+ case mqi_floating:
+ dbl = mql_result_rows_get_floating(r, j, i);
+ if (!mrp_msg_append(msg, MRP_PEPMSG_DOUBLE(DATA, dbl)))
+ goto fail;
+ break;
+
+ default:
+ goto fail;
+ }
+ }
+ }
+
+ if (r != NULL)
+ mql_result_free(r);
+
+ proxy->notify_ncolumn += nrow * ncol;
+ proxy->notify_ntable++;
+
+ return TRUE;
+
+ fail:
+ if (r != NULL)
+ mql_result_free(r);
+ mrp_msg_unref(proxy->notify_msg);
+ proxy->notify_msg = NULL;
+ proxy->notify_fail = TRUE;
+
+ return FALSE;
+}
+
+
+static int send_proxy_notification(pep_proxy_t *proxy)
+{
+ uint16_t nchange, ntotal;
+
+ if (proxy->notify_msg == NULL)
+ return TRUE;
+
+ if (!proxy->notify_fail) {
+ mrp_debug("notifying client %s", proxy->name);
+
+ nchange = proxy->notify_ntable;
+ ntotal = proxy->notify_ncolumn;
+
+ mrp_msg_set(proxy->notify_msg, MRP_PEPMSG_UINT16(NCHANGE, nchange));
+ mrp_msg_set(proxy->notify_msg, MRP_PEPMSG_UINT16(NTOTAL , ntotal ));
+
+ /*
+ mrp_log_info("Notification message for client %s:", proxy->name);
+ mrp_msg_dump(proxy->notify_msg, stdout);
+ */
+
+ mrp_transport_send(proxy->t, proxy->notify_msg);
+ }
+ else
+ mrp_log_error("Failed to generate/send notification to %s.",
+ proxy->name);
+
+ mrp_msg_unref(proxy->notify_msg);
+
+ proxy->notify_msg = NULL;
+ proxy->notify_ntable = 0;
+ proxy->notify_ncolumn = 0;
+ proxy->notify_fail = FALSE;
+ proxy->notify_all = FALSE;
+
+ return TRUE;
+}
+
+
+void notify_table_changes(pdp_t *pdp)
+{
+ mrp_list_hook_t *p, *n, *wp, *wn;
+ pep_proxy_t *proxy;
+ pep_table_t *t;
+ pep_watch_t *w;
+
+ mrp_debug("notifying clients about table changes");
+
+ mrp_list_foreach(&pdp->proxies, p, n) {
+ proxy = mrp_list_entry(p, typeof(*proxy), hook);
+ prepare_proxy_notification(proxy);
+ }
+
+ mrp_list_foreach(&pdp->tables, p, n) {
+ t = mrp_list_entry(p, typeof(*t), hook);
+
+ mrp_list_foreach(&t->watches, wp, wn) {
+ w = mrp_list_entry(wp, typeof(*w), tbl_hook);
+ check_watch_notification(w);
+ }
+
+ t->notify_all = FALSE;
+ }
+
+ mrp_list_foreach(&pdp->proxies, p, n) {
+ proxy = mrp_list_entry(p, typeof(*proxy), hook);
+
+ if (proxy->notify_update || proxy->notify_all) {
+ mrp_list_foreach(&proxy->watches, wp, wn) {
+ w = mrp_list_entry(wp, typeof(*w), pep_hook);
+ if (!collect_watch_notification(w))
+ break;
+ }
+
+ send_proxy_notification(proxy);
+ }
+ }
+}
--- /dev/null
+#ifndef __MURPHY_DOMAIN_CONTROL_NOTIFY_H__
+#define __MURPHY_DOMAIN_CONTROL_NOTIFY_H__
+
+#include "domain-control-types.h"
+
+void notify_table_changes(pdp_t *pdp);
+
+#endif /* __MURPHY_DOMAIN_CONTROL_NOTIFY_H__ */
--- /dev/null
+#include <murphy/common/macros.h>
+
+#include <murphy/core/plugin.h>
+#include <murphy/core/console.h>
+
+#include "domain-control-types.h"
+#include "domain-control.h"
+#include "client.h"
+
+
+static int plugin_init(mrp_plugin_t *plugin)
+{
+ const char *address = MRP_DEFAULT_DOMCTL_ADDRESS;
+
+ plugin->data = create_domain_control(plugin->ctx, address);
+
+ return (plugin->data != NULL);
+}
+
+
+static void plugin_exit(mrp_plugin_t *plugin)
+{
+ pdp_t *pdp = (pdp_t *)plugin->data;
+
+ destroy_domain_control(pdp);
+}
+
+
+static void cmd_cb(mrp_console_t *c, void *user_data, int argc, char **argv)
+{
+ MRP_UNUSED(user_data);
+ MRP_UNUSED(argc);
+ MRP_UNUSED(argv);
+
+ mrp_console_printf(c, "domctrl:%s() called...\n", __FUNCTION__);
+}
+
+
+#define PLUGIN_DESCRIPTION "Murphy domain control plugin."
+#define PLUGIN_VERSION MRP_VERSION_INT(0, 0, 1)
+#define PLUGIN_HELP "TODO..."
+#define PLUGIN_AUTHORS "Krisztian Litkey <krisztian.litkey@intel.com>"
+
+MRP_CONSOLE_GROUP(plugin_commands, "domain-control", NULL, NULL, {
+ MRP_TOKENIZED_CMD("cmd", cmd_cb, TRUE,
+ "cmd [args]", "a command", "A command..."),
+});
+
+MURPHY_REGISTER_PLUGIN("domain-control",
+ PLUGIN_VERSION, PLUGIN_DESCRIPTION,
+ PLUGIN_AUTHORS, PLUGIN_HELP, MRP_SINGLETON,
+ plugin_init, plugin_exit,
+ NULL, 0, /* plugin argument table */
+ NULL, 0, /* exported methods */
+ NULL, 0, /* imported methods */
+ &plugin_commands);
--- /dev/null
+#include <errno.h>
+
+#include <murphy/common/log.h>
+#include <murphy/common/mm.h>
+#include <murphy/common/list.h>
+
+#include "domain-control-types.h"
+#include "table.h"
+#include "proxy.h"
+
+
+int init_proxies(pdp_t *pdp)
+{
+ mrp_list_init(&pdp->proxies);
+
+ return TRUE;
+}
+
+
+void destroy_proxies(pdp_t *pdp)
+{
+ MRP_UNUSED(pdp);
+
+ return;
+}
+
+
+pep_proxy_t *create_proxy(pdp_t *pdp)
+{
+ pep_proxy_t *proxy;
+
+ proxy = mrp_allocz(sizeof(*proxy));
+
+ if (proxy != NULL) {
+ mrp_list_init(&proxy->hook);
+ mrp_list_init(&proxy->watches);
+
+ proxy->pdp = pdp;
+
+ mrp_list_append(&pdp->proxies, &proxy->hook);
+ }
+
+ return proxy;
+}
+
+
+void destroy_proxy(pep_proxy_t *proxy)
+{
+ int i;
+
+ if (proxy != NULL) {
+ mrp_list_delete(&proxy->hook);
+
+ for (i = 0; i < proxy->ntable; i++)
+ destroy_proxy_table(proxy->tables + i);
+
+ destroy_proxy_watches(proxy);
+
+ mrp_free(proxy);
+ }
+}
+
+
+int register_proxy(pep_proxy_t *proxy, char *name,
+ mrp_domctl_table_t *tables, int ntable,
+ mrp_domctl_watch_t *watches, int nwatch,
+ int *error, const char **errmsg)
+{
+ pep_table_t *t;
+ mrp_domctl_watch_t *w;
+ int i;
+
+ proxy->name = mrp_strdup(name);
+ proxy->tables = mrp_allocz_array(typeof(*proxy->tables) , ntable);
+ proxy->ntable = ntable;
+
+ if (proxy->name == NULL || (ntable && proxy->tables == NULL)) {
+ *error = ENOMEM;
+ *errmsg = "failed to allocate proxy table";
+
+ return FALSE;
+ }
+
+ for (i = 0, t = proxy->tables; i < ntable; i++, t++) {
+ t->h = MQI_HANDLE_INVALID;
+ t->name = mrp_strdup(tables[i].table);
+ t->mql_columns = mrp_strdup(tables[i].mql_columns);
+ t->mql_index = mrp_strdup(tables[i].mql_index);
+
+ if (t->name == NULL || t->mql_columns == NULL || t->mql_index == NULL) {
+ mrp_log_error("Failed to allocate proxy table %s for %s.",
+ tables[i].table, name);
+ *error = ENOMEM;
+ *errmsg = "failed to allocate proxy table";
+
+ return FALSE;
+ }
+
+ if (create_proxy_table(t, error, errmsg))
+ mrp_log_info("Client %s created table %s.", proxy->name,
+ tables[i].table);
+ else {
+ mrp_log_error("Client %s failed to create table %s (%d: %s).",
+ proxy->name, tables[i].table, *error, *errmsg);
+ return FALSE;
+ }
+ }
+
+ for (i = 0, w = watches; i < nwatch; i++, w++) {
+ if (create_proxy_watch(proxy, i, w->table, w->mql_columns,
+ w->mql_where, w->max_rows, error, errmsg))
+ mrp_log_info("Client %s subscribed for table %s.", proxy->name,
+ w->table);
+ else
+ mrp_log_error("Client %s failed to subscribe for table %s.",
+ proxy->name, w->table);
+ }
+
+ return TRUE;
+}
+
+
+int unregister_proxy(pep_proxy_t *proxy)
+{
+ destroy_proxy(proxy);
+
+ return TRUE;
+}
--- /dev/null
+#ifndef __MURPHY_DOMAIN_CONTROL_PROXY_H__
+#define __MURPHY_DOMAIN_CONTROL_PROXY_H__
+
+#include "domain-control-types.h"
+
+int init_proxies(pdp_t *pdp);
+void destroy_proxies(pdp_t *pdp);
+
+pep_proxy_t *create_proxy(pdp_t *pdp);
+void destroy_proxy(pep_proxy_t *proxy);
+
+int register_proxy(pep_proxy_t *proxy, char *name,
+ mrp_domctl_table_t *tables, int ntable,
+ mrp_domctl_watch_t *watches, int nwatch,
+ int *error, const char **errmsg);
+int unregister_proxy(pep_proxy_t *proxy);
+
+#endif /* __MURPHY_DOMAIN_CONTROL_PROXY_H__ */
--- /dev/null
+#include <errno.h>
+
+#include <murphy/common/debug.h>
+#include <murphy/common/mm.h>
+#include <murphy/common/hashtbl.h>
+#include <murphy/common/utils.h>
+
+#include <murphy-db/mqi.h>
+
+#include "table.h"
--- /dev/null
+#include <errno.h>
+#include <stdarg.h>
+
+#include <murphy/common/debug.h>
+#include <murphy/common/mm.h>
+#include <murphy/common/hashtbl.h>
+#include <murphy/common/utils.h>
+
+#include <murphy-db/mqi.h>
+#include <murphy-db/mql.h>
+
+#include "domain-control.h"
+#include "table.h"
+
+#define FAIL(ec, msg) do { \
+ *errcode = ec; \
+ *errmsg = msg; \
+ goto fail; \
+ } while (0)
+
+static pep_table_t *lookup_watch_table(pdp_t *pdp, const char *name);
+
+#include "table-common.c"
+
+/*
+ * proxied and tracked tables
+ */
+
+
+static void table_event_cb(mqi_event_t *e, void *user_data)
+{
+ pdp_t *pdp = (pdp_t *)user_data;
+ const char *name = e->table.table.name;
+ mqi_handle_t h = e->table.table.handle;
+ pep_table_t *t;
+
+ switch (e->event) {
+ case mqi_table_created:
+ mrp_debug("table %s (0x%x) created", name, h);
+ break;
+
+ case mqi_table_dropped:
+ mrp_debug("table %s (0x%x) dropped", name, h);
+ break;
+ default:
+ return;
+ }
+
+ t = lookup_watch_table(pdp, name);
+
+ if (t != NULL) {
+ t->notify_all = TRUE;
+ t->h = h;
+ }
+
+ schedule_notification(pdp);
+}
+
+
+static void transaction_event_cb(mqi_event_t *e, void *user_data)
+{
+ pdp_t *pdp = (pdp_t *)user_data;
+
+ switch (e->event) {
+ case mqi_transaction_end:
+ mrp_debug("transaction ended");
+ schedule_notification(pdp);
+ break;
+
+ case mqi_transaction_start:
+ mrp_debug("transaction started");
+ break;
+
+ default:
+ break;
+ }
+}
+
+
+static int open_db(pdp_t *pdp)
+{
+ if (mqi_open() == 0) {
+ if (mqi_create_transaction_trigger(transaction_event_cb, pdp) == 0) {
+ if (mqi_create_table_trigger(table_event_cb, pdp) == 0)
+ return TRUE;
+ else
+ mqi_drop_transaction_trigger(transaction_event_cb, pdp);
+ }
+ }
+
+ return FALSE;
+}
+
+
+static void close_db(pdp_t *pdp)
+{
+ mqi_drop_table_trigger(table_event_cb, pdp);
+ mqi_drop_transaction_trigger(transaction_event_cb, pdp);
+}
+
+
+static void purge_watch_table_cb(void *key, void *entry);
+
+
+
+int init_tables(pdp_t *pdp)
+{
+ mrp_htbl_config_t hcfg;
+
+ if (open_db(pdp)) {
+ mrp_list_init(&pdp->tables);
+
+ mrp_clear(&hcfg);
+ hcfg.comp = mrp_string_comp;
+ hcfg.hash = mrp_string_hash;
+ hcfg.free = purge_watch_table_cb;
+
+ pdp->watched = mrp_htbl_create(&hcfg);
+ }
+
+ return (pdp->watched != NULL);
+}
+
+
+void destroy_tables(pdp_t *pdp)
+{
+ close_db(pdp);
+ mrp_htbl_destroy(pdp->watched, TRUE);
+
+ pdp->watched = NULL;
+}
+
+
+int exec_mql(mql_result_type_t type, mql_result_t **resultp,
+ const char *format, ...)
+{
+ mql_result_t *r;
+ char buf[4096];
+ va_list ap;
+ int success, n;
+
+ va_start(ap, format);
+ n = vsnprintf(buf, sizeof(buf), format, ap);
+ va_end(ap);
+
+ if (n < (int)sizeof(buf)) {
+ r = mql_exec_string(type, buf);
+ success = (r == NULL || mql_result_is_success(r));
+
+ if (resultp != NULL) {
+ *resultp = r;
+ return success;
+ }
+ else {
+ mql_result_free(r);
+ return success;
+ }
+ }
+ else {
+ errno = EOVERFLOW;
+ if (resultp != NULL)
+ *resultp = NULL;
+
+ return FALSE;
+ }
+}
+
+
+static int get_table_description(pep_table_t *t)
+{
+ mqi_column_def_t columns[MQI_COLUMN_MAX];
+ mrp_domctl_value_t *values = NULL;
+ int ncolumn, i;
+
+ if (t->h == MQI_HANDLE_INVALID)
+ t->h = mqi_get_table_handle((char *)t->name);
+
+ if (t->h != MQI_HANDLE_INVALID) {
+ ncolumn = mqi_describe(t->h, columns, MRP_ARRAY_SIZE(columns));
+
+ if (ncolumn > 0) {
+ t->columns = mrp_allocz_array(typeof(*t->columns), ncolumn);
+ t->coldesc = mrp_allocz_array(typeof(*t->coldesc), ncolumn + 1);
+
+ if (t->columns != NULL && t->coldesc != NULL) {
+ memcpy(t->columns, columns, ncolumn * sizeof(*t->columns));
+ t->ncolumn = ncolumn;
+
+ for (i = 0; i < t->ncolumn; i++) {
+ t->coldesc[i].cindex = i;
+ t->coldesc[i].offset = (int)(ptrdiff_t)&values[i].str;
+ }
+
+ t->coldesc[i].cindex = -1;
+ t->coldesc[i].offset = 0;
+
+ return TRUE;
+ }
+ }
+ }
+
+ return FALSE;
+}
+
+
+int create_proxy_table(pep_table_t *t, int *errcode, const char **errmsg)
+{
+ mrp_list_init(&t->hook);
+ mrp_list_init(&t->watches);
+
+ if (mqi_get_table_handle((char *)t->name) != MQI_HANDLE_INVALID)
+ FAIL(EEXIST, "DB error: table already exists");
+
+ if (exec_mql(mql_result_dontcare, NULL,
+ "create temporary table %s (%s)", t->name, t->mql_columns)) {
+ if (t->mql_index && t->mql_index[0]) {
+ if (!exec_mql(mql_result_dontcare, NULL,
+ "create index on %s (%s)", t->name, t->mql_index))
+ FAIL(EINVAL, "failed to table index");
+ }
+
+ if (!get_table_description(t))
+ FAIL(EINVAL, "DB error: failed to get table description");
+
+ return TRUE;
+ }
+ else
+ FAIL(ENOMEM, "DB error: failed to create table");
+
+ fail:
+ return FALSE;
+}
+
+
+void destroy_proxy_table(pep_table_t *t)
+{
+ mrp_debug("destroying table %s", t->name ? t->name : "<unknown>");
+
+ if (t->h != MQI_HANDLE_INVALID)
+ mqi_drop_table(t->h);
+
+ mrp_free(t->mql_columns);
+ mrp_free(t->mql_index);
+
+ mrp_free(t->columns);
+ mrp_free(t->coldesc);
+ mrp_free(t->name);
+
+ t->name = NULL;
+ t->h = MQI_HANDLE_INVALID;
+ t->columns = NULL;
+ t->ncolumn = 0;
+}
+
+
+void destroy_proxy_tables(pep_proxy_t *proxy)
+{
+ mqi_handle_t tx;
+ int i;
+
+ mrp_debug("destroying tables of client %s", proxy->name);
+
+ tx = mqi_begin_transaction();
+ for (i = 0; i < proxy->ntable; i++)
+ destroy_proxy_table(proxy->tables + i);
+ mqi_commit_transaction(tx);
+
+ proxy->tables = NULL;
+ proxy->ntable = 0;
+}
+
+
+pep_table_t *create_watch_table(pdp_t *pdp, const char *name)
+{
+ pep_table_t *t;
+
+ t = mrp_allocz(sizeof(*t));
+
+ if (t != NULL) {
+ mrp_list_init(&t->hook);
+ mrp_list_init(&t->watches);
+
+ t->h = MQI_HANDLE_INVALID;
+ t->name = mrp_strdup(name);
+
+ if (t->name == NULL)
+ goto fail;
+
+ get_table_description(t);
+
+ if (!mrp_htbl_insert(pdp->watched, t->name, t))
+ goto fail;
+
+ mrp_list_append(&pdp->tables, &t->hook);
+ }
+
+ return t;
+
+ fail:
+ destroy_watch_table(pdp, t);
+
+ return FALSE;
+}
+
+
+static void destroy_table_watches(pep_table_t *t)
+{
+ pep_watch_t *w;
+ mrp_list_hook_t *p, *n;
+
+ if (t != NULL) {
+ mrp_list_foreach(&t->watches, p, n) {
+ w = mrp_list_entry(p, typeof(*w), tbl_hook);
+
+ mrp_list_delete(&w->tbl_hook);
+ mrp_list_delete(&w->pep_hook);
+
+ mrp_free(w->mql_columns);
+ mrp_free(w->mql_where);
+ mrp_free(w);
+ }
+ }
+}
+
+
+void destroy_watch_table(pdp_t *pdp, pep_table_t *t)
+{
+ mrp_list_delete(&t->hook);
+ t->h = MQI_HANDLE_INVALID;
+
+ if (pdp != NULL)
+ mrp_htbl_remove(pdp->watched, t->name, FALSE);
+
+ destroy_table_watches(t);
+}
+
+
+static pep_table_t *lookup_watch_table(pdp_t *pdp, const char *name)
+{
+ return mrp_htbl_lookup(pdp->watched, (void *)name);
+}
+
+
+static void purge_watch_table_cb(void *key, void *entry)
+{
+ pep_table_t *t = (pep_table_t *)entry;
+
+ MRP_UNUSED(key);
+
+ destroy_watch_table(NULL, t);
+}
+
+
+int create_proxy_watch(pep_proxy_t *proxy, int id,
+ const char *table, const char *mql_columns,
+ const char *mql_where, int max_rows,
+ int *error, const char **errmsg)
+{
+ pdp_t *pdp = proxy->pdp;
+ pep_table_t *t;
+ pep_watch_t *w;
+
+ t = lookup_watch_table(pdp, table);
+
+ if (t == NULL) {
+ t = create_watch_table(pdp, table);
+
+ if (t == NULL) {
+ *error = EINVAL;
+ *errmsg = "failed to watch table";
+ }
+ }
+
+ w = mrp_allocz(sizeof(*w));
+
+ if (w != NULL) {
+ mrp_list_init(&w->tbl_hook);
+ mrp_list_init(&w->pep_hook);
+
+ w->table = t;
+ w->mql_columns = mrp_strdup(mql_columns);
+ w->mql_where = mrp_strdup(mql_where ? mql_where : "");
+ w->max_rows = max_rows;
+ w->proxy = proxy;
+ w->id = id;
+
+ if (w->mql_columns == NULL || w->mql_where == NULL)
+ goto fail;
+
+ mrp_list_append(&t->watches, &w->tbl_hook);
+ mrp_list_append(&proxy->watches, &w->pep_hook);
+
+ return TRUE;
+ }
+ else {
+ *error = ENOMEM;
+ *errmsg = "failed to allocate table watch";
+ }
+
+ fail:
+ if (w != NULL) {
+ mrp_free(w->mql_columns);
+ mrp_free(w->mql_where);
+ mrp_free(w);
+ }
+
+ return FALSE;
+}
+
+
+void destroy_proxy_watches(pep_proxy_t *proxy)
+{
+ pep_watch_t *w;
+ mrp_list_hook_t *p, *n;
+
+ if (proxy != NULL) {
+ mrp_list_foreach(&proxy->watches, p, n) {
+ w = mrp_list_entry(p, typeof(*w), pep_hook);
+
+ mrp_list_delete(&w->tbl_hook);
+ mrp_list_delete(&w->pep_hook);
+
+ mrp_free(w);
+ }
+ }
+}
+
+
+static void reset_proxy_tables(pep_proxy_t *proxy)
+{
+ int i;
+
+ for (i = 0; i < proxy->ntable; i++)
+ mqi_delete_from(proxy->tables[i].h, NULL);
+}
+
+
+static int insert_into_table(pep_table_t *t,
+ mrp_domctl_value_t **rows, int nrow)
+{
+ void *data[2];
+ int i;
+
+ data[1] = NULL;
+
+ for (i = 0; i < nrow; i++) {
+ data[0] = rows[i];
+ if (mqi_insert_into(t->h, 0, t->coldesc, data) != 1)
+ return FALSE;
+ }
+
+ return TRUE;
+}
+
+
+int set_proxy_tables(pep_proxy_t *proxy, mrp_domctl_data_t *tables, int ntable,
+ int *error, const char **errmsg)
+{
+ mqi_handle_t tx;
+ pep_table_t *t;
+ int i, id;
+
+ tx = mqi_begin_transaction();
+
+ if (tx != MQI_HANDLE_INVALID) {
+ reset_proxy_tables(proxy);
+
+ for (i = 0; i < ntable; i++) {
+ id = tables[i].id;
+
+ if (id < 0 || id >= proxy->ntable)
+ goto fail;
+
+ t = proxy->tables + id;
+
+ if (tables[i].ncolumn != t->ncolumn)
+ goto fail;
+
+#if 0
+ if (!delete_from_table(t, tables[i].rows, tables[i].nrow))
+ goto fail;
+#endif
+
+ if (!insert_into_table(t, tables[i].rows, tables[i].nrow))
+ goto fail;
+
+
+ }
+
+ mqi_commit_transaction(tx);
+
+ return TRUE;
+
+ fail:
+ *error = EINVAL;
+ *errmsg = "failed to set tables";
+ mqi_rollback_transaction(tx);
+ }
+
+ return FALSE;
+}
--- /dev/null
+#ifndef __MURPHY_DOMAIN_CONTROL_TABLE_H__
+#define __MURPHY_DOMAIN_CONTROL_TABLE_H__
+
+#include <murphy-db/mql-result.h>
+
+#include "client.h"
+#include "domain-control-types.h"
+
+int init_tables(pdp_t *pdp);
+void destroy_tables(pdp_t *pdp);
+
+int create_proxy_table(pep_table_t *t, int *errcode, const char **errmsg);
+
+int create_proxy_watch(pep_proxy_t *proxy, int id,
+ const char *table, const char *mql_columns,
+ const char *mql_where, int max_rows,
+ int *error, const char **errmsg);
+
+void destroy_watch_table(pdp_t *pdp, pep_table_t *t);
+
+void destroy_proxy_table(pep_table_t *t);
+void destroy_proxy_tables(pep_proxy_t *proxy);
+
+void destroy_proxy_watches(pep_proxy_t *proxy);
+
+int set_proxy_tables(pep_proxy_t *proxy, mrp_domctl_data_t *tables, int ntable,
+ int *error, const char **errmsg);
+
+int exec_mql(mql_result_type_t type, mql_result_t **resultp,
+ const char *format, ...);
+
+
+#endif /* __MURPHY_DOMAIN_CONTROL_TABLE_H__ */
--- /dev/null
+#include <stdio.h>
+#include <unistd.h>
+#include <string.h>
+#include <errno.h>
+#include <netdb.h>
+#include <signal.h>
+#include <alloca.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+
+#define _GNU_SOURCE
+#include <getopt.h>
+
+#include <readline/readline.h>
+#include <readline/history.h>
+
+#include <murphy/common.h>
+#include "client.h"
+
+#define DEFAULT_PROMPT "test-controller> "
+
+
+/*
+ * client context
+ */
+
+typedef struct {
+ const char *addrstr; /* server address */
+ int zone; /* run in zone control mode */
+ int verbose; /* verbose mode */
+ mrp_mainloop_t *ml; /* murphy mainloop */
+ void *dc; /* domain controller */
+ int fd; /* fd for terminal input */
+ mrp_io_watch_t *iow; /* I/O watch for terminal input */
+} client_t;
+
+
+#define NVALUE 512
+
+
+/*
+ * device and stream definitions
+ */
+
+#define NDEVICE (MRP_ARRAY_SIZE(devices) - 1)
+#define DEVICE_NCOLUMN 4
+
+typedef struct {
+ const char *name;
+ const char *type;
+ int public;
+ int available;
+} device_t;
+
+static device_t devices[] = {
+ { "builtin-speaker" , "speaker" , TRUE , TRUE },
+ { "builtin-earpiece", "speaker" , FALSE, TRUE },
+ { "usb-speaker" , "speaker" , TRUE , FALSE },
+ { "a2dp-speaker" , "speaker" , TRUE , FALSE },
+ { "wired-headset" , "headset" , FALSE, FALSE },
+ { "usb-headphone" , "headphone", FALSE, FALSE },
+ { "a2dp-headphone" , "headphone", FALSE, FALSE },
+ { "sco-headset" , "headset" , FALSE, FALSE },
+ { NULL , NULL , FALSE, FALSE }
+};
+
+#define NSTREAM (MRP_ARRAY_SIZE(streams) - 1)
+#define STREAM_NCOLUMN 4
+
+typedef struct {
+ const char *name;
+ const char *role;
+ pid_t owner;
+ int playing;
+} stream_t;
+
+static stream_t streams[] = {
+ { "player1", "player" , 1234, FALSE },
+ { "player2", "player" , 4321, FALSE },
+ { "navit" , "navigator", 5432, FALSE },
+ { "phone" , "call" , 6666, FALSE },
+ { NULL , NULL , 0 , FALSE }
+};
+
+
+/*
+ * device and stream descriptors
+ */
+
+#define DEVICE_COLUMNS \
+ "name varchar(32), " \
+ "type varchar(32), " \
+ "public integer , " \
+ "available integer"
+
+#define DEVICE_INDEX "name"
+
+#define DEVICE_SELECT "*"
+
+#define DEVICE_WHERE NULL
+
+#define STREAM_COLUMNS \
+ "name varchar(32)," \
+ "role varchar(32)," \
+ "owner unsigned ," \
+ "playing integer"
+
+#define STREAM_INDEX "name"
+
+#define STREAM_SELECT "*"
+#define STREAM_WHERE NULL
+
+mrp_domctl_table_t media_tables[] = {
+ MRP_DOMCTL_TABLE("devices", DEVICE_COLUMNS, DEVICE_INDEX),
+ MRP_DOMCTL_TABLE("streams", STREAM_COLUMNS, STREAM_INDEX),
+};
+
+mrp_domctl_watch_t media_watches[] = {
+ MRP_DOMCTL_WATCH("devices", DEVICE_SELECT, DEVICE_WHERE, 0),
+ MRP_DOMCTL_WATCH("streams", STREAM_SELECT, STREAM_WHERE, 0),
+};
+
+
+/*
+ * zone and call definitions
+ */
+
+#define NZONE (MRP_ARRAY_SIZE(zones) - 1)
+#define ZONE_NCOLUMN 3
+
+typedef struct {
+ const char *name;
+ int occupied;
+ int active;
+} zone_t;
+
+static zone_t zones[] = {
+ { "driver" , TRUE , FALSE },
+ { "fearer" , FALSE, TRUE },
+ { "back-left" , TRUE , FALSE },
+ { "back-center", FALSE, FALSE },
+ { "back-right" , TRUE , TRUE },
+ { NULL , FALSE, FALSE }
+};
+
+
+#define NCALL (MRP_ARRAY_SIZE(calls) - 1)
+#define CALL_NCOLUMN 3
+
+typedef struct {
+ int id;
+ const char *state;
+ const char *modem;
+} call_t;
+
+static call_t calls[] = {
+ { 1, "active" , "modem1" },
+ { 2, "ringing" , "modem1" },
+ { 3, "held" , "modem2" },
+ { 4, "alerting", "modem2" },
+ { 0, NULL , NULL }
+};
+
+
+/*
+ * zone and call descriptors
+ */
+
+#define ZONE_COLUMNS \
+ "name varchar(32), " \
+ "occupied integer , " \
+ "active integer"
+
+#define ZONE_INDEX "name"
+
+#define ZONE_SELECT "*"
+
+#define ZONE_WHERE NULL
+
+#define CALL_COLUMNS \
+ "id integer , " \
+ "state varchar(32), " \
+ "modem varchar(32)"
+
+#define CALL_INDEX "id"
+
+#define CALL_SELECT "*"
+
+#define CALL_WHERE NULL
+
+mrp_domctl_table_t zone_tables[] = {
+ MRP_DOMCTL_TABLE("zones", ZONE_COLUMNS, ZONE_INDEX),
+ MRP_DOMCTL_TABLE("calls", CALL_COLUMNS, CALL_INDEX),
+};
+
+mrp_domctl_watch_t zone_watches[] = {
+ MRP_DOMCTL_WATCH("zones", ZONE_SELECT, ZONE_WHERE, 0),
+ MRP_DOMCTL_WATCH("calls", CALL_SELECT, CALL_WHERE, 0)
+};
+
+mrp_domctl_table_t *exports;
+int nexport;
+mrp_domctl_watch_t *imports;
+int nimport;
+
+
+static client_t *client;
+
+
+static void fatal_msg(int error, const char *format, ...);
+static void error_msg(const char *format, ...);
+static void info_msg(const char *format, ...);
+
+static void terminal_input_cb(char *input);
+
+static void export_data(client_t *c);
+
+
+static void plug_device(client_t *c, const char *name, int plug)
+{
+ device_t *d;
+ int changed;
+
+ if (c->zone) {
+ error_msg("cannot plug/unplug, client is in zone mode");
+ return;
+ }
+
+ changed = FALSE;
+
+ for (d = devices; d->name != NULL; d++) {
+ if (!strcmp(d->name, name)) {
+ changed = plug ^ d->available;
+ d->available = plug;
+ break;
+ }
+ }
+
+ if (changed) {
+ info_msg("device '%s' is now %splugged", d->name, plug ? "" : "un");
+ export_data(c);
+ }
+}
+
+
+static void list_devices(void)
+{
+ device_t *d;
+ int n;
+
+ for (d = devices, n = 0; d->name != NULL; d++, n++) {
+ info_msg("device '%s': (%s, %s), %s",
+ d->name, d->type, d->public ? "public" : "private",
+ d->available ? "available" : "currently unplugged");
+ }
+
+ if (n == 0)
+ info_msg("devices: none");
+}
+
+
+static void play_stream(client_t *c, const char *name, int play)
+{
+ stream_t *s;
+ int changed;
+
+ if (c->zone) {
+ error_msg("cannot control streams, client is in zone mode");
+ return;
+ }
+
+ changed = FALSE;
+
+ for (s = streams; s->name != NULL; s++) {
+ if (!strcmp(s->name, name)) {
+ changed = play ^ s->playing;
+ s->playing = play;
+ break;
+ }
+ }
+
+ if (changed) {
+ info_msg("stream '%s' is now %s", s->name, play ? "playing":"stopped");
+ export_data(c);
+ }
+}
+
+
+static void list_streams(void)
+{
+ stream_t *s;
+ int n;
+
+ for (s = streams, n = 0; s->name != NULL; s++, n++) {
+ info_msg("stream '%s': role %s, owner %u, currently %splaying",
+ s->name, s->role, s->owner, s->playing ? "" : "not ");
+ }
+
+ if (n == 0)
+ info_msg("streams: none");
+}
+
+
+static void set_zone_state(client_t *c, char *config)
+{
+ zone_t *z;
+ int occupied, active, changed, len;
+ char name[256], *end;
+
+ if (!c->zone) {
+ error_msg("cannot control zones, client is not in zone mode");
+ return;
+ }
+
+ while (*config == ' ' || *config == '\t')
+ config++;
+
+ end = strchr(config, ' ');
+ if (end == NULL)
+ return;
+
+ len = end - config;
+ strncpy(name, config, len);
+ name[len] = '\0';
+
+ config = end + 1;
+ while (*config == ' ' || *config == '\t')
+ config++;
+
+ occupied = FALSE;
+ active = FALSE;
+ changed = FALSE;
+
+ if (strstr(config, "occupied"))
+ occupied = TRUE;
+ if (strstr(config, "active"))
+ active = TRUE;
+
+ for (z = zones; z->name != NULL; z++) {
+ if (!strcmp(z->name, name)) {
+ changed = (active ^ z->active) | (occupied ^ z->occupied);
+ z->active = active;
+ z->occupied = occupied;
+ break;
+ }
+ }
+
+ if (changed) {
+ info_msg("zone '%s' is now %s and %s", z->name,
+ z->occupied ? "occupied" : "free",
+ z->active ? "active" : "idle");
+ export_data(c);
+ }
+}
+
+
+static void list_zones(void)
+{
+ zone_t *z;
+ int n;
+
+ for (z = zones, n = 0; z->name != NULL; z++, n++) {
+ info_msg("zone '%s' is now %s and %s", z->name,
+ z->occupied ? "occupied" : "free",
+ z->active ? "active" : "idle");
+ }
+
+ if (n == 0)
+ info_msg("zones: none");
+}
+
+
+static void set_call_state(client_t *c, const char *config)
+{
+ call_t *call;
+ char idstr[64], *state, *end;
+ int id, changed, len;
+
+ if (!c->zone) {
+ error_msg("cannot control calls, client is not in zone mode");
+ return;
+ }
+
+ while (*config == ' ' || *config == '\t')
+ config++;
+
+ end = strchr(config, ' ');
+ if (end == NULL)
+ return;
+
+ len = end - config;
+ strncpy(idstr, config, len);
+ idstr[len] = '\0';
+
+ config = end + 1;
+ while (*config == ' ' || *config == '\t')
+ config++;
+ state = (char *)config;
+
+ id = strtoul(idstr, &end, 10);
+
+ if (end && *end) {
+ error_msg("invalid call id '%s'", idstr);
+ return;
+ }
+
+ for (call = calls; call->id > 0; call++) {
+ if (call->id == id) {
+ if (strcmp(call->state, state)) {
+ mrp_free((char *)call->state);
+ call->state = mrp_strdup(state);
+ changed = TRUE;
+ break;
+ }
+ }
+ }
+
+ if (changed) {
+ info_msg("call #%d is now %s", call->id, call->state);
+ export_data(c);
+ }
+}
+
+
+static void list_calls(void)
+{
+ call_t *c;
+ int n;
+
+ for (c = calls, n = 0; c->id > 0; c++, n++) {
+ info_msg("call #%d: %s (on modem %s)", c->id, c->state, c->modem);
+ }
+
+ if (n == 0)
+ info_msg("calls: none");
+}
+
+
+static void reset_devices(void)
+{
+ mrp_clear(&devices);
+}
+
+
+void update_devices(mrp_domctl_data_t *data)
+{
+ device_t *d;
+ mrp_domctl_value_t *v;
+ int i;
+
+ if (data->nrow != 0 && data->ncolumn != DEVICE_NCOLUMN) {
+ error_msg("incorrect number of columns (%d) in device update",
+ data->ncolumn);
+ return;
+ }
+
+ if (data->nrow > (int)NDEVICE) {
+ error_msg("too many rows (%d) in device update", data->nrow);
+ return;
+ }
+
+ if (data->nrow == 0)
+ reset_devices();
+ else {
+ d = devices;
+
+ for (i = 0; i < data->nrow; i++) {
+ mrp_free((char *)d->name);
+ mrp_free((char *)d->type);
+
+ v = data->rows[i];
+ d->name = mrp_strdup(v[0].str);
+ d->type = mrp_strdup(v[1].str);
+ d->public = v[2].s32;
+ d->available = v[3].s32;
+
+ d += 1;
+ }
+ }
+
+ list_devices();
+}
+
+
+static void reset_streams(void)
+{
+ mrp_clear(&streams);
+}
+
+
+void update_streams(mrp_domctl_data_t *data)
+{
+ stream_t *s;
+ mrp_domctl_value_t *v;
+ int i;
+
+ if (data->nrow != 0 && data->ncolumn != STREAM_NCOLUMN) {
+ error_msg("incorrect number of columns (%d) in stream update",
+ data->ncolumn);
+ return;
+ }
+
+ if (data->nrow > (int)NSTREAM) {
+ error_msg("too many rows (%d) in stream update", data->nrow);
+ return;
+ }
+
+ if (data->nrow == 0)
+ reset_streams();
+ else {
+ s = streams;
+
+ for (i = 0; i < data->nrow; i++) {
+ mrp_free((char *)s->name);
+ mrp_free((char *)s->role);
+
+ v = data->rows[i];
+ s->name = mrp_strdup(v[0].str);
+ s->role = mrp_strdup(v[1].str);
+ s->owner = v[2].u32;
+ s->playing = v[3].s32;
+
+ s += 1;
+ }
+ }
+
+ list_streams();
+}
+
+
+static void reset_zones(void)
+{
+ mrp_clear(&zones);
+}
+
+
+void update_zones(mrp_domctl_data_t *data)
+{
+ zone_t *z;
+ mrp_domctl_value_t *v;
+ int i;
+
+ if (data->nrow != 0 && data->ncolumn != ZONE_NCOLUMN) {
+ error_msg("incorrect number of columns (%d) in zone update",
+ data->ncolumn);
+ return;
+ }
+
+ if (data->nrow > (int)NZONE) {
+ error_msg("too many rows (%d) in zone update", data->nrow);
+ return;
+ }
+
+ if (data->nrow == 0)
+ reset_zones();
+ else {
+ z = zones;
+
+ for (i = 0; i < data->nrow; i++) {
+ mrp_free((char *)z->name);
+
+ v = data->rows[i];
+ z->name = mrp_strdup(v[0].str);
+ z->occupied = v[1].s32;
+ z->active = v[2].s32;
+
+ z += 1;
+ }
+ }
+
+ list_zones();
+}
+
+
+static void reset_calls(void)
+{
+ mrp_clear(&calls);
+}
+
+
+void update_calls(mrp_domctl_data_t *data)
+{
+ call_t *c;
+ mrp_domctl_value_t *v;
+ int i;
+
+ if (data->nrow != 0 && data->ncolumn != CALL_NCOLUMN) {
+ error_msg("incorrect number of columns (%d) in call update.",
+ data->ncolumn);
+ return;
+ }
+
+ if (data->nrow > (int)NCALL) {
+ error_msg("too many rows (%d) in call update", data->nrow);
+ return;
+ }
+
+ if (data->nrow == 0)
+ reset_calls();
+ else {
+ c = calls;
+
+ for (i = 0; i < data->nrow; i++) {
+ mrp_free((char *)c->state);
+ mrp_free((char *)c->modem);
+
+ v = data->rows[i];
+ c->id = v[0].s32;
+ c->state = mrp_strdup(v[1].str);
+ c->modem = mrp_strdup(v[2].str);
+
+ c += 1;
+ }
+ }
+
+ list_calls();
+}
+
+
+void update_imports(client_t *c, mrp_domctl_data_t *data, int ntable)
+{
+ int i;
+
+ for (i = 0; i < ntable; i++) {
+ if (c->zone) {
+ if (data[i].id == 0)
+ update_devices(data + i);
+ else
+ update_streams(data + i);
+ }
+ else {
+ if (data[i].id == 0)
+ update_zones(data + i);
+ else
+ update_calls(data + i);
+ }
+ }
+}
+
+
+
+static void terminal_prompt_erase(void)
+{
+ int n = strlen(DEFAULT_PROMPT);
+
+ printf("\r");
+ while (n-- > 0)
+ printf(" ");
+ printf("\r");
+}
+
+
+static void terminal_prompt_display(void)
+{
+ rl_callback_handler_remove();
+ rl_callback_handler_install(DEFAULT_PROMPT, terminal_input_cb);
+}
+
+
+static void show_help(void)
+{
+#define P info_msg
+
+ P("Available commands:");
+ P(" help show this help");
+ P(" list list all data");
+ P(" list {devices|streams|zones|calls} list the requested data");
+ P(" plug <device> update <device> as plugged");
+ P(" unplug <device> update <device> as unplugged");
+ P(" play <stream> update <stream> as playing");
+ P(" stop <stream> update <stream> as stopped");
+ P(" call <call> <state> update state of <call>");
+ P(" zone <zone> [occupied,[active]] update state of <zone>");
+
+#undef P
+}
+
+
+static void terminal_process_input(char *input)
+{
+ int len;
+
+ add_history(input);
+
+ if (input == NULL || !strcmp(input, "exit")) {
+ terminal_prompt_erase();
+ exit(0);
+ }
+ else if (!strcmp(input, "help")) {
+ show_help();
+ }
+ else if (!strcmp(input, "list")) {
+ list_devices();
+ list_streams();
+ list_zones();
+ list_calls();
+ }
+ else if (!strcmp(input, "list devices"))
+ list_devices();
+ else if (!strcmp(input, "list streams"))
+ list_streams();
+ else if (!strcmp(input, "list zones"))
+ list_zones();
+ else if (!strcmp(input, "list calls"))
+ list_calls();
+ else if (!strncmp(input, "plug " , len=sizeof("plug ") - 1) ||
+ !strncmp(input, "unplug ", len=sizeof("unplug ") - 1)) {
+ plug_device(client, input + len, *input == 'p');
+ }
+ else if (!strncmp(input, "play " , len=sizeof("play ") - 1) ||
+ !strncmp(input, "stop ", len=sizeof("stop ") - 1)) {
+ play_stream(client, input + len, *input == 'p');
+ }
+ else if (!strncmp(input, "call " , len=sizeof("call ") - 1)) {
+ set_call_state(client, input + len);
+ }
+ else if (!strncmp(input, "zone " , len=sizeof("zone ") - 1)) {
+ set_zone_state(client, input + len);
+ }
+}
+
+
+static void terminal_input_cb(char *input)
+{
+ terminal_process_input(input);
+ free(input);
+}
+
+
+static void terminal_cb(mrp_mainloop_t *ml, mrp_io_watch_t *w, int fd,
+ mrp_io_event_t events, void *user_data)
+{
+ MRP_UNUSED(w);
+ MRP_UNUSED(fd);
+ MRP_UNUSED(user_data);
+
+ if (events & MRP_IO_EVENT_IN)
+ rl_callback_read_char();
+
+ if (events & MRP_IO_EVENT_HUP)
+ mrp_mainloop_quit(ml, 0);
+}
+
+
+static void terminal_setup(client_t *c)
+{
+ mrp_io_event_t events;
+
+ c->fd = fileno(stdin);
+ events = MRP_IO_EVENT_IN | MRP_IO_EVENT_HUP;
+ c->iow = mrp_add_io_watch(c->ml, c->fd, events, terminal_cb, c);
+
+ if (c->iow == NULL)
+ fatal_msg(1, "Failed to create terminal input I/O watch.");
+ else
+ terminal_prompt_display();
+}
+
+
+static void terminal_cleanup(client_t *c)
+{
+ mrp_del_io_watch(c->iow);
+ c->iow = NULL;
+
+ rl_callback_handler_remove();
+}
+
+
+static void fatal_msg(int error, const char *format, ...)
+{
+ va_list ap;
+
+ terminal_prompt_erase();
+
+ fprintf(stderr, "fatal error: ");
+ va_start(ap, format);
+ vfprintf(stderr, format, ap);
+ va_end(ap);
+ fprintf(stderr, "\n");
+ fflush(stderr);
+
+ exit(error);
+}
+
+
+static void error_msg(const char *format, ...)
+{
+ va_list ap;
+
+ terminal_prompt_erase();
+
+ fprintf(stderr, "error: ");
+ va_start(ap, format);
+ vfprintf(stderr, format, ap);
+ va_end(ap);
+ fprintf(stderr, "\n");
+ fflush(stderr);
+
+ terminal_prompt_display();
+}
+
+
+static void info_msg(const char *format, ...)
+{
+ va_list ap;
+
+ terminal_prompt_erase();
+
+ va_start(ap, format);
+ vfprintf(stdout, format, ap);
+ va_end(ap);
+ fprintf(stdout, "\n");
+ fflush(stdout);
+
+ terminal_prompt_display();
+}
+
+
+static void signal_handler(mrp_mainloop_t *ml, mrp_sighandler_t *h,
+ int signum, void *user_data)
+{
+ MRP_UNUSED(h);
+ MRP_UNUSED(user_data);
+
+ switch (signum) {
+ case SIGINT:
+ info_msg("Got SIGINT, stopping...");
+ mrp_mainloop_quit(ml, 0);
+ break;
+ }
+}
+
+
+static void connect_notify(mrp_domctl_t *dc, int connected, int errcode,
+ const char *errmsg, void *user_data)
+{
+ MRP_UNUSED(dc);
+ MRP_UNUSED(user_data);
+
+ if (connected) {
+ info_msg("Successfully registered to server.");
+ export_data(client);
+ }
+ else
+ error_msg("No connection to server (%d: %s).", errcode, errmsg);
+}
+
+
+static void data_notify(mrp_domctl_t *dc, mrp_domctl_data_t *tables,
+ int ntable, void *user_data)
+{
+ client_t *client = (client_t *)user_data;
+
+ MRP_UNUSED(dc);
+
+ update_imports(client, tables, ntable);
+}
+
+
+static void export_notify(mrp_domctl_t *dc, int errcode, const char *errmsg,
+ void *user_data)
+{
+ MRP_UNUSED(dc);
+ MRP_UNUSED(user_data);
+
+ if (errcode != 0) {
+ error_msg("Data set request failed (%d: %s).", errcode, errmsg);
+ }
+ else
+ info_msg("Sucessfully set data.");
+}
+
+
+static void export_data(client_t *c)
+{
+ mrp_domctl_data_t *tables;
+ int ntable = 2;
+ mrp_domctl_value_t *values, *v;
+ int i;
+
+ tables = alloca(sizeof(*tables) * ntable);
+ values = alloca(sizeof(*values) * NVALUE);
+ v = values;
+
+ if (!c->zone) {
+ tables[0].id = 0;
+ tables[0].ncolumn = 4;
+ tables[0].nrow = NDEVICE;
+ tables[0].rows = alloca(sizeof(*tables[0].rows) * tables[0].nrow);
+
+ for (i = 0; i < (int)NDEVICE; i++) {
+ tables[0].rows[i] = v;
+ v[0].type = MRP_DOMCTL_STRING ; v[0].str = devices[i].name;
+ v[1].type = MRP_DOMCTL_STRING ; v[1].str = devices[i].type;
+ v[2].type = MRP_DOMCTL_INTEGER; v[2].s32 = devices[i].public;
+ v[3].type = MRP_DOMCTL_INTEGER; v[3].s32 = devices[i].available;
+ v += 4;
+ }
+
+ tables[1].id = 1;
+ tables[1].ncolumn = 4;
+ tables[1].nrow = NSTREAM;
+ tables[1].rows = alloca(sizeof(*tables[1].rows) * tables[1].nrow);
+
+ for (i = 0; i < (int)NSTREAM; i++) {
+ tables[1].rows[i] = v;
+ v[0].type = MRP_DOMCTL_STRING ; v[0].str = streams[i].name;
+ v[1].type = MRP_DOMCTL_STRING ; v[1].str = streams[i].role;
+ v[2].type = MRP_DOMCTL_UNSIGNED; v[2].s32 = streams[i].owner;
+ v[3].type = MRP_DOMCTL_INTEGER ; v[3].u32 = streams[i].playing;
+ v += 4;
+ }
+ }
+ else {
+ tables[0].id = 0;
+ tables[0].ncolumn = 3;
+ tables[0].nrow = NZONE;
+ tables[0].rows = alloca(sizeof(*tables[0].rows) * tables[0].nrow);
+
+ for (i = 0; i < (int)NZONE; i++) {
+ tables[0].rows[i] = v;
+ v[0].type = MRP_DOMCTL_STRING ; v[0].str = zones[i].name;
+ v[1].type = MRP_DOMCTL_INTEGER; v[1].s32 = zones[i].occupied;
+ v[2].type = MRP_DOMCTL_INTEGER; v[2].s32 = zones[i].active;
+ v += 3;
+ }
+
+ tables[1].id = 1;
+ tables[1].ncolumn = 3;
+ tables[1].nrow = NCALL;
+ tables[1].rows = alloca(sizeof(*tables[0].rows) * tables[1].nrow);
+
+ for (i = 0; i < (int)NCALL; i++) {
+ tables[1].rows[i] = v;
+ v[0].type = MRP_DOMCTL_INTEGER; v[0].s32 = calls[i].id;
+ v[1].type = MRP_DOMCTL_STRING ; v[1].str = calls[i].state;
+ v[2].type = MRP_DOMCTL_STRING ; v[2].str = calls[i].modem;
+ v += 3;
+ }
+ }
+
+ if (!mrp_domctl_set_data(c->dc, tables, ntable, export_notify, c))
+ error_msg("Failed to send data set request to server.");
+}
+
+
+static void client_setup(client_t *c)
+{
+ mrp_mainloop_t *ml;
+ mrp_domctl_t *dc;
+
+ ml = mrp_mainloop_create();
+
+ if (ml != NULL) {
+ if (!c->zone) {
+ exports = media_tables;
+ nexport = MRP_ARRAY_SIZE(media_tables);
+ imports = zone_watches;
+ nimport = MRP_ARRAY_SIZE(zone_watches);
+ }
+ else {
+ exports = zone_tables;
+ nexport = MRP_ARRAY_SIZE(zone_tables);
+ imports = media_watches;
+ nimport = MRP_ARRAY_SIZE(media_watches);
+ }
+
+ dc = mrp_domctl_create(c->zone ? "zone-ctrl" : "media-ctrl", ml,
+ exports, nexport, imports, nimport,
+ connect_notify, data_notify, c);
+
+ if (dc != NULL) {
+ c->ml = ml;
+ c->dc = dc;
+
+ mrp_add_sighandler(ml, SIGINT, signal_handler, c);
+
+ if (c->zone) {
+ zone_t *z;
+ call_t *call;
+
+ for (z = zones; z->name != NULL; z++) {
+ z->name = mrp_strdup(z->name);
+ }
+
+ for (call = calls; call->id > 0; call++) {
+ call->state = mrp_strdup(call->state);
+ call->modem = mrp_strdup(call->modem);
+ }
+
+ reset_devices();
+ reset_streams();
+ }
+ else {
+ device_t *d;
+ stream_t *s;
+
+ for (d = devices; d->name != NULL; d++) {
+ d->name = mrp_strdup(d->name);
+ d->type = mrp_strdup(d->type);
+ }
+
+ for (s = streams; s->name != NULL; s++) {
+ s->name = mrp_strdup(s->name);
+ s->role = mrp_strdup(s->role);
+ }
+
+ reset_zones();
+ reset_calls();
+ }
+ }
+ else
+ fatal_msg(1, "Failed to create enforcement point.");
+ }
+ else
+ fatal_msg(1, "Failed to create mainloop.");
+}
+
+
+static void client_cleanup(client_t *c)
+{
+ mrp_mainloop_destroy(c->ml);
+ mrp_domctl_destroy(c->dc);
+
+ c->ml = NULL;
+ c->dc = NULL;
+}
+
+
+static void client_run(client_t *c)
+{
+ if (mrp_domctl_connect(c->dc, c->addrstr))
+ info_msg("Connected to server at %s.", c->addrstr);
+ else
+ error_msg("Failed to connect to server at %s.", c->addrstr);
+
+ mrp_mainloop_run(c->ml);
+}
+
+
+static void print_usage(const char *argv0, int exit_code, const char *fmt, ...)
+{
+ va_list ap;
+
+ if (fmt && *fmt) {
+ va_start(ap, fmt);
+ vprintf(fmt, ap);
+ va_end(ap);
+ }
+
+ printf("usage: %s [options]\n\n"
+ "The possible options are:\n"
+ " -s, --server <address> connect to murphy at given address\n"
+ " -z, --zone run as zone controller\n"
+ " -v, --verbose run in verbose mode\n"
+ " -h, --help show this help on usage\n",
+ argv0);
+
+ if (exit_code < 0)
+ return;
+ else
+ exit(exit_code);
+}
+
+
+static void client_set_defaults(client_t *c)
+{
+ mrp_clear(c);
+ c->addrstr = MRP_DEFAULT_DOMCTL_ADDRESS;
+ c->zone = FALSE;
+ c->verbose = FALSE;
+}
+
+
+int parse_cmdline(client_t *c, int argc, char **argv)
+{
+# define OPTIONS "vzhs:"
+ struct option options[] = {
+ { "server" , required_argument, NULL, 's' },
+ { "zone" , no_argument , NULL, 'z' },
+ { "verbose" , optional_argument, NULL, 'v' },
+ { "help" , no_argument , NULL, 'h' },
+ { NULL, 0, NULL, 0 }
+ };
+
+ int opt;
+
+ client_set_defaults(c);
+
+ while ((opt = getopt_long(argc, argv, OPTIONS, options, NULL)) != -1) {
+ switch (opt) {
+ case 'z':
+ c->zone = TRUE;
+ break;
+
+ case 'v':
+ c->verbose = TRUE;
+ break;
+
+ case 'a':
+ c->addrstr = optarg;
+ break;
+
+ case 'h':
+ print_usage(argv[0], -1, "");
+ exit(0);
+ break;
+
+ default:
+ print_usage(argv[0], EINVAL, "invalid option '%c'", opt);
+ }
+ }
+
+ return TRUE;
+}
+
+
+int main(int argc, char *argv[])
+{
+ client_t c;
+
+ client_set_defaults(&c);
+ parse_cmdline(&c, argc, argv);
+
+ client_setup(&c);
+ terminal_setup(&c);
+
+ client = &c;
+ client_run(&c);
+
+ terminal_cleanup(&c);
+ client_cleanup(&c);
+
+ return 0;
+}