[ --with-dynamic-plugins=<plugin-list> specify which plugins compile as DSOs],
[dynamic_plugins=$withval],[dynamic_plugins=none])
-all_plugins=$(ls src/plugins/*.c 2>/dev/null | \
- sed 's#src/plugins/plugin-##g;s#\.c$##g' | tr '\n' ' ')
+all_plugins=$(find src/plugins/. -name plugin-*.c 2>/dev/null | \
+ sed 's#^.*/plugin-##g;s#\.c$##g' | tr '\n' ' ')
+
+#echo "all plugins: [$all_plugins]"
case dynamic_plugins in
all) dynamic_plugins="$all_plugins";;
return 1
}
-AM_CONDITIONAL(DISABLED_PLUGIN_TEST, [check_if_disabled test])
-AM_CONDITIONAL(DISABLED_PLUGIN_DBUS, [check_if_disabled dbus])
-AM_CONDITIONAL(DISABLED_PLUGIN_GLIB, [check_if_disabled glib])
-AM_CONDITIONAL(DISABLED_PLUGIN_CONSOLE, [check_if_disabled console])
+AM_CONDITIONAL(DISABLED_PLUGIN_TEST, [check_if_disabled test])
+AM_CONDITIONAL(DISABLED_PLUGIN_DBUS, [check_if_disabled dbus])
+AM_CONDITIONAL(DISABLED_PLUGIN_GLIB, [check_if_disabled glib])
+AM_CONDITIONAL(DISABLED_PLUGIN_CONSOLE, [check_if_disabled console])
AM_CONDITIONAL(DISABLED_PLUGIN_SIGNALLING, [check_if_disabled signalling])
+AM_CONDITIONAL(DISABLED_PLUGIN_DECISION, [check_if_disabled decision])
AM_CONDITIONAL(BUILTIN_PLUGIN_TEST, [check_if_internal test])
AM_CONDITIONAL(BUILTIN_PLUGIN_DBUS, [check_if_internal dbus])
AM_CONDITIONAL(BUILTIN_PLUGIN_GLIB, [check_if_internal glib])
AM_CONDITIONAL(BUILTIN_PLUGIN_CONSOLE, [check_if_internal console])
AM_CONDITIONAL(BUILTIN_PLUGIN_SIGNALLING, [check_if_internal signalling])
+AM_CONDITIONAL(BUILTIN_PLUGIN_DECISION, [check_if_internal decision])
# Check for Check (unit test framework).
PKG_CHECK_MODULES(CHECK,
BUILTIN_CFLAGS = -D__MURPHY_BUILTIN_PLUGIN__ $(AM_CFLAGS)
BUILTIN_LIBS =
+LINKEDIN_PLUGINS =
+
plugin_LTLIBRARIES =
plugindir = $(libdir)/murphy/plugins
resource_client_CFLAGS = $(AM_CFLAGS) $(BUILTIN_CFLAGS)
resource_client_LDADD = $(BUILTIN_LIBS) libmurphy-common.la
-
# debug file:line-function mapping generation
plugin-resource-native-func-info.c: $(PLUGIN_RESOURCE_NATIVE_REGULAR_SOURCES)
$(QUIET_GEN)$(top_builddir)/build-aux/gen-debug-table -o $@ $^
clean-func-infos::
-rm plugin-resource-native-func-info.c
+# decision plugin
+DECISION_PLUGIN_SOURCES = plugins/decision-proto/plugin-decision.c \
+ plugins/decision-proto/decision.c \
+ plugins/decision-proto/proxy.c \
+ plugins/decision-proto/table.c \
+ plugins/decision-proto/message.c \
+ plugins/decision-proto/notify.c
+
+DECISION_PLUGIN_CFLAGS =
+DECISION_PLUGIN_LIBS =
+
+if !DISABLED_PLUGIN_DECISION
+if BUILTIN_PLUGIN_DECISION
+LINKEDIN_PLUGINS += libmurphy-plugin-decision.la
+lib_LTLIBRARIES += libmurphy-plugin-decision.la
+DECISION_PLUGIN_LOADER = linkedin-decision-loader.c
+DECISION_PLUGIN_CFLAGS += $(BUILTIN_CFLAGS)
+
+
+libmurphy_plugin_decision_ladir = \
+ $(includedir)/murphy/decision-proto
+
+libmurphy_plugin_decision_la_SOURCES = \
+ $(DECISION_PLUGIN_SOURCES) \
+ $(DECISION_PLUGIN_LOADER)
+
+libmurphy_plugin_decision_la_CFLAGS = \
+ $(DECISION_PLUGIN_CFLAGS) \
+ $(AM_CFLAGS)
+
+libmurphy_plugin_decision_la_LDFLAGS = \
+ -Wl,-version-script=linker-script.decision \
+ -version-info @MURPHY_VERSION_INFO@
+
+libmurphy_plugin_decision_la_LIBADD = \
+ libmurphy-core.la \
+ libmurphy-common.la \
+ murphy-db/mql/libmql.la \
+ murphy-db/mqi/libmqi.la \
+ murphy-db/mdb/libmdb.la
+
+libmurphy_plugin_decision_la_DEPENDENCIES = \
+ linker-script.decision \
+ libmurphy-core.la \
+ libmurphy-common.la \
+ murphy-db/mql/libmql.la \
+ murphy-db/mqi/libmqi.la \
+ murphy-db/mdb/libmdb.la
+
+# linkedin decision plugin linker script generation
+linker-script.decision: $(DECISION_PLUGIN_LOADER:%.c=%.h)
+ $(QUIET_GEN)$(top_builddir)/build-aux/gen-linker-script -q -o $@ $^
+
+clean-linker-script::
+ -rm -f linker-script.decision
+else
+plugin_decision_la_SOURCES = $(DECISION_PLUGIN_SOURCES)
+plugin_decision_la_CFLAGS = $(DECISION_PLUGIN_CFLAGS) \
+ $(MURPHY_CFLAGS) $(AM_CFLAGS)
+plugin_decision_la_LDFLAGS = -module -avoid-version
+plugin_decision_la_LIBADD = $(DECISION_PLUGIN_LIBS)
+
+lib_LTLIBRARIES += libmurphy-pep.la
+libmurphy_pep_la_SOURCES = plugins/decision-proto/client.c \
+ plugins/decision-proto/table-common.c \
+ plugins/decision-proto/message.c
+libmurphy_pep_la_CFLAGS =
+libmurphy_pep_la_LIBADD = libmurphy-common.la \
+ murphy-db/mql/libmql.la \
+ murphy-db/mqi/libmqi.la \
+ murphy-db/mdb/libmdb.la
+
+
+# enforcement point test client
+bin_PROGRAMS += test-client
+
+test_client_SOURCES = plugins/decision-proto/test-client.c
+test_client_CFLAGS = $(AM_CFLAGS)
+test_client_LDADD = libmurphy-pep.la \
+ libmurphy-common.la \
+ -lreadline
+endif
###################################
# murphy daemon
murphyd_SOURCES = \
daemon/daemon.c \
daemon/config.c \
- $(BUILTIN_PLUGINS)
+ $(BUILTIN_PLUGINS) \
+ load-linkedin-plugins.c
murphyd_CFLAGS = \
$(AM_CFLAGS) \
murphyd_LDADD = \
$(BUILTIN_LIBS) \
+ $(LINKEDIN_PLUGINS) \
libmurphy-resource.la \
libmurphy-resolver.la \
libmurphy-core.la \
murphyd_LDFLAGS = -rdynamic
+
+
+###################################
+# linkedin (DSO) loader generation
+#
+
+linkedin-%-loader.c:
+ $(QUIET_GEN)$(top_builddir)/build-aux/gen-linkedin-loader \
+ -p $(shell echo $@ | \
+ sed 's/linkedin-//g;s/-loader.c//g') -o $@
+
+linkedin-%-loader.h:
+ $(QUIET_GEN)$(top_builddir)/build-aux/gen-linkedin-loader \
+ -p $(shell echo $@ | \
+ sed 's/linkedin-//g;s/-loader.h//g') -o $@
+
+load-linkedin-plugins.c:
+ $(QUIET_GEN)$(top_builddir)/build-aux/gen-linkedin-loader \
+ -o $@ $(shell echo $(LINKEDIN_PLUGINS) | \
+ sed 's/.*-//g;s/\.[^\.]*$$//g')
+
+clean-local::
+ -rm -f linkedin-*-loader.[hc] load-linkedin-plugins.c
+
###################################
# murphy console client
#
--- /dev/null
+ifneq ($(strip $(MAKECMDGOALS)),)
+%:
+ $(MAKE) -C .. $(MAKECMDGOALS)
+else
+all:
+ $(MAKE) -C .. all
+endif
--- /dev/null
+#include <errno.h>
+
+#include <murphy/common/mm.h>
+#include <murphy/common/log.h>
+#include <murphy/common/mainloop.h>
+#include <murphy/common/transport.h>
+
+#include "decision-types.h"
+#include "table.h"
+#include "message.h"
+#include "client.h"
+
+
+/*
+ * mark an enforcement point busy (typically while executing a callback)
+ */
+
+#define PEP_MARK_BUSY(pep, ...) do { \
+ (pep)->busy++; \
+ __VA_ARGS__ \
+ (pep)->busy--; \
+ check_destroyed(pep); \
+ } while (0)
+
+
+/*
+ * a pending request
+ */
+
+typedef struct {
+ mrp_list_hook_t hook; /* hook to pending request queue */
+ uint32_t seqno; /* sequence number/request id */
+ mrp_pep_status_cb_t cb; /* callback to call upon completion */
+ void *user_data; /* opaque callback data */
+} pending_request_t;
+
+
+static void recv_cb(mrp_transport_t *t, mrp_msg_t *msg, void *user_data);
+static void recvfrom_cb(mrp_transport_t *t, mrp_msg_t *msg,
+ mrp_sockaddr_t *addr, socklen_t addrlen,
+ void *user_data);
+static void closed_cb(mrp_transport_t *t, int error, void *user_data);
+
+
+static int queue_pending(mrp_pep_t *pep, uint32_t seq,
+ mrp_pep_status_cb_t cb, void *user_data);
+static int notify_pending(mrp_pep_t *pep, uint32_t seq, int error,
+ const char *msg);
+static void purge_pending(mrp_pep_t *pep);
+
+
+
+
+mrp_pep_t *mrp_pep_create(const char *name, mrp_mainloop_t *ml,
+ mrp_pep_table_t *owned_tables, int nowned,
+ mrp_pep_table_t *watched_tables, int nwatched,
+ mrp_pep_connect_cb_t connect_cb,
+ mrp_pep_data_cb_t data_cb, void *user_data)
+{
+ mrp_pep_t *pep;
+
+ pep = mrp_allocz(sizeof(*pep));
+
+ if (pep != NULL) {
+ mrp_list_init(&pep->pending);
+ pep->ml = ml;
+
+ pep->name = mrp_strdup(name);
+ pep->owned = mrp_allocz_array(typeof(*pep->owned), nowned);
+ pep->watched = mrp_allocz_array(typeof(*pep->watched), nwatched);
+
+ if (pep->name != NULL && pep->owned != NULL && pep->watched != NULL) {
+ if (copy_pep_tables(owned_tables, pep->owned, nowned)) {
+ pep->nowned = nowned;
+ if (copy_pep_tables(watched_tables, pep->watched, nwatched)) {
+ pep->nwatched = nwatched;
+ pep->connect_cb = connect_cb;
+ pep->data_cb = data_cb;
+ pep->user_data = user_data;
+ pep->seqno = 1;
+
+ return pep;
+ }
+ }
+ }
+
+ mrp_pep_destroy(pep);
+ }
+
+ return NULL;
+}
+
+
+static void destroy_pep(mrp_pep_t *pep)
+{
+ mrp_free(pep->name);
+
+ free_pep_tables(pep->owned, pep->nowned);
+ free_pep_tables(pep->watched, pep->nwatched);
+
+ purge_pending(pep);
+
+ mrp_free(pep);
+}
+
+
+static inline void check_destroyed(mrp_pep_t *pep)
+{
+ if (pep->destroyed && pep->busy <= 0) {
+ destroy_pep(pep);
+ }
+}
+
+
+void mrp_pep_destroy(mrp_pep_t *pep)
+{
+ if (pep != NULL) {
+ mrp_pep_disconnect(pep);
+
+ if (pep->busy <= 0)
+ destroy_pep(pep);
+ else
+ pep->destroyed = TRUE;
+ }
+}
+
+
+static void notify_disconnect(mrp_pep_t *pep, uint32_t errcode,
+ const char *errmsg)
+{
+ PEP_MARK_BUSY(pep, {
+ pep->connected = FALSE;
+ pep->connect_cb(pep, FALSE, errcode, errmsg, pep->user_data);
+ });
+}
+
+
+static void notify_connect(mrp_pep_t *pep)
+{
+ PEP_MARK_BUSY(pep, {
+ pep->connected = TRUE;
+ pep->connect_cb(pep, TRUE, 0, NULL, pep->user_data);
+ });
+}
+
+
+static int pep_register(mrp_pep_t *pep)
+{
+ mrp_msg_t *msg;
+ int success;
+
+ msg = create_register_message(pep);
+
+ if (msg != NULL) {
+ success = mrp_transport_send(pep->t, msg);
+ mrp_msg_unref(msg);
+ }
+ else
+ success = FALSE;
+
+ return success;
+}
+
+
+int mrp_pep_connect(mrp_pep_t *pep, const char *address)
+{
+ static mrp_transport_evt_t evt = {
+ .closed = closed_cb,
+ .recvmsg = recv_cb,
+ .recvmsgfrom = recvfrom_cb,
+ };
+
+ mrp_sockaddr_t addr;
+ socklen_t addrlen;
+ const char *type;
+
+ if (pep == NULL)
+ return FALSE;
+
+ addrlen = mrp_transport_resolve(NULL, address, &addr, sizeof(addr), &type);
+
+ if (addrlen > 0) {
+ pep->t = mrp_transport_create(pep->ml, type, &evt, pep, 0);
+
+ if (pep->t != NULL) {
+ if (mrp_transport_connect(pep->t, &addr, addrlen))
+ if (pep_register(pep))
+ return TRUE;
+
+ mrp_transport_destroy(pep->t);
+ pep->t = NULL;
+ }
+ }
+
+ return FALSE;
+}
+
+
+void mrp_pep_disconnect(mrp_pep_t *pep)
+{
+ if (pep->t != NULL) {
+ mrp_transport_destroy(pep->t);
+ pep->t = NULL;
+ pep->connected = FALSE;
+ }
+}
+
+
+int mrp_pep_set_data(mrp_pep_t *pep, mrp_pep_data_t *data, int ntable,
+ mrp_pep_status_cb_t cb, void *user_data)
+{
+ mrp_msg_t *msg;
+ uint32_t seq = pep->seqno++;
+ int success, i;
+
+ if (!pep->connected)
+ return FALSE;
+
+ for (i = 0; i < ntable; i++) {
+ if (data[i].id < 0 || data[i].id >= pep->nowned)
+ return FALSE;
+
+ data[i].coldefs = pep->owned[data[i].id].columns;
+ data[i].ncolumn = pep->owned[data[i].id].ncolumn;
+ }
+
+ msg = create_set_message(seq, data, ntable);
+
+ if (msg != NULL) {
+ success = mrp_transport_send(pep->t, msg);
+ mrp_msg_unref(msg);
+
+ if (success)
+ queue_pending(pep, seq, cb, user_data);
+
+ return success;
+ }
+ else
+ return FALSE;
+}
+
+
+static void process_ack(mrp_pep_t *pep, uint32_t seq)
+{
+ if (seq != 0)
+ notify_pending(pep, seq, 0, NULL);
+ else
+ notify_connect(pep);
+}
+
+
+static void process_nak(mrp_pep_t *pep, uint32_t seq, int32_t err,
+ const char *msg)
+{
+ if (seq != 0)
+ notify_pending(pep, seq, err, msg);
+ else
+ notify_disconnect(pep, err, msg);
+}
+
+
+static void process_notify(mrp_pep_t *pep, mrp_msg_t *msg, uint32_t seq,
+ int ntable, int ncolumn)
+{
+ mrp_pep_table_t *tbl;
+ mrp_pep_data_t data[ntable], *d;
+ mrp_pep_value_t values[ncolumn], *v;
+ mqi_column_def_t *cols;
+ void *it;
+ int ncol, i, j;
+ uint16_t tblid;
+ uint16_t nrow;
+
+ it = NULL;
+ d = data;
+ v = values;
+
+ for (i = 0; i < ntable; i++) {
+ if (!mrp_msg_iterate_get(msg, &it,
+ MRP_PEPMSG_UINT16(TBLID, &tblid),
+ MRP_PEPMSG_UINT16(NROW , &nrow ),
+ MRP_MSG_END))
+ return;
+
+ if (tblid >= pep->nwatched)
+ return;
+
+ tbl = pep->watched + tblid;
+ cols = tbl->columns;
+ ncol = tbl->ncolumn;
+
+ d->id = tblid;
+ d->columns = v;
+ d->coldefs = tbl->columns;
+ d->ncolumn = ncol;
+ d->nrow = nrow;
+
+ if (!decode_notify_message(msg, &it, d))
+ return;
+
+ d++;
+ v += ncol * nrow;
+ }
+
+ pep->data_cb(pep, data, ntable, pep->user_data);
+}
+
+
+static void recv_cb(mrp_transport_t *t, mrp_msg_t *msg, void *user_data)
+{
+ mrp_pep_t *pep = (mrp_pep_t *)user_data;
+ uint16_t type, nchange, ntotal;
+ uint32_t seq;
+ int ntable, ncolumn;
+ int32_t error;
+ const char *errmsg;
+
+ MRP_UNUSED(t);
+
+ /*
+ mrp_log_info("Received message:");
+ mrp_msg_dump(msg, stdout);
+ */
+
+ if (!mrp_msg_get(msg,
+ MRP_PEPMSG_UINT16(MSGTYPE, &type),
+ MRP_PEPMSG_UINT32(MSGSEQ , &seq ),
+ MRP_MSG_END)) {
+ mrp_pep_disconnect(pep);
+ notify_disconnect(pep, EINVAL, "malformed message from client");
+ return;
+ }
+
+ switch (type) {
+ case MRP_PEPMSG_ACK:
+ process_ack(pep, seq);
+ break;
+
+ case MRP_PEPMSG_NAK:
+ error = EINVAL;
+ errmsg = "request failed, unknown error";
+
+ mrp_msg_get(msg,
+ MRP_PEPMSG_SINT32(ERRCODE, &error),
+ MRP_PEPMSG_STRING(ERRMSG , &errmsg),
+ MRP_MSG_END);
+
+ process_nak(pep, seq, error, errmsg);
+ break;
+
+ case MRP_PEPMSG_NOTIFY:
+ if (mrp_msg_get(msg,
+ MRP_PEPMSG_UINT16(NCHANGE, &nchange),
+ MRP_PEPMSG_UINT16(NTOTAL , &ntotal),
+ MRP_MSG_END)) {
+ ntable = nchange;
+ ncolumn = ntotal;
+
+ process_notify(pep, msg, seq, ntable, ncolumn);
+ }
+ break;
+
+ default:
+ break;
+ }
+
+}
+
+
+static void recvfrom_cb(mrp_transport_t *t, mrp_msg_t *msg,
+ mrp_sockaddr_t *addr, socklen_t addrlen,
+ void *user_data)
+{
+ MRP_UNUSED(addr);
+ MRP_UNUSED(addrlen);
+
+ /* XXX TODO:
+ * This should neither be called nor be necessary to specify.
+ * However, currently the transport layer mandates having to
+ * give both recv and recvfrom event callbacks if no connection
+ * event callback is given. However this is not correct because
+ * on a client side one wants to be able to create a connection-
+ * oriented transport without both connection and recvfrom event
+ * callbacks. This needs to be fixed in transport by moving the
+ * appropriate callback checks lower in the stack to the actual
+ * transport backends.
+ */
+
+ mrp_log_error("Whoa... recvfrom called for a connected transport.");
+ exit(1);
+}
+
+
+static void closed_cb(mrp_transport_t *t, int error, void *user_data)
+{
+ mrp_pep_t *pep = (mrp_pep_t *)user_data;
+
+ MRP_UNUSED(t);
+ MRP_UNUSED(pep);
+
+ if (error)
+ notify_disconnect(pep, error, strerror(error));
+ else
+ notify_disconnect(pep, ECONNRESET, "server has closed the connection");
+}
+
+
+static int queue_pending(mrp_pep_t *pep, uint32_t seq,
+ mrp_pep_status_cb_t cb, void *user_data)
+{
+ pending_request_t *pending;
+
+ pending = mrp_allocz(sizeof(*pending));
+
+ if (pending != NULL) {
+ mrp_list_init(&pending->hook);
+
+ pending->seqno = seq;
+ pending->cb = cb;
+ pending->user_data = user_data;
+
+ mrp_list_append(&pep->pending, &pending->hook);
+
+ return TRUE;
+ }
+ else
+ return FALSE;
+}
+
+
+static int notify_pending(mrp_pep_t *pep, uint32_t seq, int error,
+ const char *msg)
+{
+ mrp_list_hook_t *p, *n;
+ pending_request_t *pending;
+
+ mrp_list_foreach(&pep->pending, p, n) {
+ pending = mrp_list_entry(p, typeof(*pending), hook);
+
+ if (pending->seqno == seq) {
+ PEP_MARK_BUSY(pep, {
+ pending->cb(pep, error, msg, pending->user_data);
+ mrp_list_delete(&pending->hook);
+ mrp_free(pending);
+ });
+
+ return TRUE;
+ }
+ }
+
+ return FALSE;
+}
+
+
+static void purge_pending(mrp_pep_t *pep)
+{
+ mrp_list_hook_t *p, *n;
+ pending_request_t *pending;
+
+ mrp_list_foreach(&pep->pending, p, n) {
+ pending = mrp_list_entry(p, typeof(*pending), hook);
+
+ mrp_list_delete(&pending->hook);
+ mrp_free(pending);
+ }
+}
--- /dev/null
+#ifndef __MURPHY_DECISION_CLIENT_H__
+#define __MURPHY_DECISION_CLIENT_H__
+
+#include <murphy-db/mqi.h>
+#include <murphy/common/mainloop.h>
+
+#define MRP_DEFAULT_PEP_ADDRESS "unxs:@murphy-decision"
+
+
+/*
+ * helper macros for defining tables
+ */
+
+#define MRP_PEP_TABLE_COLUMNS(_var, _columns...) \
+ static mqi_column_def_t _var[] = { \
+ _columns, \
+ { NULL, mqi_unknown, 0, 0 }, \
+ }
+
+#define MRP_PEP_STRING(_name, _len, _is_idx) \
+ { .name = _name, .type = mqi_varchar, .length = _len, .flags = _is_idx }
+
+#define MRP_PEP_INTEGER(_name, _is_idx) \
+ { .name = _name, .type = mqi_integer, .length = 0, .flags = _is_idx }
+
+#define MRP_PEP_UNSIGNED(_name, _is_idx) \
+ { .name = _name, .type = mqi_unsignd, .length = 0, .flags = _is_idx }
+
+#define MRP_PEP_FLOATING(_name, _is_idx) \
+ { .name = _name, .type = mqi_floating, .length = 0, .flags = _is_idx }
+
+#define MRP_PEP_TABLE(_name, _columns...) { \
+ .name = _name, \
+ .columns = _columns, \
+ .ncolumn = MRP_ARRAY_SIZE(_columns), \
+ .id = 0, \
+ }
+
+#define MRP_PEP_TABLES(var, tables...) \
+ static mrp_pep_table_t var[] = { \
+ tables \
+ }
+
+/*
+ * a table definition
+ */
+
+typedef struct {
+ const char *name; /* table name */
+ mqi_column_def_t *columns; /* column definitions */
+ int ncolumn; /* number of columns */
+ int idx_col; /* column to use as index */
+ int id; /* id used to reference this table */
+} mrp_pep_table_t;
+
+
+/*
+ * table column values
+ */
+
+typedef union {
+ const char *str; /* mqi_varchar */
+ uint32_t u32; /* mqi_unsignd */
+ int32_t s32; /* mqi_integer */
+ double dbl; /* mqi_floating */
+} mrp_pep_value_t;
+
+
+/*
+ * table data
+ */
+
+typedef struct {
+ int id; /* table id */
+ mrp_pep_value_t *columns; /* table data */
+ mqi_column_def_t *coldefs; /* column definitions */
+ int ncolumn; /* columns per row */
+ int nrow; /* number of rows */
+} mrp_pep_data_t;
+
+
+
+/** Opaque policy enforcement point type. */
+typedef struct mrp_pep_s mrp_pep_t;
+
+/** Callback type for connection state notifications. */
+typedef void (*mrp_pep_connect_cb_t)(mrp_pep_t *pep, int connection,
+ int errcode, const char *errmsg,
+ void *user_data);
+
+/** Callback type for request status notifications. */
+typedef void (*mrp_pep_status_cb_t)(mrp_pep_t *pep, int errcode,
+ const char *errmsg, void *user_data);
+
+/** Callback type for data change notifications. */
+typedef void (*mrp_pep_data_cb_t)(mrp_pep_t *pep, mrp_pep_data_t *tables,
+ int ntable, void *user_data);
+
+/** Create a new policy enforcement point. */
+mrp_pep_t *mrp_pep_create(const char *name, mrp_mainloop_t *ml,
+ mrp_pep_table_t *owned_tables, int nowned_table,
+ mrp_pep_table_t *watched_tables, int nwatched_table,
+ mrp_pep_connect_cb_t connect, mrp_pep_data_cb_t data,
+ void *user_data);
+
+/** Destroy the given policy enforcement point. */
+void mrp_pep_destroy(mrp_pep_t *pep);
+
+/** Connect and register the given client to the server at the given address. */
+int mrp_pep_connect(mrp_pep_t *pep, const char *address);
+
+/** Close the connection to the server. */
+void mrp_pep_disconnect(mrp_pep_t *pep);
+
+/** Set the content of the given tables to the given data. */
+int mrp_pep_set_data(mrp_pep_t *pep, mrp_pep_data_t *tables, int ntable,
+ mrp_pep_status_cb_t cb, void *user_data);
+
+#endif /* __MURPHY_DECISION_CLIENT_H__ */
--- /dev/null
+#ifndef __MURPHY_DECISION_TYPES_H__
+#define __MURPHY_DECISION_TYPES_H__
+
+#include <murphy/common/list.h>
+#include <murphy/common/mainloop.h>
+#include <murphy/common/transport.h>
+#include <murphy/common/hashtbl.h>
+#include <murphy/core/context.h>
+
+#include "client.h"
+
+typedef struct pep_proxy_s pep_proxy_t;
+typedef struct pep_table_s pep_table_t;
+typedef struct pep_watch_s pep_watch_t;
+typedef struct pdp_s pdp_t;
+
+
+/*
+ * a policy enforcement point (on the client side)
+ */
+
+struct mrp_pep_s {
+ char *name; /* enforcment point name */
+ mrp_mainloop_t *ml; /* main loop */
+ mrp_transport_t *t; /* transport towards murphy */
+ int connected; /* transport is up */
+ mrp_pep_table_t *owned; /* owned tables */
+ int nowned; /* number of owned tables */
+ mrp_pep_table_t *watched; /* watched tables */
+ int nwatched; /* number of watched tables */
+ mrp_pep_connect_cb_t connect_cb; /* connection state change callback */
+ mrp_pep_data_cb_t data_cb; /* watched data change callback */
+ void *user_data; /* opqaue user data for callbacks */
+ int busy; /* non-zero if a callback is active */
+ int destroyed : 1; /* non-zero if destroy pending */
+ uint32_t seqno; /* request sequence number */
+ mrp_list_hook_t pending; /* queue of outstanding requests */
+};
+
+
+/*
+ * a table associated with or tracked by an enforcement point
+ */
+
+struct pep_table_s {
+ char *name; /* table name */
+ mrp_list_hook_t hook; /* to list of tables */
+ mqi_handle_t h; /* MDB table handle */
+ mqi_column_def_t *columns; /* column definitions */
+ mqi_column_desc_t *coldesc; /* column descriptors */
+ int ncolumn; /* number of columns */
+ int idx_col; /* column index of index column */
+ mrp_list_hook_t watches; /* watches for this table */
+ uint32_t notify_stamp; /* current table stamp */
+ mrp_pep_value_t *notify_data; /* notification data */
+ int notify_nrow; /* number of rows to notify */
+ int notify_fail : 1; /* notification failure */
+ int notify_all : 1; /* notify all watches */
+};
+
+
+/*
+ * a table watch
+ */
+
+struct pep_watch_s {
+ pep_table_t *table; /* table being watched */
+ pep_proxy_t *proxy; /* enforcement point */
+ int id; /* table id within proxy */
+ uint32_t stamp; /* last notified update stamp */
+ mrp_list_hook_t tbl_hook; /* hook to table watch list */
+ mrp_list_hook_t pep_hook; /* hook to proxy watch list */
+};
+
+
+/*
+ * a policy enforcement point (on the server side)
+ */
+
+struct pep_proxy_s {
+ char *name; /* enforcement point name */
+ pdp_t *pdp; /* decision point context */
+ mrp_transport_t *t; /* associated transport */
+ mrp_list_hook_t hook; /* to list of all enforcement points */
+ pep_table_t *tables; /* tables owned by this */
+ int ntable; /* number of tables */
+ mrp_list_hook_t watches; /* tables watched by this */
+ mrp_msg_t *notify_msg; /* notification being built */
+ int notify_ntable; /* number of changed tables */
+ int notify_ncolumn; /* total columns in notification */
+ int notify_fail : 1; /* notification failure */
+ int notify_all : 1; /* notify all watches */
+};
+
+
+/*
+ * policy decision point context
+ */
+
+struct pdp_s {
+ mrp_context_t *ctx; /* murphy context */
+ const char *address; /* external transport address */
+ mrp_transport_t *ext; /* external transport */
+ mrp_list_hook_t proxies; /* list of enforcement points */
+ mrp_list_hook_t tables; /* list of tables we track */
+ mrp_htbl_t *watched; /* tracked tables by name */
+ mrp_deferred_t *notify; /* deferred notification */
+ int notify_scheduled; /* is notification scheduled? */
+};
+
+
+
+
+
+
+#if 0
+
+/*
+ * common table data for tracking and proxying
+ */
+
+typedef struct table_s table_t;
+typedef struct tracked_table_s tracked_table_t;
+typedef struct proxied_table_s proxied_table_t;
+
+struct table_s {
+ char *name; /* table name */
+ mqi_handle_t h; /* table handle */
+ mqi_column_def_t *columns; /* column definitions */
+ mqi_column_desc_t *coldesc; /* column descriptors */
+ int ncolumn; /* number of columns */
+};
+
+
+/*
+ * a tracked table
+ */
+
+struct tracked_table_s {
+ table_t *t; /* actual table data */
+ mrp_list_hook_t watches; /* watches for this table */
+ mrp_pep_value_t *notify_data; /* collected data for notification */
+ int notify_nrow; /* number of rows in notification */
+ int notify_failed:1; /* notification failure */
+ int notify_all:1; /* notify all watches */
+};
+
+
+/*
+ * a proxied table
+ */
+
+struct proxied_table_s {
+ table_t *t; /* actual table data */
+ int id; /* id for enforcement point */
+ int idx_col; /* column index of index column */
+}
+
+
+#endif
+
+#endif /* __MURPHY_DECISION_TYPES_H__ */
--- /dev/null
+#include <errno.h>
+
+#include <murphy/common/mm.h>
+#include <murphy/common/log.h>
+
+#include "message.h"
+#include "proxy.h"
+#include "table.h"
+#include "notify.h"
+#include "decision.h"
+
+static int create_transports(pdp_t *pdp);
+static void destroy_transports(pdp_t *pdp);
+
+pdp_t *create_decision(mrp_context_t *ctx, const char *address)
+{
+ pdp_t *pdp;
+
+ pdp = mrp_allocz(sizeof(*pdp));
+
+ if (pdp != NULL) {
+ pdp->ctx = ctx;
+ pdp->address = address;
+
+ if (init_proxies(pdp) && init_tables(pdp) && create_transports(pdp))
+ return pdp;
+ else
+ destroy_decision(pdp);
+ }
+
+ return NULL;
+}
+
+
+void destroy_decision(pdp_t *pdp)
+{
+ if (pdp != NULL) {
+ destroy_proxies(pdp);
+ destroy_tables(pdp);
+ destroy_transports(pdp);
+
+ mrp_free(pdp);
+ }
+}
+
+
+static void notify_cb(mrp_mainloop_t *ml, mrp_deferred_t *d, void *user_data)
+{
+ pdp_t *pdp = (pdp_t *)user_data;
+
+ MRP_UNUSED(ml);
+
+ mrp_disable_deferred(d);
+ pdp->notify_scheduled = FALSE;
+ notify_table_changes(pdp);
+}
+
+
+void schedule_notification(pdp_t *pdp)
+{
+
+ if (pdp->notify == NULL)
+ pdp->notify = mrp_add_deferred(pdp->ctx->ml, notify_cb, pdp);
+
+ if (!pdp->notify_scheduled) {
+ mrp_debug("scheduling client notification");
+ mrp_enable_deferred(pdp->notify);
+ }
+}
+
+
+static void send_ack_reply(mrp_transport_t *t, uint32_t seq)
+{
+ mrp_msg_t *msg;
+
+ msg = create_ack_message(seq);
+
+ if (msg != NULL) {
+ mrp_transport_send(t, msg);
+ mrp_msg_unref(msg);
+ }
+}
+
+
+static void send_nak_reply(mrp_transport_t *t, uint32_t seq, int error,
+ const char *errmsg)
+{
+ mrp_msg_t *msg;
+
+ msg = create_nak_message(seq, error, errmsg);
+
+ if (msg != NULL) {
+ mrp_transport_send(t, msg);
+ mrp_msg_unref(msg);
+ }
+}
+
+
+static int process_register_request(pep_proxy_t *proxy, mrp_msg_t *req,
+ uint32_t seq)
+{
+ mrp_transport_t *t = proxy->t;
+ char *name;
+ uint16_t utable, uwatch, ucolumn;
+ int ntable, nwatch, ncolumn;
+ int error;
+ const char *errmsg;
+
+ if (mrp_msg_get(req,
+ MRP_PEPMSG_STRING(NAME , &name ),
+ MRP_PEPMSG_UINT16(NTABLE , &utable ),
+ MRP_PEPMSG_UINT16(NWATCH , &uwatch ),
+ MRP_PEPMSG_UINT16(NCOLDEF, &ucolumn),
+ MRP_MSG_END)) {
+ mrp_pep_table_t tables[utable], watches[uwatch];
+ mqi_column_def_t columns[ucolumn];
+
+ ntable = utable;
+ nwatch = uwatch;
+ ncolumn = ucolumn;
+
+ if (decode_register_message(req, tables, ntable, watches, nwatch,
+ columns, ncolumn)) {
+ if (register_proxy(proxy, name, tables, ntable, watches, nwatch,
+ &error, &errmsg)) {
+ send_ack_reply(t, seq);
+ proxy->notify_all = TRUE;
+ schedule_notification(proxy->pdp);
+
+ return TRUE;
+ }
+ }
+ else
+ goto malformed;
+ }
+ else {
+ malformed:
+ error = EINVAL;
+ errmsg = "malformed register message";
+ }
+
+ send_nak_reply(t, seq, error, errmsg);
+
+ return FALSE;
+}
+
+
+static void process_unregister_request(pep_proxy_t *proxy, uint32_t seq)
+{
+ send_ack_reply(proxy->t, seq);
+ unregister_proxy(proxy);
+}
+
+
+static void process_set_request(pep_proxy_t *proxy, mrp_msg_t *req,
+ uint32_t seq)
+{
+#if 1
+ uint16_t utable, uvalue, tblid, nrow;
+ int ntable, nvalue, i;
+ int error;
+ const char *errmsg;
+ void *it;
+
+ it = NULL;
+
+ if (mrp_msg_iterate_get(req, &it,
+ MRP_PEPMSG_UINT16(NCHANGE, &utable),
+ MRP_PEPMSG_UINT16(NTOTAL , &uvalue),
+ MRP_MSG_END)) {
+ mrp_pep_data_t data[utable], *d;
+ mrp_pep_value_t values[uvalue], *v;
+
+ ntable = utable;
+ nvalue = uvalue;
+ d = data;
+ v = values;
+
+ for (i = 0; i < ntable; i++) {
+ if (!mrp_msg_iterate_get(req, &it,
+ MRP_PEPMSG_UINT16(TBLID, &tblid),
+ MRP_PEPMSG_UINT16(NROW , &nrow),
+ MRP_MSG_END)) {
+ error = EINVAL;
+ errmsg = "malformed set message";
+ goto reply_nak;
+ }
+
+ if (tblid >= proxy->ntable) {
+ error = ENOENT;
+ errmsg = "invalid table id";
+ goto reply_nak;
+ }
+
+ d->id = tblid;
+ d->columns = v;
+ d->coldefs = proxy->tables[d->id].columns;
+ d->ncolumn = proxy->tables[d->id].ncolumn;
+ d->nrow = nrow;
+
+ if (nvalue < d->ncolumn * d->nrow) {
+ error = EINVAL;
+ errmsg = "invalid set message";
+ goto reply_nak;
+ }
+
+ if (!decode_set_message(req, &it, d)) {
+ error = EINVAL;
+ errmsg = "invalid set message";
+ goto reply_nak;
+ }
+
+ v += d->ncolumn * d->nrow;
+ d++;
+ }
+
+ if (set_proxy_tables(proxy, data, ntable, &error, &errmsg)) {
+ send_ack_reply(proxy->t, seq);
+
+ return;
+ }
+ }
+
+ reply_nak:
+ send_nak_reply(proxy->t, seq, error, errmsg);
+#else
+ uint16_t utable, uvalue;
+ int ntable, nvalue;
+ int error;
+ const char *errmsg;
+
+ if (mrp_msg_get(req,
+ MRP_PEPMSG_UINT16(NTABLE, &utable),
+ MRP_PEPMSG_UINT16(NTOTAL, &uvalue),
+ MRP_MSG_END)) {
+ mrp_pep_data_t tables[utable];
+ mrp_pep_value_t values[uvalue];
+
+ ntable = utable;
+ nvalue = uvalue;
+
+ if (decode_set_message(req, tables, ntable, values, nvalue)) {
+ if (set_proxy_tables(proxy, tables, ntable, &error, &errmsg)) {
+ send_ack_reply(proxy->t, seq);
+
+ return;
+ }
+ }
+ else
+ goto malformed;
+ }
+ else {
+ malformed:
+ error = EINVAL;
+ errmsg = "malformed set message";
+ }
+
+ send_nak_reply(proxy->t, seq, error, errmsg);
+#endif
+}
+
+
+static void recv_cb(mrp_transport_t *t, mrp_msg_t *msg, void *user_data)
+{
+ pep_proxy_t *proxy = (pep_proxy_t *)user_data;
+ char *name = proxy && proxy->name ? proxy->name : "<unknown>";
+ uint16_t type;
+ uint32_t seq;
+
+ /*
+ mrp_log_info("Message from client %p:", proxy);
+ mrp_msg_dump(msg, stdout);
+ */
+
+ if (!mrp_msg_get(msg,
+ MRP_PEPMSG_UINT16(MSGTYPE, &type),
+ MRP_PEPMSG_UINT32(MSGSEQ , &seq ),
+ MRP_MSG_END)) {
+ mrp_log_error("Malformed message from client %s.", name);
+ send_nak_reply(t, 0, EINVAL, "malformed message");
+ }
+ else {
+ switch (type) {
+ case MRP_PEPMSG_REGISTER:
+ if (!process_register_request(proxy, msg, seq))
+ destroy_proxy(proxy);
+ break;
+
+ case MRP_PEPMSG_UNREGISTER:
+ process_unregister_request(proxy, seq);
+ break;
+
+ case MRP_PEPMSG_SET:
+ process_set_request(proxy, msg, seq);
+ break;
+
+ default:
+ break;
+ }
+ }
+}
+
+
+static void connect_cb(mrp_transport_t *ext, void *user_data)
+{
+ pdp_t *pdp = (pdp_t *)user_data;
+ pep_proxy_t *proxy;
+ int flags;
+
+ proxy = create_proxy(pdp);
+
+ if (proxy != NULL) {
+ flags = MRP_TRANSPORT_REUSEADDR | MRP_TRANSPORT_NONBLOCK;
+ proxy->t = mrp_transport_accept(ext, proxy, flags);
+
+ if (proxy->t != NULL)
+ mrp_log_info("Accepted new client connection.");
+ else {
+ mrp_log_error("Failed to accept new client connection.");
+ destroy_proxy(proxy);
+ }
+ }
+}
+
+
+static void closed_cb(mrp_transport_t *t, int error, void *user_data)
+{
+ pep_proxy_t *proxy = (pep_proxy_t *)user_data;
+ char *name = proxy && proxy->name ? proxy->name : "<unknown>";
+
+ MRP_UNUSED(t);
+
+ if (error)
+ mrp_log_error("Transport to client %s closed (%d: %s).",
+ name, error, strerror(error));
+ else
+ mrp_log_info("Transport to client %s closed.", name);
+
+ mrp_log_info("Destroying client %s.", name);
+ destroy_proxy(proxy);
+}
+
+
+static int create_ext_transport(pdp_t *pdp)
+{
+ static mrp_transport_evt_t evt = {
+ .closed = closed_cb,
+ .recvmsg = recv_cb,
+ .recvmsgfrom = NULL,
+ .connection = connect_cb,
+ };
+
+ mrp_transport_t *t;
+ mrp_sockaddr_t addr;
+ socklen_t addrlen;
+ int flags;
+ const char *type;
+
+ t = NULL;
+ addrlen = mrp_transport_resolve(NULL, pdp->address,
+ &addr, sizeof(addr), &type);
+
+ if (addrlen > 0) {
+ flags = MRP_TRANSPORT_REUSEADDR;
+ t = mrp_transport_create(pdp->ctx->ml, type, &evt, pdp, flags);
+
+ if (t != NULL) {
+ if (mrp_transport_bind(t, &addr, addrlen) &&
+ mrp_transport_listen(t, 4)) {
+ mrp_log_info("Listening on transport %s...", pdp->address);
+ pdp->ext = t;
+
+ return TRUE;
+ }
+ else
+ mrp_log_error("Failed to bind transport to %s.", pdp->address);
+ }
+ else
+ mrp_log_error("Failed to create transport for %s.", pdp->address);
+ }
+ else
+ mrp_log_error("Invalid transport address %s.", pdp->address);
+
+ return FALSE;
+}
+
+
+static void destroy_ext_transport(pdp_t *pdp)
+{
+ if (pdp != NULL) {
+ mrp_transport_destroy(pdp->ext);
+ pdp->ext = NULL;
+ }
+}
+
+
+static int create_transports(pdp_t *pdp)
+{
+ return create_ext_transport(pdp);
+}
+
+
+static void destroy_transports(pdp_t *pdp)
+{
+ destroy_ext_transport(pdp);
+}
--- /dev/null
+#ifndef __MURPHY_DECISION_H__
+#define __MURPHY_DECISION_H__
+
+#include "decision-types.h"
+
+pdp_t *create_decision(mrp_context_t *ctx, const char *address);
+void destroy_decision(pdp_t *pdp);
+
+void schedule_notification(pdp_t *pdp);
+
+#endif /* __MURPHY_DECISION_H__ */
--- /dev/null
+#include "message.h"
+
+static int append_one_row(mrp_msg_t *msg, uint16_t tag, mqi_column_def_t *col,
+ int ncolumn, mrp_pep_value_t *data);
+
+mrp_msg_t *create_register_message(mrp_pep_t *pep)
+{
+ mrp_msg_t *msg;
+ mrp_pep_table_t *t;
+ mqi_column_def_t *c;
+ uint16_t ncolumn, type;
+ int i, j;
+
+ ncolumn = 0;
+ for (i = 0; i < pep->nowned; i++)
+ ncolumn += pep->owned[i].ncolumn;
+ for (i = 0; i < pep->nwatched; i++)
+ ncolumn += pep->watched[i].ncolumn;
+
+ msg = mrp_msg_create(MRP_PEPMSG_UINT16(MSGTYPE, MRP_PEPMSG_REGISTER),
+ MRP_PEPMSG_UINT32(MSGSEQ , 0),
+ MRP_PEPMSG_STRING(NAME , pep->name),
+ MRP_PEPMSG_UINT16(NTABLE , pep->nowned),
+ MRP_PEPMSG_UINT16(NWATCH , pep->nwatched),
+ MRP_PEPMSG_UINT16(NCOLDEF, ncolumn),
+ MRP_MSG_END);
+
+ for (i = 0, t = pep->owned; i < pep->nowned; i++, t++) {
+ mrp_msg_append(msg, MRP_PEPMSG_STRING(TBLNAME, t->name));
+ mrp_msg_append(msg, MRP_PEPMSG_UINT16(NCOLUMN, t->ncolumn));
+ mrp_msg_append(msg, MRP_PEPMSG_SINT16(TBLIDX , t->idx_col));
+ for (j = 0, c = t->columns; j < t->ncolumn; j++, c++) {
+ if (c->type == mqi_varchar)
+ type = mqi_blob + c->length;
+ else
+ type = c->type;
+
+ mrp_msg_append(msg, MRP_PEPMSG_STRING(COLNAME, c->name));
+ mrp_msg_append(msg, MRP_PEPMSG_UINT16(COLTYPE, type));
+ }
+ }
+
+ for (i = 0, t = pep->watched; i < pep->nwatched; i++, t++) {
+ mrp_msg_append(msg, MRP_PEPMSG_STRING(TBLNAME, t->name));
+ mrp_msg_append(msg, MRP_PEPMSG_UINT16(NCOLUMN, t->ncolumn));
+ for (j = 0, c = t->columns; j < t->ncolumn; j++, c++) {
+ if (c->type == mqi_varchar)
+ type = mqi_blob + c->length;
+ else
+ type = c->type;
+
+ mrp_msg_append(msg, MRP_PEPMSG_STRING(COLNAME, c->name));
+ mrp_msg_append(msg, MRP_PEPMSG_UINT16(COLTYPE, type));
+ }
+ }
+
+
+ return msg;
+}
+
+
+int decode_register_message(mrp_msg_t *msg, mrp_pep_table_t *owned, int nowned,
+ mrp_pep_table_t *watched, int nwatched,
+ mqi_column_def_t *columns, int ncolumn)
+{
+ mrp_pep_table_t *t;
+ mqi_column_def_t *c;
+ void *it;
+ char *name;
+ uint16_t ntbl, nwch, ncol, type, idx_col;
+ int i, j, n;
+
+ it = NULL;
+
+ if (!mrp_msg_iterate_get(msg, &it,
+ MRP_PEPMSG_UINT16(NTABLE , &ntbl),
+ MRP_PEPMSG_UINT16(NWATCH , &nwch),
+ MRP_PEPMSG_UINT16(NCOLDEF, &ncol),
+ MRP_MSG_END))
+ return FALSE;
+
+ if (ntbl > nowned || nwch > nwatched || ncol > ncolumn)
+ return FALSE;
+
+ n = 0;
+ c = columns;
+ for (i = 0, t = owned; i < nowned; i++, t++) {
+ if (mrp_msg_iterate_get(msg, &it,
+ MRP_PEPMSG_STRING(TBLNAME, &name),
+ MRP_PEPMSG_UINT16(NCOLUMN, &ncol),
+ MRP_PEPMSG_SINT16(TBLIDX , &idx_col),
+ MRP_MSG_END)) {
+ t->name = name;
+ t->columns = c;
+ t->ncolumn = ncol;
+ t->idx_col = idx_col;
+ }
+ else
+ return FALSE;
+
+ for (j = 0; j < t->ncolumn; j++, c++, n++) {
+ if (n >= ncolumn)
+ return FALSE;
+
+ if (mrp_msg_iterate_get(msg, &it,
+ MRP_PEPMSG_STRING(COLNAME, &name),
+ MRP_PEPMSG_UINT16(COLTYPE, &type),
+ MRP_MSG_END)) {
+ c->name = name;
+
+ if (type > mqi_blob) {
+ c->type = mqi_varchar;
+ c->length = type - mqi_blob;
+ }
+ else
+ c->type = type;
+ }
+ }
+ }
+
+ for (i = 0, t = watched; i < nwatched; i++, t++) {
+ if (mrp_msg_iterate_get(msg, &it,
+ MRP_PEPMSG_STRING(TBLNAME, &name),
+ MRP_PEPMSG_UINT16(NCOLUMN, &ncol),
+ MRP_MSG_END)) {
+ t->name = name;
+ t->columns = c;
+ t->ncolumn = ncol;
+ t->idx_col = -1;
+ }
+ else
+ return FALSE;
+
+ for (j = 0; j < t->ncolumn; j++, c++, n++) {
+ if (n >= ncolumn)
+ return FALSE;
+ if (mrp_msg_iterate_get(msg, &it,
+ MRP_PEPMSG_STRING(COLNAME, &name),
+ MRP_PEPMSG_UINT16(COLTYPE, &type),
+ MRP_MSG_END)) {
+ c->name = name;
+
+ if (type > mqi_blob) {
+ c->type = mqi_varchar;
+ c->length = type - mqi_blob;
+ }
+ else {
+ c->type = type;
+ c->length = 0;
+ }
+
+ c->flags = 0;
+ }
+ }
+ }
+
+ return TRUE;
+}
+
+
+mrp_msg_t *create_ack_message(uint32_t seq)
+{
+ return mrp_msg_create(MRP_PEPMSG_UINT16(MSGTYPE, MRP_PEPMSG_ACK),
+ MRP_PEPMSG_UINT32(MSGSEQ , seq),
+ MRP_MSG_END);
+}
+
+
+mrp_msg_t *create_nak_message(uint32_t seq, int error, const char *errmsg)
+{
+ return mrp_msg_create(MRP_PEPMSG_UINT16(MSGTYPE, MRP_PEPMSG_NAK),
+ MRP_PEPMSG_UINT32(MSGSEQ , seq),
+ MRP_PEPMSG_SINT32(ERRCODE, error),
+ MRP_PEPMSG_STRING(ERRMSG , errmsg),
+ MRP_MSG_END);
+}
+
+
+mrp_msg_t *create_notify_message(void)
+{
+ return mrp_msg_create(MRP_PEPMSG_UINT16(MSGTYPE, MRP_PEPMSG_NOTIFY),
+ MRP_PEPMSG_UINT32(MSGSEQ , 0),
+ MRP_PEPMSG_UINT16(NCHANGE, 0),
+ MRP_PEPMSG_UINT16(NTOTAL , 0),
+ MRP_MSG_END);
+}
+
+
+int update_notify_message(mrp_msg_t *msg, int id, mqi_column_def_t *columns,
+ int ncolumn, mrp_pep_value_t *data, int nrow)
+{
+ mrp_pep_value_t *v;
+ uint16_t tid, nr;
+ int i;
+
+ nr = nrow;
+ tid = id;
+
+ if (!mrp_msg_append(msg, MRP_PEPMSG_UINT16(TBLID, tid)) ||
+ !mrp_msg_append(msg, MRP_PEPMSG_UINT16(NROW , nr )))
+ return FALSE;
+
+ for (i = 0, v = data; i < nrow; i++, v += ncolumn) {
+ if (!append_one_row(msg, MRP_PEPTAG_DATA, columns, ncolumn, v))
+ return FALSE;
+ }
+
+ return TRUE;
+}
+
+
+int decode_notify_message(mrp_msg_t *msg, void **it, mrp_pep_data_t *data)
+{
+ int i, j;
+ mrp_pep_value_t *v;
+ mqi_column_def_t *d;
+
+ v = data->columns;
+ d = data->coldefs;
+
+ for (i = 0; i < data->nrow; i++) {
+ for (j = 0; j < data->ncolumn; j++) {
+ switch (d[j].type) {
+ case mqi_varchar:
+ if (!mrp_msg_iterate_get(msg, it,
+ MRP_PEPMSG_STRING(DATA, &v->str),
+ MRP_MSG_END))
+ return FALSE;
+ break;
+
+ case mqi_integer:
+ if (!mrp_msg_iterate_get(msg, it,
+ MRP_PEPMSG_SINT32(DATA, &v->s32),
+ MRP_MSG_END))
+ return FALSE;
+ break;
+
+ case mqi_unsignd:
+ if (!mrp_msg_iterate_get(msg, it,
+ MRP_PEPMSG_UINT32(DATA, &v->u32),
+ MRP_MSG_END))
+ return FALSE;
+ break;
+
+ case mqi_floating:
+ if (!mrp_msg_iterate_get(msg, it,
+ MRP_PEPMSG_DOUBLE(DATA, &v->dbl),
+ MRP_MSG_END))
+ return FALSE;
+ break;
+
+ default:
+ return FALSE;
+ }
+
+ v++;
+ }
+ }
+
+ return TRUE;
+}
+
+
+mrp_msg_t *create_set_message(uint32_t seq, mrp_pep_data_t *data, int ndata)
+{
+ mrp_msg_t *msg;
+ mrp_pep_value_t *vals;
+ mqi_column_def_t *defs;
+ uint16_t utable, utotal, tid, nval, nrow;
+ int i, j;
+
+ utable = ndata;
+ utotal = 0;
+
+ msg = mrp_msg_create(MRP_PEPMSG_UINT16(MSGTYPE, MRP_PEPMSG_SET),
+ MRP_PEPMSG_UINT32(MSGSEQ , seq),
+ MRP_PEPMSG_UINT16(NCHANGE, utable),
+ MRP_PEPMSG_UINT16(NTOTAL , 0),
+ MRP_MSG_END);
+
+ if (msg != NULL) {
+ for (i = 0; i < ndata; i++) {
+ tid = data[i].id;
+ vals = data[i].columns;
+ defs = data[i].coldefs;
+ nval = data[i].ncolumn;
+ nrow = data[i].nrow;
+
+ if (!mrp_msg_append(msg, MRP_PEPMSG_UINT16(TBLID, tid)) ||
+ !mrp_msg_append(msg, MRP_PEPMSG_UINT16(NROW , nrow)))
+ goto fail;
+
+ for (j = 0; j < nrow; j++) {
+ if (!append_one_row(msg, MRP_PEPTAG_DATA, defs, nval, vals))
+ goto fail;
+ vals += nval;
+ utotal += nval;
+ }
+ }
+
+ mrp_msg_set(msg, MRP_PEPMSG_UINT16(NTOTAL, utotal));
+
+ return msg;
+ }
+
+ fail:
+ mrp_msg_unref(msg);
+ return NULL;
+}
+
+
+int decode_set_message(mrp_msg_t *msg, void **it, mrp_pep_data_t *data)
+{
+ int i, j;
+ mrp_pep_value_t *v;
+ mqi_column_def_t *d;
+
+ v = data->columns;
+ d = data->coldefs;
+
+ for (i = 0; i < data->nrow; i++) {
+ for (j = 0; j < data->ncolumn; j++) {
+ switch (d[j].type) {
+ case mqi_varchar:
+ if (!mrp_msg_iterate_get(msg, it,
+ MRP_PEPMSG_STRING(DATA, &v->str),
+ MRP_MSG_END))
+ return FALSE;
+ break;
+
+ case mqi_integer:
+ if (!mrp_msg_iterate_get(msg, it,
+ MRP_PEPMSG_SINT32(DATA, &v->s32),
+ MRP_MSG_END))
+ return FALSE;
+ break;
+
+ case mqi_unsignd:
+ if (!mrp_msg_iterate_get(msg, it,
+ MRP_PEPMSG_UINT32(DATA, &v->u32),
+ MRP_MSG_END))
+ return FALSE;
+ break;
+
+ case mqi_floating:
+ if (!mrp_msg_iterate_get(msg, it,
+ MRP_PEPMSG_DOUBLE(DATA, &v->dbl),
+ MRP_MSG_END))
+ return FALSE;
+ break;
+
+ default:
+ return FALSE;
+ }
+
+ v++;
+ }
+ }
+
+ return TRUE;
+}
+
+
+static int append_one_row(mrp_msg_t *msg, uint16_t tag, mqi_column_def_t *col,
+ int ncolumn, mrp_pep_value_t *data)
+{
+#define HANDLE_TYPE(dbtype, type, member) \
+ case mqi_##dbtype: \
+ if (!mrp_msg_append(msg, MRP_MSG_TAG_##type(tag, data->member))) \
+ return FALSE; \
+ break
+
+ int i;
+
+ for (i = 0; i < ncolumn; i++, data++, col++) {
+ switch (col->type) {
+ HANDLE_TYPE(integer , SINT32, s32);
+ HANDLE_TYPE(unsignd , UINT32, u32);
+ HANDLE_TYPE(floating, DOUBLE, dbl);
+ HANDLE_TYPE(string , STRING, str);
+ case mqi_blob:
+ default:
+ return FALSE;
+ }
+ }
+
+ return TRUE;
+
+#undef HANDLE_TYPE
+}
--- /dev/null
+#ifndef __MURPHY_DECISION_MESSAGE_H__
+#define __MURPHY_DECISION_MESSAGE_H__
+
+#include <murphy/common/msg.h>
+
+#include "decision-types.h"
+#include "client.h"
+
+
+#define MRP_PEPMSG_UINT16(tag, value) \
+ MRP_MSG_TAG_UINT16(MRP_PEPTAG_##tag, value)
+
+#define MRP_PEPMSG_SINT16(tag, value) \
+ MRP_MSG_TAG_SINT16(MRP_PEPTAG_##tag, value)
+
+#define MRP_PEPMSG_UINT32(tag, value) \
+ MRP_MSG_TAG_UINT32(MRP_PEPTAG_##tag, value)
+
+#define MRP_PEPMSG_SINT32(tag, value) \
+ MRP_MSG_TAG_SINT32(MRP_PEPTAG_##tag, value)
+
+#define MRP_PEPMSG_DOUBLE(tag, value) \
+ MRP_MSG_TAG_DOUBLE(MRP_PEPTAG_##tag, value)
+
+#define MRP_PEPMSG_STRING(tag, value) \
+ MRP_MSG_TAG_STRING(MRP_PEPTAG_##tag, value)
+
+/*
+ * message types
+ */
+
+typedef enum {
+ MRP_PEPMSG_REGISTER = 0x1, /* client: register me */
+ MRP_PEPMSG_UNREGISTER = 0x2, /* client: unregister me */
+ MRP_PEPMSG_SET = 0x3, /* client: set table data */
+ MRP_PEPMSG_NOTIFY = 0x4, /* server: table changes */
+ MRP_PEPMSG_ACK = 0x5, /* server: ok */
+ MRP_PEPMSG_NAK = 0x6, /* server: request failed */
+} mrp_pepmsg_type_t;
+
+
+/*
+ * message-specific tags
+ */
+
+typedef enum {
+ /*
+ * fixed common tags
+ */
+ MRP_PEPTAG_MSGTYPE = 0x1, /* message type */
+ MRP_PEPTAG_MSGSEQ = 0x2, /* sequence number */
+
+ /*
+ * fixed tags in registration messages
+ */
+ MRP_PEPTAG_NAME = 0x3, /* enforcement point name */
+ MRP_PEPTAG_NTABLE = 0x4, /* number of owned tables */
+ MRP_PEPTAG_NWATCH = 0x5, /* number of watched tables */
+ MRP_PEPTAG_NCOLDEF = 0x6, /* number of column definitions */
+ MRP_PEPTAG_TBLNAME = 0x7, /* table name */
+ MRP_PEPTAG_NCOLUMN = 0x8, /* number of columns */
+ MRP_PEPTAG_TBLIDX = 0x9, /* column index of index column */
+ MRP_PEPTAG_COLNAME = 0xa, /* column name */
+ MRP_PEPTAG_COLTYPE = 0xb, /* column type */
+
+ /*
+ * fixed tags in NAKs
+ */
+ MRP_PEPTAG_ERRCODE = 0x3, /* error code */
+ MRP_PEPTAG_ERRMSG = 0x4, /* error message */
+
+ /*
+ * fixed tags in data notification messages
+ */
+ MRP_PEPTAG_NCHANGE = 0x3, /* number of tables in notification */
+ MRP_PEPTAG_NTOTAL = 0x4, /* total columns in notification */
+ MRP_PEPTAG_TBLID = 0x5, /* table id */
+ MRP_PEPTAG_NROW = 0x6, /* number of table rows */
+ MRP_PEPTAG_DATA = 0x7, /* a data column */
+} mrp_pepmsg_tag_t;
+
+
+mrp_msg_t *create_register_message(mrp_pep_t *pep);
+int decode_register_message(mrp_msg_t *msg, mrp_pep_table_t *tables, int ntable,
+ mrp_pep_table_t *watches, int nwatch,
+ mqi_column_def_t *columns, int ncolumn);
+
+mrp_msg_t *create_ack_message(uint32_t seq);
+mrp_msg_t *create_nak_message(uint32_t seq, int error, const char *errmsg);
+mrp_msg_t *create_notify_message(void);
+int update_notify_message(mrp_msg_t *msg, int id, mqi_column_def_t *columns,
+ int ncolumn, mrp_pep_value_t *data, int nrow);
+int decode_notify_message(mrp_msg_t *msg, void **it, mrp_pep_data_t *t);
+
+mrp_msg_t *create_set_message(uint32_t seq, mrp_pep_data_t *tables, int ntable);
+int decode_set_message(mrp_msg_t *msg, void **it, mrp_pep_data_t *data);
+
+
+#endif /* __MURPHY_DECISION_MESSAGE_H__ */
--- /dev/null
+#include <murphy/common/mm.h>
+#include <murphy/common/log.h>
+
+#include "decision-types.h"
+#include "message.h"
+#include "notify.h"
+
+
+static void prepare_proxy_notification(pep_proxy_t *proxy)
+{
+ proxy->notify_ntable = 0;
+ proxy->notify_ncolumn = 0;
+ proxy->notify_fail = FALSE;
+}
+
+
+static int prepare_table_notification(pep_table_t *t)
+{
+ mrp_pep_value_t *data;
+ int nrow, size, n;
+
+ nrow = mqi_get_table_size(t->h);
+
+ mrp_debug("size of table %s: %d rows", t->name, nrow);
+
+ if (nrow <= 0) {
+ t->notify_fail = FALSE;
+
+ return TRUE;
+ }
+
+ t->notify_stamp = mqi_get_table_stamp(t->h);
+
+ size = t->ncolumn * sizeof(*data);
+ data = mrp_allocz(nrow * size);
+
+ if (data != NULL) {
+ n = mqi_select(t->h, NULL, t->coldesc, data, size, nrow);
+
+ mrp_debug("select from %s: %d rows", t->name, n);
+
+ if (n <= nrow) {
+ t->notify_data = data;
+ t->notify_nrow = n;
+ t->notify_fail = FALSE;
+
+ return TRUE;
+ }
+
+ mrp_free(data);
+ }
+
+ t->notify_fail = TRUE;
+
+ return FALSE;
+}
+
+
+static void free_table_notification(pep_table_t *t)
+{
+ mrp_free(t->notify_data);
+
+ t->notify_data = NULL;
+ t->notify_nrow = 0;
+ t->notify_all = FALSE;
+}
+
+
+static int collect_watch_notification(pep_watch_t *w)
+{
+ pep_proxy_t *proxy = w->proxy;
+ pep_table_t *t = w->table;
+
+ if (!proxy->notify_fail && !t->notify_fail) {
+ mrp_debug("updating %s watch for %s", t->name, proxy->name);
+
+ if (proxy->notify_all || t->notify_all || t->notify_stamp != w->stamp) {
+ if (proxy->notify_msg == NULL)
+ proxy->notify_msg = create_notify_message();
+
+ if (proxy->notify_msg != NULL) {
+ if (update_notify_message(proxy->notify_msg, w->id,
+ t->columns, t->ncolumn,
+ t->notify_data, t->notify_nrow)) {
+ proxy->notify_ntable++;
+ proxy->notify_ncolumn += (t->notify_nrow * t->ncolumn);
+ success:
+ w->stamp = t->notify_stamp;
+
+ return TRUE;
+ }
+ }
+ }
+ else
+ goto success;
+ }
+
+ proxy->notify_fail = TRUE;
+
+ return FALSE;
+}
+
+
+static int send_proxy_notification(pep_proxy_t *proxy)
+{
+ uint16_t nchange, ntotal;
+
+ if (proxy->notify_msg == NULL)
+ return TRUE;
+
+ if (!proxy->notify_fail) {
+ mrp_debug("notifying client %s", proxy->name);
+
+ nchange = proxy->notify_ntable;
+ ntotal = proxy->notify_ncolumn;
+
+ mrp_msg_set(proxy->notify_msg, MRP_PEPMSG_UINT16(NCHANGE, nchange));
+ mrp_msg_set(proxy->notify_msg, MRP_PEPMSG_UINT16(NTOTAL , ntotal ));
+
+
+ /*
+ mrp_log_info("Notification message for client %s:", proxy->name);
+ mrp_msg_dump(proxy->notify_msg, stdout);
+ */
+
+
+ mrp_transport_send(proxy->t, proxy->notify_msg);
+ }
+ else
+ mrp_log_error("Failed to generate/send notification to %s.",
+ proxy->name);
+
+ mrp_msg_unref(proxy->notify_msg);
+
+ proxy->notify_msg = NULL;
+ proxy->notify_ntable = 0;
+ proxy->notify_ncolumn = 0;
+ proxy->notify_fail = FALSE;
+ proxy->notify_all = FALSE;
+
+ return TRUE;
+}
+
+
+void notify_table_changes(pdp_t *pdp)
+{
+ mrp_list_hook_t *p, *n, *wp, *wn;
+ pep_proxy_t *proxy;
+ pep_table_t *t;
+ pep_watch_t *w;
+
+ mrp_debug("notifying clients about table changes");
+
+ mrp_list_foreach(&pdp->proxies, p, n) {
+ proxy = mrp_list_entry(p, typeof(*proxy), hook);
+ prepare_proxy_notification(proxy);
+ }
+
+ mrp_list_foreach(&pdp->tables, p, n) {
+ t = mrp_list_entry(p, typeof(*t), hook);
+ prepare_table_notification(t);
+
+ mrp_list_foreach(&t->watches, wp, wn) {
+ w = mrp_list_entry(wp, typeof(*w), tbl_hook);
+ collect_watch_notification(w);
+ }
+
+ free_table_notification(t);
+ }
+
+ mrp_list_foreach(&pdp->proxies, p, n) {
+ proxy = mrp_list_entry(p, typeof(*proxy), hook);
+ send_proxy_notification(proxy);
+ }
+}
--- /dev/null
+#ifndef __MURPHY_DECISION_NOTIFY_H__
+#define __MURPHY_DECISION_NOTIFY_H__
+
+#include "decision-types.h"
+
+void notify_table_changes(pdp_t *pdp);
+
+#endif /* __MURPHY_DECISION_NOTIFY_H__ */
--- /dev/null
+#include <murphy/common/macros.h>
+
+#include <murphy/core/plugin.h>
+#include <murphy/core/console.h>
+
+#include "decision-types.h"
+#include "decision.h"
+#include "client.h"
+
+
+static int plugin_init(mrp_plugin_t *plugin)
+{
+ plugin->data = create_decision(plugin->ctx, MRP_DEFAULT_PEP_ADDRESS);
+
+ return (plugin->data != NULL);
+}
+
+
+static void plugin_exit(mrp_plugin_t *plugin)
+{
+ pdp_t *pdp = (pdp_t *)plugin->data;
+
+ destroy_decision(pdp);
+}
+
+
+static void cmd_cb(mrp_console_t *c, void *user_data, int argc, char **argv)
+{
+ MRP_UNUSED(user_data);
+ MRP_UNUSED(argc);
+ MRP_UNUSED(argv);
+
+ mrp_console_printf(c, "decision:%s() called...\n", __FUNCTION__);
+}
+
+
+#define PLUGIN_DESCRIPTION "Murphy decision making plugin prototype."
+#define PLUGIN_VERSION MRP_VERSION_INT(0, 0, 1)
+#define PLUGIN_HELP "TODO..."
+#define PLUGIN_AUTHORS "Aku Ankka <aku.ankka@ankkalinna.org>"
+
+MRP_CONSOLE_GROUP(plugin_commands, "decision", NULL, NULL, {
+ MRP_TOKENIZED_CMD("cmd", cmd_cb, TRUE,
+ "cmd [args]", "a command", "A command..."),
+});
+
+MURPHY_REGISTER_PLUGIN("decision-proto",
+ PLUGIN_VERSION, PLUGIN_DESCRIPTION,
+ PLUGIN_AUTHORS, PLUGIN_HELP, MRP_SINGLETON,
+ plugin_init, plugin_exit,
+ NULL, 0, /* plugin argument table */
+ NULL, 0, /* exported methods */
+ NULL, 0, /* imported methods */
+ &plugin_commands);
--- /dev/null
+#include <murphy/common/log.h>
+#include <murphy/common/mm.h>
+#include <murphy/common/list.h>
+
+#include "decision-types.h"
+#include "table.h"
+#include "proxy.h"
+
+
+int init_proxies(pdp_t *pdp)
+{
+ mrp_list_init(&pdp->proxies);
+
+ return TRUE;
+}
+
+
+void destroy_proxies(pdp_t *pdp)
+{
+ MRP_UNUSED(pdp);
+
+ return;
+}
+
+
+pep_proxy_t *create_proxy(pdp_t *pdp)
+{
+ pep_proxy_t *proxy;
+
+ proxy = mrp_allocz(sizeof(*proxy));
+
+ if (proxy != NULL) {
+ mrp_list_init(&proxy->hook);
+ mrp_list_init(&proxy->watches);
+
+ proxy->pdp = pdp;
+
+ mrp_list_append(&pdp->proxies, &proxy->hook);
+ }
+
+ return proxy;
+}
+
+
+void destroy_proxy(pep_proxy_t *proxy)
+{
+ int i;
+
+ if (proxy != NULL) {
+ mrp_list_delete(&proxy->hook);
+
+ for (i = 0; i < proxy->ntable; i++)
+ destroy_proxy_table(proxy->tables + i);
+
+ destroy_proxy_watches(proxy);
+
+ mrp_free(proxy);
+ }
+}
+
+
+int register_proxy(pep_proxy_t *proxy, char *name,
+ mrp_pep_table_t *tables, int ntable,
+ mrp_pep_table_t *watches, int nwatch,
+ int *error, const char **errmsg)
+{
+ int i;
+
+ proxy->name = mrp_strdup(name);
+ proxy->tables = mrp_allocz_array(typeof(*proxy->tables) , ntable);
+ proxy->ntable = ntable;
+
+ if (proxy->name == NULL || (ntable && proxy->tables == NULL))
+ return FALSE;
+
+ for (i = 0; i < ntable; i++) {
+ if (create_proxy_table(proxy->tables + i, tables + i, error, errmsg))
+ mrp_log_info("Client %s created table %s.", proxy->name,
+ tables[i].name);
+ else {
+ mrp_log_error("Client %s failed to create table %s (%d: %s).",
+ proxy->name, tables[i].name, *error, *errmsg);
+ return FALSE;
+ }
+ }
+
+ for (i = 0; i < nwatch; i++) {
+ if (create_proxy_watch(proxy, i, watches + i, error, errmsg))
+ mrp_log_info("Client %s subscribed for table %s.", proxy->name,
+ watches[i].name);
+ else
+ mrp_log_error("Client %s failed to subscribe for table %s.",
+ proxy->name, watches[i].name);
+ }
+
+ return TRUE;
+}
+
+
+int unregister_proxy(pep_proxy_t *proxy)
+{
+ destroy_proxy(proxy);
+
+ return TRUE;
+}
--- /dev/null
+#ifndef __MURPHY_DECISION_PROXY_H__
+#define __MURPHY_DECISION_PROXY_H__
+
+#include "decision-types.h"
+
+int init_proxies(pdp_t *pdp);
+void destroy_proxies(pdp_t *pdp);
+
+pep_proxy_t *create_proxy(pdp_t *pdp);
+void destroy_proxy(pep_proxy_t *proxy);
+
+int register_proxy(pep_proxy_t *proxy, char *name,
+ mrp_pep_table_t *tables, int ntable,
+ mrp_pep_table_t *watches, int nwatch,
+ int *error, const char **errmsg);
+int unregister_proxy(pep_proxy_t *proxy);
+
+#endif /* __MURPHY_DECISION_PROXY_H__ */
--- /dev/null
+#include <errno.h>
+
+#include <murphy/common/debug.h>
+#include <murphy/common/mm.h>
+#include <murphy/common/hashtbl.h>
+#include <murphy/common/utils.h>
+
+#include <murphy-db/mqi.h>
+
+#include "table.h"
+
+
+/*
+ * client-side tables, common table routines
+ */
+
+static void purge_pep_table(mrp_pep_table_t *t)
+{
+ int i;
+
+ if (t != NULL) {
+ mrp_free((char *)t->name);
+
+ for (i = 0; i < t->ncolumn; i++)
+ mrp_free((char *)t->columns[i].name);
+
+ mrp_free(t->columns);
+ }
+}
+
+
+static void free_column_definitions(mqi_column_def_t *columns, int ncolumn)
+{
+ int i;
+
+ if (columns != NULL) {
+ for (i = 0; i < ncolumn; i++)
+ mrp_free((char *)columns[i].name);
+
+ mrp_free(columns);
+ }
+}
+
+
+static int copy_column_definitions(mqi_column_def_t *src, int nsrc,
+ mqi_column_def_t **dstcol, int *ndst)
+{
+ mqi_column_def_t *dst;
+ int n, i;
+
+ if (nsrc > 0) {
+ if (src[nsrc - 1].name == NULL)
+ n = nsrc - 1;
+ else
+ n = nsrc;
+
+ dst = mrp_allocz_array(mqi_column_def_t, n + 1);
+
+ if (dst == NULL)
+ return FALSE;
+
+ for (i = 0; i < n; i++) {
+ dst[i].type = src[i].type;
+ dst[i].length = src[i].length;
+ dst[i].name = mrp_strdup(src[i].name);
+
+ if (dst[i].name == NULL)
+ goto fail;
+ }
+
+ *dstcol = dst;
+ *ndst = n;
+
+ return TRUE;
+ }
+ else {
+ *dstcol = NULL;
+ *ndst = 0;
+
+ return FALSE;
+ }
+
+ fail:
+ free_column_definitions(dst, n);
+ return FALSE;
+}
+
+
+static void free_column_descriptors(mqi_column_desc_t *coldesc)
+{
+ mrp_free(coldesc);
+}
+
+
+static int setup_column_descriptors(mqi_column_def_t *columns, int ncolumn,
+ mqi_column_desc_t **coldesc)
+{
+#define SETUP_TYPE(type, member) \
+ case mqi_##type: \
+ desc->cindex = i; \
+ desc->offset = (void *)&col->member - (void *)NULL; \
+ break;
+
+ mqi_column_def_t *def;
+ mqi_column_desc_t *desc;
+ mrp_pep_value_t *col;
+ int i;
+
+ *coldesc = mrp_allocz_array(mqi_column_desc_t, ncolumn + 1);
+
+ if (coldesc != NULL) {
+ def = columns;
+ desc = *coldesc;
+ col = NULL;
+
+ for (i = 0; i < ncolumn; i++) {
+ switch (def->type) {
+ SETUP_TYPE(integer , s32);
+ SETUP_TYPE(unsignd , u32);
+ SETUP_TYPE(floating, dbl);
+ SETUP_TYPE(string , str);
+
+ default:
+ case mqi_blob:
+ goto fail;
+ }
+
+ def++;
+ desc++;
+ col++;
+ }
+
+ desc->cindex = -1;
+ desc->offset = 1;
+
+ return TRUE;
+ }
+
+ fail:
+ free_column_descriptors(*coldesc);
+ *coldesc = NULL;
+
+ return FALSE;
+
+#undef SETUP_TYPE
+}
+
+
+static int check_columns(mqi_column_def_t *p, int np,
+ mqi_column_def_t *q, int nq)
+{
+ int i;
+
+ if (np == nq) {
+ for (i = 0; i < np; i++, p++, q++) {
+ if (p->type != q->type || p->length != q->length)
+ return FALSE;
+ if (strcmp(p->name, q->name))
+ return FALSE;
+ }
+
+ return TRUE;
+ }
+ else
+ return FALSE;
+}
+
+
+int copy_pep_table(mrp_pep_table_t *src, mrp_pep_table_t *dst)
+{
+ int i, ncolumn;
+
+ dst->name = mrp_strdup(src->name);
+ if (dst->name == NULL)
+ return FALSE;
+
+ if (src->columns[src->ncolumn - 1].name != NULL)
+ ncolumn = src->ncolumn;
+ else
+ ncolumn = src->ncolumn - 1;
+
+ dst->columns = mrp_allocz_array(typeof(*dst->columns), ncolumn + 1);
+ if (dst->columns == NULL) {
+ mrp_free((char *)dst->name);
+
+ return FALSE;
+ }
+
+ dst->ncolumn = ncolumn;
+ dst->idx_col = -1;
+
+ for (i = 0; i < ncolumn; i++) {
+ dst->columns[i].type = src->columns[i].type;
+ dst->columns[i].length = src->columns[i].length;
+ dst->columns[i].name = mrp_strdup(src->columns[i].name);
+
+ if (dst->columns[i].name == NULL)
+ goto fail;
+
+ if (src->columns[i].flags != 0) {
+ if (dst->idx_col == -1)
+ dst->idx_col = i;
+ else
+ goto fail;
+ }
+
+ dst->columns[i].flags = 0;
+ }
+
+ return TRUE;
+
+ fail:
+ purge_pep_table(dst);
+
+ return FALSE;
+}
+
+
+int copy_pep_tables(mrp_pep_table_t *src, mrp_pep_table_t *dst, int n)
+{
+ int i;
+
+ for (i = 0; i < n; i++) {
+ if (!copy_pep_table(src + i, dst + i)) {
+ for (i--; i >= 0; i--)
+ purge_pep_table(dst + i);
+
+ return FALSE;
+ }
+ }
+
+ return TRUE;
+}
+
+
+void free_pep_table(mrp_pep_table_t *t)
+{
+ purge_pep_table(t);
+ mrp_free(t);
+}
+
+
+void free_pep_tables(mrp_pep_table_t *tables, int n)
+{
+ int i;
+
+ for (i = 0; i < n; i++)
+ purge_pep_table(tables + i);
+
+ mrp_free(tables);
+}
--- /dev/null
+#include <errno.h>
+
+#include <murphy/common/debug.h>
+#include <murphy/common/mm.h>
+#include <murphy/common/hashtbl.h>
+#include <murphy/common/utils.h>
+
+#include <murphy-db/mqi.h>
+
+#include "decision.h"
+#include "table.h"
+
+#define FAIL(ec, msg) do { \
+ *errcode = ec; \
+ *errmsg = msg; \
+ goto fail; \
+ } while (0)
+
+static pep_table_t *lookup_watch_table(pdp_t *pdp, const char *name);
+
+#include "table-common.c"
+
+/*
+ * proxied and tracked tables
+ */
+
+
+static void table_event_cb(mqi_event_t *e, void *user_data)
+{
+ pdp_t *pdp = (pdp_t *)user_data;
+ const char *name = e->table.table.name;
+ mqi_handle_t h = e->table.table.handle;
+ pep_table_t *t;
+
+ switch (e->event) {
+ case mqi_table_created:
+ mrp_debug("table %s (0x%x) created", name, h);
+ break;
+
+ case mqi_table_dropped:
+ mrp_debug("table %s (0x%x) dropped", name, h);
+ break;
+ default:
+ return;
+ }
+
+ t = lookup_watch_table(pdp, name);
+
+ if (t != NULL) {
+ t->notify_all = TRUE;
+ t->h = h;
+ }
+
+ schedule_notification(pdp);
+}
+
+
+static void transaction_event_cb(mqi_event_t *e, void *user_data)
+{
+ pdp_t *pdp = (pdp_t *)user_data;
+
+ switch (e->event) {
+ case mqi_transaction_end:
+ mrp_debug("transaction ended");
+ schedule_notification(pdp);
+ break;
+
+ case mqi_transaction_start:
+ mrp_debug("transaction started");
+ break;
+
+ default:
+ break;
+ }
+}
+
+
+static int open_db(pdp_t *pdp)
+{
+ if (mqi_open() == 0) {
+ if (mqi_create_transaction_trigger(transaction_event_cb, pdp) == 0) {
+ if (mqi_create_table_trigger(table_event_cb, pdp) == 0)
+ return TRUE;
+ else
+ mqi_drop_transaction_trigger(transaction_event_cb, pdp);
+ }
+ }
+
+ return FALSE;
+}
+
+
+static void close_db(pdp_t *pdp)
+{
+ mqi_drop_table_trigger(table_event_cb, pdp);
+ mqi_drop_transaction_trigger(transaction_event_cb, pdp);
+}
+
+
+static void purge_watch_table_cb(void *key, void *entry);
+
+
+
+int init_tables(pdp_t *pdp)
+{
+ mrp_htbl_config_t hcfg;
+
+ if (open_db(pdp)) {
+ mrp_list_init(&pdp->tables);
+
+ mrp_clear(&hcfg);
+ hcfg.comp = mrp_string_comp;
+ hcfg.hash = mrp_string_hash;
+ hcfg.free = purge_watch_table_cb;
+
+ pdp->watched = mrp_htbl_create(&hcfg);
+ }
+
+ return (pdp->watched != NULL);
+}
+
+
+void destroy_tables(pdp_t *pdp)
+{
+ close_db(pdp);
+ mrp_htbl_destroy(pdp->watched, TRUE);
+
+ pdp->watched = NULL;
+}
+
+
+int create_proxy_table(pep_table_t *t, mrp_pep_table_t *def,
+ int *errcode, const char **errmsg)
+{
+ mqi_column_def_t **cols;
+ mqi_column_desc_t **desc;
+ int *ncol;
+ char *index[2];
+
+ if (mqi_get_table_handle((char *)def->name) != MQI_HANDLE_INVALID)
+ FAIL(EEXIST, "table already exists");
+
+ if (def->idx_col >= def->ncolumn)
+ FAIL(EINVAL, "invalid index column specified");
+
+ mrp_list_init(&t->hook);
+ mrp_list_init(&t->watches);
+
+ t->name = mrp_strdup(def->name);
+
+ if (t->name != NULL) {
+ cols = &t->columns;
+ ncol = &t->ncolumn;
+ desc = &t->coldesc;
+
+ if (!copy_column_definitions(def->columns, def->ncolumn, cols, ncol))
+ FAIL(ENOMEM, "failed to create table columns");
+
+ if (!setup_column_descriptors(t->columns, t->ncolumn, desc))
+ FAIL(ENOMEM, "failed to create table descriptor");
+
+ t->h = mqi_create_table(t->name, MQI_TEMPORARY, NULL, t->columns);
+
+ if (t->h != MQI_HANDLE_INVALID) {
+ if (def->idx_col >= 0) {
+ index[0] = (char *)def->columns[def->idx_col].name;
+ index[1] = NULL;
+
+ if (mqi_create_index(t->h, index) != 0)
+ FAIL(EINVAL, "failed to create table index");
+ }
+
+ mrp_debug("create table %s", t->name);
+
+ return TRUE;
+ }
+ else
+ FAIL(EINVAL, "failed to create table");
+ }
+ else
+ FAIL(ENOMEM, "failed to create table");
+
+ fail:
+ return FALSE;
+}
+
+
+void destroy_proxy_table(pep_table_t *t)
+{
+ mrp_debug("destroying table %s", t->name ? t->name : "<unknown>");
+
+ if (t->h != MQI_HANDLE_INVALID)
+ mqi_drop_table(t->h);
+
+ free_column_definitions(t->columns, t->ncolumn);
+ free_column_descriptors(t->coldesc);
+
+ mrp_free(t->name);
+
+ t->name = NULL;
+ t->h = MQI_HANDLE_INVALID;
+ t->columns = NULL;
+ t->ncolumn = 0;
+}
+
+
+void destroy_proxy_tables(pep_proxy_t *proxy)
+{
+ int i;
+
+ mrp_debug("destroying tables of client %s", proxy->name);
+
+ for (i = 0; i < proxy->ntable; i++)
+ destroy_proxy_table(proxy->tables + i);
+
+ proxy->tables = NULL;
+ proxy->ntable = 0;
+}
+
+
+pep_table_t *create_watch_table(pdp_t *pdp, const char *name,
+ mqi_column_def_t *columns, int ncolumn)
+{
+ pep_table_t *t;
+ mqi_column_def_t **cols;
+ mqi_column_desc_t **desc;
+ int *ncol;
+
+ t = mrp_allocz(sizeof(*t));
+
+ if (t != NULL) {
+ mrp_list_init(&t->hook);
+ mrp_list_init(&t->watches);
+
+ t->name = mrp_strdup(name);
+
+ if (t->name == NULL)
+ goto fail;
+
+ cols = &t->columns;
+ ncol = &t->ncolumn;
+ desc = &t->coldesc;
+
+ if (!copy_column_definitions(columns, ncolumn, cols, ncol))
+ goto fail;
+
+ if (!setup_column_descriptors(t->columns, t->ncolumn, desc))
+ goto fail;
+
+ t->h = mqi_get_table_handle(t->name);
+
+ if (!mrp_htbl_insert(pdp->watched, t->name, t))
+ goto fail;
+
+ mrp_list_append(&pdp->tables, &t->hook);
+ }
+
+ return t;
+
+ fail:
+ destroy_watch_table(pdp, t);
+
+ return FALSE;
+}
+
+
+static void destroy_table_watches(pep_table_t *t)
+{
+ pep_watch_t *w;
+ mrp_list_hook_t *p, *n;
+
+ if (t != NULL) {
+ mrp_list_foreach(&t->watches, p, n) {
+ w = mrp_list_entry(p, typeof(*w), tbl_hook);
+
+ mrp_list_delete(&w->tbl_hook);
+ mrp_list_delete(&w->pep_hook);
+
+ mrp_free(w);
+ }
+ }
+}
+
+
+void destroy_watch_table(pdp_t *pdp, pep_table_t *t)
+{
+ mrp_list_delete(&t->hook);
+ t->h = MQI_HANDLE_INVALID;
+
+ if (pdp != NULL)
+ mrp_htbl_remove(pdp->watched, t->name, FALSE);
+
+ destroy_table_watches(t);
+}
+
+
+static pep_table_t *lookup_watch_table(pdp_t *pdp, const char *name)
+{
+ return mrp_htbl_lookup(pdp->watched, (void *)name);
+}
+
+
+static void purge_watch_table_cb(void *key, void *entry)
+{
+ pep_table_t *t = (pep_table_t *)entry;
+
+ MRP_UNUSED(key);
+
+ destroy_watch_table(NULL, t);
+}
+
+
+int create_proxy_watch(pep_proxy_t *proxy, int id, mrp_pep_table_t *def,
+ int *error, const char **errmsg)
+{
+ pdp_t *pdp = proxy->pdp;
+ pep_table_t *t;
+ pep_watch_t *w;
+
+ t = lookup_watch_table(pdp, def->name);
+
+ if (t == NULL) {
+ t = create_watch_table(pdp, def->name, def->columns, def->ncolumn);
+
+ if (t == NULL) {
+ *error = EINVAL;
+ *errmsg = "failed to watch table";
+ }
+ }
+ else {
+ if (!check_columns(t->columns, t->ncolumn, def->columns, def->ncolumn)){
+ *error = EINVAL;
+ *errmsg = "table columns don't match";
+ t = NULL;
+ }
+ }
+
+ if (t == NULL)
+ return FALSE;
+
+ w = mrp_allocz(sizeof(*w));
+
+ if (w != NULL) {
+ mrp_list_init(&w->tbl_hook);
+ mrp_list_init(&w->pep_hook);
+
+ w->table = t;
+ w->proxy = proxy;
+ w->id = id;
+
+ mrp_list_append(&t->watches, &w->tbl_hook);
+ mrp_list_append(&proxy->watches, &w->pep_hook);
+
+ return TRUE;
+ }
+ else {
+ *error = ENOMEM;
+ *errmsg = "failed to allocate table watch";
+ }
+
+ return FALSE;
+}
+
+
+void destroy_proxy_watches(pep_proxy_t *proxy)
+{
+ pep_watch_t *w;
+ mrp_list_hook_t *p, *n;
+
+ if (proxy != NULL) {
+ mrp_list_foreach(&proxy->watches, p, n) {
+ w = mrp_list_entry(p, typeof(*w), pep_hook);
+
+ mrp_list_delete(&w->tbl_hook);
+ mrp_list_delete(&w->pep_hook);
+
+ mrp_free(w);
+ }
+ }
+}
+
+
+static void reset_proxy_tables(pep_proxy_t *proxy)
+{
+ int i;
+
+ for (i = 0; i < proxy->ntable; i++)
+ mqi_delete_from(proxy->tables[i].h, NULL);
+}
+
+
+static int insert_into_table(pep_table_t *t, mrp_pep_value_t *values, int nrow)
+{
+ int i;
+ void *data[2];
+
+ data[1] = NULL;
+
+ for (i = 0; i < nrow; i++) {
+ data[0] = values;
+ if (mqi_insert_into(t->h, 0, t->coldesc, data) != 1)
+ return FALSE;
+ else
+ values += t->ncolumn;
+ }
+
+ return TRUE;
+}
+
+
+int set_proxy_tables(pep_proxy_t *proxy, mrp_pep_data_t *tables, int ntable,
+ int *error, const char **errmsg)
+{
+ mqi_handle_t tx;
+ pep_table_t *t;
+ int i, id;
+
+ tx = mqi_begin_transaction();
+
+ if (tx != MQI_HANDLE_INVALID) {
+ reset_proxy_tables(proxy);
+
+ for (i = 0; i < ntable; i++) {
+ id = tables[i].id;
+
+ if (id < 0 || id >= proxy->ntable)
+ goto fail;
+
+ t = proxy->tables + id;
+
+ if (tables[i].ncolumn != t->ncolumn)
+ goto fail;
+
+#if 0
+ if (!delete_from_table(t, tables[i].columns, tables[i].nrow))
+ goto fail;
+#endif
+
+ if (!insert_into_table(t, tables[i].columns, tables[i].nrow))
+ goto fail;
+
+
+ }
+
+ mqi_commit_transaction(tx);
+
+ return TRUE;
+
+ fail:
+ *error = EINVAL;
+ *errmsg = "failed to set tables";
+ mqi_rollback_transaction(tx);
+ }
+
+ return FALSE;
+}
--- /dev/null
+#ifndef __MURPHY_DECISION_TABLE_H__
+#define __MURPHY_DECISION_TABLE_H__
+
+#include "client.h"
+#include "decision-types.h"
+
+int copy_pep_table(mrp_pep_table_t *src, mrp_pep_table_t *dst);
+void free_pep_table(mrp_pep_table_t *t);
+
+int copy_pep_tables(mrp_pep_table_t *src, mrp_pep_table_t *dst, int n);
+void free_pep_tables(mrp_pep_table_t *tables, int n);
+
+int init_tables(pdp_t *pdp);
+void destroy_tables(pdp_t *pdp);
+
+int create_proxy_table(pep_table_t *t, mrp_pep_table_t *def,
+ int *errcode, const char **errmsg);
+
+int create_proxy_watch(pep_proxy_t *proxy, int id, mrp_pep_table_t *def,
+ int *errcode, const char **errmsg);
+
+void destroy_watch_table(pdp_t *pdp, pep_table_t *t);
+
+void destroy_proxy_table(pep_table_t *t);
+void destroy_proxy_tables(pep_proxy_t *proxy);
+
+void destroy_proxy_watches(pep_proxy_t *proxy);
+
+int set_proxy_tables(pep_proxy_t *proxy, mrp_pep_data_t *tables, int ntable,
+ int *error, const char **errmsg);
+
+#endif /* __MURPHY_DECISION_TABLE_H__ */
--- /dev/null
+#include <stdio.h>
+#include <unistd.h>
+#include <string.h>
+#include <errno.h>
+#include <netdb.h>
+#include <signal.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+
+#define _GNU_SOURCE
+#include <getopt.h>
+
+#include <readline/readline.h>
+#include <readline/history.h>
+
+#include <murphy/common.h>
+#include "client.h"
+
+#define DEFAULT_PROMPT "test-pep> "
+
+
+/*
+ * client context
+ */
+
+typedef struct {
+ const char *addrstr; /* server address */
+ int zone; /* run in zone control mode */
+ int verbose; /* verbose mode */
+
+ mrp_mainloop_t *ml; /* murphy mainloop */
+ void *pep; /* enforcement point client */
+ int fd; /* fd for terminal input */
+ mrp_io_watch_t *iow; /* I/O watch for terminal input */
+} client_t;
+
+
+#define NVALUE 512
+
+
+/*
+ * device and stream definitions
+ */
+
+#define NDEVICE (MRP_ARRAY_SIZE(devices) - 1)
+#define DEVICE_NCOLUMN 4
+
+typedef struct {
+ const char *name;
+ const char *type;
+ int public;
+ int available;
+} device_t;
+
+static device_t devices[] = {
+ { "builtin-speaker" , "speaker" , TRUE , TRUE },
+ { "builtin-earpiece", "speaker" , FALSE, TRUE },
+ { "usb-speaker" , "speaker" , TRUE , FALSE },
+ { "a2dp-speaker" , "speaker" , TRUE , FALSE },
+ { "wired-headset" , "headset" , FALSE, FALSE },
+ { "usb-headphone" , "headphone", FALSE, FALSE },
+ { "a2dp-headphone" , "headphone", FALSE, FALSE },
+ { "sco-headset" , "headset" , FALSE, FALSE },
+ { NULL , NULL , FALSE, FALSE }
+};
+
+#define NSTREAM (MRP_ARRAY_SIZE(streams) - 1)
+#define STREAM_NCOLUMN 4
+
+typedef struct {
+ const char *name;
+ const char *role;
+ pid_t owner;
+ int playing;
+} stream_t;
+
+static stream_t streams[] = {
+ { "player1", "player" , 1234, FALSE },
+ { "player2", "player" , 4321, FALSE },
+ { "navit" , "navigator", 5432, FALSE },
+ { "phone" , "call" , 6666, FALSE },
+ { NULL , NULL , 0 , FALSE }
+};
+
+
+/*
+ * device and stream descriptors
+ */
+
+MRP_PEP_TABLE_COLUMNS(device_columns,
+ MRP_PEP_STRING ("name", 32 , TRUE ),
+ MRP_PEP_STRING ("type", 32 , FALSE),
+ MRP_PEP_INTEGER("public" , FALSE),
+ MRP_PEP_INTEGER("available", FALSE));
+
+MRP_PEP_TABLE_COLUMNS(stream_columns,
+ MRP_PEP_STRING ("name", 32, TRUE ),
+ MRP_PEP_STRING ("role", 32, FALSE),
+ MRP_PEP_UNSIGNED("owner" , FALSE),
+ MRP_PEP_INTEGER ("playing" , FALSE));
+
+MRP_PEP_TABLES(media_tables,
+ MRP_PEP_TABLE("devices", device_columns),
+ MRP_PEP_TABLE("streams", stream_columns));
+
+
+/*
+ * zone and call definitions
+ */
+
+#define NZONE (MRP_ARRAY_SIZE(zones) - 1)
+#define ZONE_NCOLUMN 3
+
+typedef struct {
+ const char *name;
+ int occupied;
+ int active;
+} zone_t;
+
+static zone_t zones[] = {
+ { "driver" , TRUE , FALSE },
+ { "fearer" , FALSE, TRUE },
+ { "back-left" , TRUE , FALSE },
+ { "back-center", FALSE, FALSE },
+ { "back-right" , TRUE , TRUE },
+ { NULL , FALSE, FALSE }
+};
+
+
+#define NCALL (MRP_ARRAY_SIZE(calls) - 1)
+#define CALL_NCOLUMN 3
+
+typedef struct {
+ int id;
+ const char *state;
+ const char *modem;
+} call_t;
+
+static call_t calls[] = {
+ { 1, "active" , "modem1" },
+ { 2, "ringing" , "modem1" },
+ { 3, "held" , "modem2" },
+ { 4, "alerting", "modem2" },
+ { 0, NULL , NULL }
+};
+
+
+/*
+ * zone and call descriptors
+ */
+
+MRP_PEP_TABLE_COLUMNS(zone_columns,
+ MRP_PEP_STRING ("name", 32, TRUE),
+ MRP_PEP_INTEGER("occupied", FALSE),
+ MRP_PEP_INTEGER("active" , FALSE));
+
+MRP_PEP_TABLE_COLUMNS(call_columns,
+ MRP_PEP_INTEGER("id" , TRUE ),
+ MRP_PEP_STRING ("state", 32, FALSE),
+ MRP_PEP_STRING ("modem", 32, FALSE));
+
+MRP_PEP_TABLES(zone_tables,
+ MRP_PEP_TABLE("zones", zone_columns),
+ MRP_PEP_TABLE("calls", call_columns));
+
+
+
+mrp_pep_table_t *exports;
+int nexport;
+mrp_pep_table_t *imports;
+int nimport;
+
+
+static client_t *client;
+
+
+
+static void fatal_msg(int error, const char *format, ...);
+static void error_msg(const char *format, ...);
+static void info_msg(const char *format, ...);
+
+static void terminal_input_cb(char *input);
+
+static void export_data(client_t *c);
+
+
+static void plug_device(client_t *c, const char *name, int plug)
+{
+ device_t *d;
+ int changed;
+
+ if (c->zone) {
+ error_msg("cannot plug/unplug, client is in zone mode");
+ return;
+ }
+
+ changed = FALSE;
+
+ for (d = devices; d->name != NULL; d++) {
+ if (!strcmp(d->name, name)) {
+ changed = plug ^ d->available;
+ d->available = plug;
+ break;
+ }
+ }
+
+ if (changed) {
+ info_msg("device '%s' is now %splugged", d->name, plug ? "" : "un");
+ export_data(c);
+ }
+}
+
+
+static void list_devices(void)
+{
+ device_t *d;
+ int n;
+
+ for (d = devices, n = 0; d->name != NULL; d++, n++) {
+ info_msg("device '%s': (%s, %s), %s",
+ d->name, d->type, d->public ? "public" : "private",
+ d->available ? "available" : "currently unplugged");
+ }
+
+ if (n == 0)
+ info_msg("devices: none");
+}
+
+
+static void play_stream(client_t *c, const char *name, int play)
+{
+ stream_t *s;
+ int changed;
+
+ if (c->zone) {
+ error_msg("cannot control streams, client is in zone mode");
+ return;
+ }
+
+ changed = FALSE;
+
+ for (s = streams; s->name != NULL; s++) {
+ if (!strcmp(s->name, name)) {
+ changed = play ^ s->playing;
+ s->playing = play;
+ break;
+ }
+ }
+
+ if (changed) {
+ info_msg("stream '%s' is now %s", s->name, play ? "playing":"stopped");
+ export_data(c);
+ }
+}
+
+
+static void list_streams(void)
+{
+ stream_t *s;
+ int n;
+
+ for (s = streams, n = 0; s->name != NULL; s++, n++) {
+ info_msg("stream '%s': role %s, owner %u, currently %splaying",
+ s->name, s->role, s->owner, s->playing ? "" : "not ");
+ }
+
+ if (n == 0)
+ info_msg("streams: none");
+}
+
+
+static void set_zone_state(client_t *c, char *config)
+{
+ zone_t *z;
+ int occupied, active, changed, len;
+ char name[256], *end;
+
+ if (!c->zone) {
+ error_msg("cannot control zones, client is not in zone mode");
+ return;
+ }
+
+ while (*config == ' ' || *config == '\t')
+ config++;
+
+ end = strchr(config, ' ');
+ if (end == NULL)
+ return;
+
+ len = end - config;
+ strncpy(name, config, len);
+ name[len] = '\0';
+
+ config = end + 1;
+ while (*config == ' ' || *config == '\t')
+ config++;
+
+ occupied = FALSE;
+ active = FALSE;
+ changed = FALSE;
+
+ if (strstr(config, "occupied"))
+ occupied = TRUE;
+ if (strstr(config, "active"))
+ active = TRUE;
+
+ for (z = zones; z->name != NULL; z++) {
+ if (!strcmp(z->name, name)) {
+ changed = (active & z->active) | (occupied ^ z->occupied);
+ z->active = active;
+ z->occupied = occupied;
+ break;
+ }
+ }
+
+ if (changed) {
+ info_msg("zone '%s' is now %s and %s", z->name,
+ z->occupied ? "occupied" : "free",
+ z->active ? "actvie" : "idle");
+ export_data(c);
+ }
+}
+
+
+static void list_zones(void)
+{
+ zone_t *z;
+ int n;
+
+ for (z = zones, n = 0; z->name != NULL; z++, n++) {
+ info_msg("zone '%s' is now %s and %s", z->name,
+ z->occupied ? "occupied" : "free",
+ z->active ? "actvie" : "idle");
+ }
+
+ if (n == 0)
+ info_msg("zones: none");
+}
+
+
+static void set_call_state(client_t *c, const char *config)
+{
+ call_t *call;
+ char idstr[64], *state, *end;
+ int id, changed, len;
+
+ if (!c->zone) {
+ error_msg("cannot control calls, client is not in zone mode");
+ return;
+ }
+
+ while (*config == ' ' || *config == '\t')
+ config++;
+
+ end = strchr(config, ' ');
+ if (end == NULL)
+ return;
+
+ len = end - config;
+ strncpy(idstr, config, len);
+ idstr[len] = '\0';
+
+ config = end + 1;
+ while (*config == ' ' || *config == '\t')
+ config++;
+ state = (char *)config;
+
+ id = strtoul(idstr, &end, 10);
+
+ if (end && *end) {
+ error_msg("invalid call id '%s'", idstr);
+ return;
+ }
+
+ for (call = calls; call->id > 0; call++) {
+ if (call->id == id) {
+ if (strcmp(call->state, state)) {
+ mrp_free((char *)call->state);
+ call->state = mrp_strdup(state);
+ changed = TRUE;
+ break;
+ }
+ }
+ }
+
+ if (changed) {
+ info_msg("call #%d is now %s", call->id, call->state);
+ export_data(c);
+ }
+}
+
+
+static void list_calls(void)
+{
+ call_t *c;
+ int n;
+
+ for (c = calls, n = 0; c->id > 0; c++, n++) {
+ info_msg("call #%d: %s (on modem %s)", c->id, c->state, c->modem);
+ }
+
+ if (n == 0)
+ info_msg("calls: none");
+}
+
+
+static void reset_devices(void)
+{
+ mrp_clear(&devices);
+}
+
+
+void update_devices(mrp_pep_data_t *data)
+{
+ device_t *d;
+ mrp_pep_value_t *v;
+ int i;
+
+ if (data->ncolumn != DEVICE_NCOLUMN) {
+ error_msg("incorrect number of columns (%d) in device update",
+ data->ncolumn);
+ return;
+ }
+
+ if (data->nrow > (int)NDEVICE) {
+ error_msg("too many rows (%d) in device update", data->nrow);
+ return;
+ }
+
+ if (data->nrow == 0)
+ reset_devices();
+ else {
+ v = data->columns;
+ d = devices;
+
+ for (i = 0; i < data->nrow; i++) {
+ mrp_free((char *)d->name);
+ mrp_free((char *)d->type);
+
+ d->name = mrp_strdup(v[0].str);
+ d->type = mrp_strdup(v[1].str);
+ d->public = v[2].s32;
+ d->available = v[3].s32;
+
+ v += 4;
+ d += 1;
+ }
+ }
+
+ list_devices();
+}
+
+
+static void reset_streams(void)
+{
+ mrp_clear(&streams);
+}
+
+
+void update_streams(mrp_pep_data_t *data)
+{
+ stream_t *s;
+ mrp_pep_value_t *v;
+ int i;
+
+ if (data->ncolumn != STREAM_NCOLUMN) {
+ error_msg("incorrect number of columns (%d) in stream update",
+ data->ncolumn);
+ return;
+ }
+
+ if (data->nrow > (int)NSTREAM) {
+ error_msg("too many rows (%d) in stream update", data->nrow);
+ return;
+ }
+
+ if (data->nrow == 0)
+ reset_streams();
+ else {
+ v = data->columns;
+ s = streams;
+
+ for (i = 0; i < data->nrow; i++) {
+ mrp_free((char *)s->name);
+ mrp_free((char *)s->role);
+
+ s->name = mrp_strdup(v[0].str);
+ s->role = mrp_strdup(v[1].str);
+ s->owner = v[2].u32;
+ s->playing = v[3].s32;
+
+ v += 4;
+ s += 1;
+ }
+ }
+
+ list_streams();
+}
+
+static void reset_zones(void)
+{
+ mrp_clear(&zones);
+}
+
+
+void update_zones(mrp_pep_data_t *data)
+{
+ zone_t *z;
+ mrp_pep_value_t *v;
+ int i;
+
+ if (data->ncolumn != ZONE_NCOLUMN) {
+ error_msg("incorrect number of columns (%d) in zone update",
+ data->ncolumn);
+ return;
+ }
+
+ if (data->nrow > (int)NZONE) {
+ error_msg("too many rows (%d) in zone update", data->nrow);
+ return;
+ }
+
+ if (data->nrow == 0)
+ reset_zones();
+ else {
+
+ v = data->columns;
+ z = zones;
+
+ for (i = 0; i < data->nrow; i++) {
+ mrp_free((char *)z->name);
+
+ z->name = mrp_strdup(v[0].str);
+ z->occupied = v[1].s32;
+ z->active = v[2].s32;
+
+ v += 3;
+ z += 1;
+ }
+ }
+
+ list_zones();
+}
+
+
+static void reset_calls(void)
+{
+ mrp_clear(&calls);
+}
+
+
+void update_calls(mrp_pep_data_t *data)
+{
+ call_t *c;
+ mrp_pep_value_t *v;
+ int i;
+
+ if (data->ncolumn != CALL_NCOLUMN) {
+ error_msg("incorrect number of columns (%d) in call update.",
+ data->ncolumn);
+ return;
+ }
+
+ if (data->nrow > (int)NCALL) {
+ error_msg("too many rows (%d) in call update", data->nrow);
+ return;
+ }
+
+ if (data->nrow == 0)
+ reset_calls();
+ else {
+ v = data->columns;
+ c = calls;
+
+ for (i = 0; i < data->nrow; i++) {
+ mrp_free((char *)c->state);
+ mrp_free((char *)c->modem);
+
+ c->id = v[0].s32;
+ c->state = mrp_strdup(v[1].str);
+ c->modem = mrp_strdup(v[2].str);
+
+ v += 3;
+ c += 1;
+ }
+ }
+
+ list_calls();
+}
+
+
+void update_imports(client_t *c, mrp_pep_data_t *data, int ntable)
+{
+ int i;
+
+ for (i = 0; i < ntable; i++) {
+ if (c->zone) {
+ if (data[i].id == 0)
+ update_devices(data + i);
+ else
+ update_streams(data + i);
+ }
+ else {
+ if (data[i].id == 0)
+ update_zones(data + i);
+ else
+ update_calls(data + i);
+ }
+ }
+}
+
+
+
+static void terminal_prompt_erase(void)
+{
+ int n = strlen(DEFAULT_PROMPT);
+
+ printf("\r");
+ while (n-- > 0)
+ printf(" ");
+ printf("\r");
+}
+
+
+static void terminal_prompt_display(void)
+{
+ rl_callback_handler_remove();
+ rl_callback_handler_install(DEFAULT_PROMPT, terminal_input_cb);
+}
+
+
+static void show_help(void)
+{
+#define P info_msg
+
+ P("Available commands:");
+ P(" help show this help");
+ P(" list list all data");
+ P(" list {devices|streams|zones|calls} list the requested data");
+ P(" plug <device> update <device> as plugged");
+ P(" unplug <device> update <device> as unplugged");
+ P(" play <stream> update <stream> as playing");
+ P(" stop <stream> update <stream> as stopped");
+ P(" call <call> <state> update state of <call>");
+ P(" zone <zone> [occupied,[active]] update state of <zone>");
+
+#undef P
+}
+
+
+static void terminal_process_input(char *input)
+{
+ int len;
+
+ add_history(input);
+
+ if (input == NULL || !strcmp(input, "exit")) {
+ terminal_prompt_erase();
+ exit(0);
+ }
+ else if (!strcmp(input, "help")) {
+ show_help();
+ }
+ else if (!strcmp(input, "list")) {
+ list_devices();
+ list_streams();
+ list_zones();
+ list_calls();
+ }
+ else if (!strcmp(input, "list devices"))
+ list_devices();
+ else if (!strcmp(input, "list streams"))
+ list_streams();
+ else if (!strcmp(input, "list zones"))
+ list_zones();
+ else if (!strcmp(input, "list calls"))
+ list_calls();
+ else if (!strncmp(input, "plug " , len=sizeof("plug ") - 1) ||
+ !strncmp(input, "unplug ", len=sizeof("unplug ") - 1)) {
+ plug_device(client, input + len, *input == 'p');
+ }
+ else if (!strncmp(input, "play " , len=sizeof("play ") - 1) ||
+ !strncmp(input, "stop ", len=sizeof("stop ") - 1)) {
+ play_stream(client, input + len, *input == 'p');
+ }
+ else if (!strncmp(input, "call " , len=sizeof("call ") - 1)) {
+ set_call_state(client, input + len);
+ }
+ else if (!strncmp(input, "zone " , len=sizeof("zone ") - 1)) {
+ set_zone_state(client, input + len);
+ }
+}
+
+
+static void terminal_input_cb(char *input)
+{
+ terminal_process_input(input);
+ free(input);
+}
+
+
+static void terminal_cb(mrp_mainloop_t *ml, mrp_io_watch_t *w, int fd,
+ mrp_io_event_t events, void *user_data)
+{
+ MRP_UNUSED(w);
+ MRP_UNUSED(fd);
+ MRP_UNUSED(user_data);
+
+ if (events & MRP_IO_EVENT_IN)
+ rl_callback_read_char();
+
+ if (events & MRP_IO_EVENT_HUP)
+ mrp_mainloop_quit(ml, 0);
+}
+
+
+static void terminal_setup(client_t *c)
+{
+ mrp_io_event_t events;
+
+ c->fd = fileno(stdin);
+ events = MRP_IO_EVENT_IN | MRP_IO_EVENT_HUP;
+ c->iow = mrp_add_io_watch(c->ml, c->fd, events, terminal_cb, c);
+
+ if (c->iow == NULL)
+ fatal_msg(1, "Failed to create terminal input I/O watch.");
+ else
+ terminal_prompt_display();
+}
+
+
+static void terminal_cleanup(client_t *c)
+{
+ mrp_del_io_watch(c->iow);
+ c->iow = NULL;
+
+ rl_callback_handler_remove();
+}
+
+
+static void fatal_msg(int error, const char *format, ...)
+{
+ va_list ap;
+
+ terminal_prompt_erase();
+
+ fprintf(stderr, "fatal error: ");
+ va_start(ap, format);
+ vfprintf(stderr, format, ap);
+ va_end(ap);
+ fprintf(stderr, "\n");
+ fflush(stderr);
+
+ exit(error);
+}
+
+
+static void error_msg(const char *format, ...)
+{
+ va_list ap;
+
+ terminal_prompt_erase();
+
+ fprintf(stderr, "error: ");
+ va_start(ap, format);
+ vfprintf(stderr, format, ap);
+ va_end(ap);
+ fprintf(stderr, "\n");
+ fflush(stderr);
+
+ terminal_prompt_display();
+}
+
+
+static void info_msg(const char *format, ...)
+{
+ va_list ap;
+
+ terminal_prompt_erase();
+
+ va_start(ap, format);
+ vfprintf(stdout, format, ap);
+ va_end(ap);
+ fprintf(stdout, "\n");
+ fflush(stdout);
+
+ terminal_prompt_display();
+}
+
+
+static void signal_handler(mrp_mainloop_t *ml, mrp_sighandler_t *h,
+ int signum, void *user_data)
+{
+ MRP_UNUSED(h);
+ MRP_UNUSED(user_data);
+
+ switch (signum) {
+ case SIGINT:
+ info_msg("Got SIGINT, stopping...");
+ mrp_mainloop_quit(ml, 0);
+ break;
+ }
+}
+
+
+static void connect_notify(mrp_pep_t *pep, int connected, int errcode,
+ const char *errmsg, void *user_data)
+{
+ MRP_UNUSED(pep);
+ MRP_UNUSED(user_data);
+
+ if (connected) {
+ info_msg("Successfully registered to server.");
+ export_data(client);
+ }
+ else
+ error_msg("No connection to server (%d: %s).", errcode, errmsg);
+}
+
+
+static void data_notify(mrp_pep_t *pep, mrp_pep_data_t *tables,
+ int ntable, void *user_data)
+{
+ client_t *client = (client_t *)user_data;
+
+ MRP_UNUSED(pep);
+
+ update_imports(client, tables, ntable);
+}
+
+
+static void export_notify(mrp_pep_t *pep, int errcode, const char *errmsg,
+ void *user_data)
+{
+ MRP_UNUSED(pep);
+ MRP_UNUSED(user_data);
+
+ if (errcode != 0) {
+ error_msg("Data set request failed (%d: %s).", errcode, errmsg);
+ }
+}
+
+
+static void export_data(client_t *c)
+{
+ mrp_pep_value_t values[NVALUE], *col, *val;
+ mrp_pep_data_t tables[2];
+ int i;
+
+ val = values;
+ col = val;
+
+ if (!c->zone) {
+ tables[0].id = 0;
+ tables[0].nrow = NDEVICE;
+ tables[0].columns = col;
+
+ for (i = 0; i < (int)NDEVICE; i++) {
+ col[0].str = devices[i].name;
+ col[1].str = devices[i].type;
+ col[2].s32 = devices[i].public;
+ col[3].s32 = devices[i].available;
+ col += 4;
+ }
+
+ tables[1].id = 1;
+ tables[1].nrow = NSTREAM;
+ tables[1].columns = col;
+
+ for (i = 0; i < (int)NSTREAM; i++) {
+ col[0].str = streams[i].name;
+ col[1].str = streams[i].role;
+ col[2].u32 = streams[i].owner;
+ col[3].s32 = streams[i].playing;
+ col += 4;
+ }
+ }
+ else {
+ tables[0].id = 0;
+ tables[0].nrow = NZONE;
+ tables[0].columns = col;
+
+ for (i = 0; i < (int)NZONE; i++) {
+ col[0].str = zones[i].name;
+ col[1].s32 = zones[i].occupied;
+ col[2].s32 = zones[i].active;
+ col += 3;
+ }
+
+ tables[1].id = 1;
+ tables[1].nrow = NCALL;
+ tables[1].columns = col;
+
+ for (i = 0; i < (int)NCALL; i++) {
+ col[0].s32 = calls[i].id;
+ col[1].str = calls[i].state;
+ col[2].str = calls[i].modem;
+ col += 3;
+ }
+ }
+
+ if (!mrp_pep_set_data(c->pep, tables, MRP_ARRAY_SIZE(tables),
+ export_notify, c))
+ error_msg("Failed to send data set request to server.");
+}
+
+
+static void client_setup(client_t *c)
+{
+ mrp_mainloop_t *ml;
+ mrp_pep_t *pep;
+
+ ml = mrp_mainloop_create();
+
+ if (ml != NULL) {
+ if (!c->zone) {
+ exports = media_tables;
+ nexport = MRP_ARRAY_SIZE(media_tables);
+ imports = zone_tables;
+ nimport = MRP_ARRAY_SIZE(zone_tables);
+ }
+ else {
+ exports = zone_tables;
+ nexport = MRP_ARRAY_SIZE(zone_tables);
+ imports = media_tables;
+ nimport = MRP_ARRAY_SIZE(media_tables);
+ }
+
+ pep = mrp_pep_create(c->zone ? "zone-pep" : "media-pep", ml,
+ exports, nexport, imports, nimport,
+ connect_notify, data_notify, c);
+
+ if (pep != NULL) {
+ c->ml = ml;
+ c->pep = pep;
+
+ mrp_add_sighandler(ml, SIGINT, signal_handler, c);
+
+ if (c->zone) {
+ zone_t *z;
+ call_t *call;
+
+ for (z = zones; z->name != NULL; z++) {
+ z->name = mrp_strdup(z->name);
+ }
+
+ for (call = calls; call->id > 0; call++) {
+ call->state = mrp_strdup(call->state);
+ call->modem = mrp_strdup(call->modem);
+ }
+
+ reset_devices();
+ reset_streams();
+ }
+ else {
+ device_t *d;
+ stream_t *s;
+
+ for (d = devices; d->name != NULL; d++) {
+ d->name = mrp_strdup(d->name);
+ d->type = mrp_strdup(d->type);
+ }
+
+ for (s = streams; s->name != NULL; s++) {
+ s->name = mrp_strdup(s->name);
+ s->role = mrp_strdup(s->role);
+ }
+
+ reset_zones();
+ reset_calls();
+ }
+ }
+ else
+ fatal_msg(1, "Failed to create enforcement point.");
+ }
+ else
+ fatal_msg(1, "Failed to create mainloop.");
+}
+
+
+static void client_cleanup(client_t *c)
+{
+ mrp_mainloop_destroy(c->ml);
+ mrp_pep_destroy(c->pep);
+
+ c->ml = NULL;
+ c->pep = NULL;
+}
+
+
+static void client_run(client_t *c)
+{
+ if (mrp_pep_connect(c->pep, c->addrstr))
+ info_msg("Connected to server at %s.", c->addrstr);
+ else
+ error_msg("Failed to connect to server at %s.", c->addrstr);
+
+ mrp_mainloop_run(c->ml);
+}
+
+
+static void print_usage(const char *argv0, int exit_code, const char *fmt, ...)
+{
+ va_list ap;
+
+ if (fmt && *fmt) {
+ va_start(ap, fmt);
+ vprintf(fmt, ap);
+ va_end(ap);
+ }
+
+ printf("usage: %s [options]\n\n"
+ "The possible options are:\n"
+ " -s, --server <address> connect to murphy at given address\n"
+ " -z, --zone run as zone controller\n"
+ " -v, --verbose run in verbose mode\n"
+ " -h, --help show this help on usage\n",
+ argv0);
+
+ if (exit_code < 0)
+ return;
+ else
+ exit(exit_code);
+}
+
+
+static void client_set_defaults(client_t *c)
+{
+ mrp_clear(c);
+ c->addrstr = MRP_DEFAULT_PEP_ADDRESS;
+ c->zone = FALSE;
+ c->verbose = FALSE;
+}
+
+
+int parse_cmdline(client_t *c, int argc, char **argv)
+{
+# define OPTIONS "vzhs:"
+ struct option options[] = {
+ { "server" , required_argument, NULL, 's' },
+ { "zone" , no_argument , NULL, 'z' },
+ { "verbose" , optional_argument, NULL, 'v' },
+ { "help" , no_argument , NULL, 'h' },
+ { NULL, 0, NULL, 0 }
+ };
+
+ int opt;
+
+ client_set_defaults(c);
+
+ while ((opt = getopt_long(argc, argv, OPTIONS, options, NULL)) != -1) {
+ switch (opt) {
+ case 'z':
+ c->zone = TRUE;
+ break;
+
+ case 'v':
+ c->verbose = TRUE;
+ break;
+
+ case 'a':
+ c->addrstr = optarg;
+ break;
+
+ case 'h':
+ print_usage(argv[0], -1, "");
+ exit(0);
+ break;
+
+ default:
+ print_usage(argv[0], EINVAL, "invalid option '%c'", opt);
+ }
+ }
+
+ return TRUE;
+}
+
+
+int main(int argc, char *argv[])
+{
+ client_t c;
+
+ client_set_defaults(&c);
+ parse_cmdline(&c, argc, argv);
+
+ client_setup(&c);
+ terminal_setup(&c);
+
+ client = &c;
+ client_run(&c);
+
+ terminal_cleanup(&c);
+ client_cleanup(&c);
+
+ return 0;
+}