From f9f730eb36149506509b931142bba335e90587bc Mon Sep 17 00:00:00 2001 From: Krisztian Litkey Date: Mon, 22 Oct 2012 13:07:00 +0300 Subject: [PATCH] domain-control: added domain-control plugin (modified decision-proto). --- configure.ac | 4 + src/Makefile.am | 85 ++ src/daemon/murphy.conf | 30 +- src/plugins/domain-control/Makefile | 7 + src/plugins/domain-control/client.c | 540 ++++++++++ src/plugins/domain-control/client.h | 117 ++ src/plugins/domain-control/domain-control-types.h | 115 ++ src/plugins/domain-control/domain-control.c | 386 +++++++ src/plugins/domain-control/domain-control.h | 11 + src/plugins/domain-control/message.c | 235 ++++ src/plugins/domain-control/message.h | 100 ++ src/plugins/domain-control/notify.c | 219 ++++ src/plugins/domain-control/notify.h | 8 + src/plugins/domain-control/plugin-domain-control.c | 56 + src/plugins/domain-control/proxy.c | 128 +++ src/plugins/domain-control/proxy.h | 18 + src/plugins/domain-control/table-common.c | 10 + src/plugins/domain-control/table.c | 501 +++++++++ src/plugins/domain-control/table.h | 33 + src/plugins/domain-control/test-client.c | 1135 ++++++++++++++++++++ 20 files changed, 3726 insertions(+), 12 deletions(-) create mode 100644 src/plugins/domain-control/Makefile create mode 100644 src/plugins/domain-control/client.c create mode 100644 src/plugins/domain-control/client.h create mode 100644 src/plugins/domain-control/domain-control-types.h create mode 100644 src/plugins/domain-control/domain-control.c create mode 100644 src/plugins/domain-control/domain-control.h create mode 100644 src/plugins/domain-control/message.c create mode 100644 src/plugins/domain-control/message.h create mode 100644 src/plugins/domain-control/notify.c create mode 100644 src/plugins/domain-control/notify.h create mode 100644 src/plugins/domain-control/plugin-domain-control.c create mode 100644 src/plugins/domain-control/proxy.c create mode 100644 src/plugins/domain-control/proxy.h create mode 100644 src/plugins/domain-control/table-common.c create mode 100644 src/plugins/domain-control/table.c create mode 100644 src/plugins/domain-control/table.h create mode 100644 src/plugins/domain-control/test-client.c diff --git a/configure.ac b/configure.ac index eb3baa7..986812f 100644 --- a/configure.ac +++ b/configure.ac @@ -342,6 +342,8 @@ AM_CONDITIONAL(DISABLED_PLUGIN_CONSOLE, [check_if_disabled console]) AM_CONDITIONAL(DISABLED_PLUGIN_SIGNALLING, [check_if_disabled signalling]) AM_CONDITIONAL(DISABLED_PLUGIN_DECISION, [check_if_disabled decision]) AM_CONDITIONAL(DISABLED_PLUGIN_RESOURCE_DBUS, [check_if_disabled resource-dbus]) +AM_CONDITIONAL(DISABLED_PLUGIN_DOMAIN_CONTROL, + [check_if_disabled domain-control]) AM_CONDITIONAL(BUILTIN_PLUGIN_TEST, [check_if_internal test]) AM_CONDITIONAL(BUILTIN_PLUGIN_DBUS, [check_if_internal dbus]) @@ -350,6 +352,8 @@ AM_CONDITIONAL(BUILTIN_PLUGIN_CONSOLE, [check_if_internal console]) AM_CONDITIONAL(BUILTIN_PLUGIN_SIGNALLING, [check_if_internal signalling]) AM_CONDITIONAL(BUILTIN_PLUGIN_DECISION, [check_if_internal decision]) AM_CONDITIONAL(BUILTIN_PLUGIN_RESOURCE_DBUS, [check_if_internal resource-dbus]) +AM_CONDITIONAL(BUILTIN_PLUGIN_DOMAIN_CONTROL, + [check_if_internal domain-control]) # Check for Check (unit test framework). PKG_CHECK_MODULES(CHECK, diff --git a/src/Makefile.am b/src/Makefile.am index 026d162..db808a5 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -794,6 +794,91 @@ test_client_LDADD = libmurphy-pep.la \ -lreadline endif +# domain control plugin +DOMAIN_CONTROL_PLUGIN_SOURCES = plugins/domain-control/plugin-domain-control.c \ + plugins/domain-control/domain-control.c \ + plugins/domain-control/proxy.c \ + plugins/domain-control/table.c \ + plugins/domain-control/message.c \ + plugins/domain-control/notify.c + +DOMAIN_CONTROL_PLUGIN_CFLAGS = +DOMAIN_CONTROL_PLUGIN_LIBS = + +if !DISABLED_PLUGIN_DOMAIN_CONTROL +if BUILTIN_PLUGIN_DOMAIN_CONTROL +LINKEDIN_PLUGINS += libmurphy-plugin-domain-control.la +lib_LTLIBRARIES += libmurphy-plugin-domain-control.la +DOMAIN_CONTROL_PLUGIN_LOADER = linkedin-domain-control-loader.c +DOMAIN_CONTROL_PLUGIN_CFLAGS += $(BUILTIN_CFLAGS) + + +libmurphy_plugin_domain_control_ladir = \ + $(includedir)/murphy/domain-control + +libmurphy_plugin_domain_control_la_SOURCES = \ + $(DOMAIN_CONTROL_PLUGIN_SOURCES) \ + $(DOMAIN_CONTROL_PLUGIN_LOADER) + +libmurphy_plugin_domain_control_la_CFLAGS = \ + $(DOMAIN_CONTROL_PLUGIN_CFLAGS) \ + $(AM_CFLAGS) + +libmurphy_plugin_domain_control_la_LDFLAGS = \ + -Wl,-version-script=linker-script.domain-control \ + -version-info @MURPHY_VERSION_INFO@ + +libmurphy_plugin_domain_control_la_LIBADD = \ + libmurphy-core.la \ + libmurphy-common.la \ + murphy-db/mql/libmql.la \ + murphy-db/mqi/libmqi.la \ + murphy-db/mdb/libmdb.la + +libmurphy_plugin_domain_control_la_DEPENDENCIES = \ + linker-script.domain-control \ + libmurphy-core.la \ + libmurphy-common.la \ + murphy-db/mql/libmql.la \ + murphy-db/mqi/libmqi.la \ + murphy-db/mdb/libmdb.la + +# linkedin domain control plugin linker script generation +linker-script.domain-control: $(DOMAIN_CONTROL_PLUGIN_LOADER:%.c=%.h) + $(QUIET_GEN)$(top_builddir)/build-aux/gen-linker-script -q -o $@ $^ + +clean-linker-script:: + -rm -f linker-script.domain-control +else +plugin_domain_control_la_SOURCES = $(DOMAIN_CONTROL_PLUGIN_SOURCES) +plugin_domain_control_la_CFLAGS = $(DOMAIN_CONTROL_PLUGIN_CFLAGS) \ + $(MURPHY_CFLAGS) $(AM_CFLAGS) +plugin_domain_control_la_LDFLAGS = -module -avoid-version +plugin_domain_control_la_LIBADD = $(DOMAIN_CONTROL_PLUGIN_LIBS) +plugin_LTLIBRARIES += plugin-domain-control.la +endif + +# domain controller client library +lib_LTLIBRARIES += libmurphy-domain-controller.la +libmurphy_domain_controller_la_SOURCES = plugins/domain-control/client.c \ + plugins/domain-control/table-common.c\ + plugins/domain-control/message.c +libmurphy_domain_controller_la_CFLAGS = +libmurphy_domain_controller_la_LIBADD = libmurphy-common.la \ + murphy-db/mql/libmql.la \ + murphy-db/mqi/libmqi.la \ + murphy-db/mdb/libmdb.la + +# test domain controller +bin_PROGRAMS += test-domain-controller + +test_domain_controller_SOURCES = plugins/domain-control/test-client.c +test_domain_controller_CFLAGS = $(AM_CFLAGS) +test_domain_controller_LDADD = libmurphy-domain-controller.la \ + libmurphy-common.la \ + -lreadline +endif + ################################### # murphy daemon # diff --git a/src/daemon/murphy.conf b/src/daemon/murphy.conf index 2499036..46fd7d6 100644 --- a/src/daemon/murphy.conf +++ b/src/daemon/murphy.conf @@ -32,21 +32,27 @@ if plugin-exists murphydb end # load the native resource plugin if it exists -if plugin-exists resource-native - load-plugin resource-native -else - info "Could not find resource-native plugin" -end +#if plugin-exists resource-native +# load-plugin resource-native +#else +# info "Could not find resource-native plugin" +#end # load the enforcement point interface prototype plugin if it exists -if plugin-exists decision-proto - load-plugin decision-proto -else - info "There is no decision-proto plugin available..." -end +#if plugin-exists decision-proto +# load-plugin decision-proto +#else +# info "There is no decision-proto plugin available..." +#end -if plugin-exists resource-dbus - load-plugin resource-dbus +#if plugin-exists resource-dbus +# try-load-plugin resource-dbus +#end + +if plugin-exists domain-control + load-plugin domain-control +else + info "No domain-control plugin found..." end #set resolver-ruleset '/u/src/work/murphy/src/resolver/test-input' diff --git a/src/plugins/domain-control/Makefile b/src/plugins/domain-control/Makefile new file mode 100644 index 0000000..2c0a593 --- /dev/null +++ b/src/plugins/domain-control/Makefile @@ -0,0 +1,7 @@ +ifneq ($(strip $(MAKECMDGOALS)),) +%: + $(MAKE) -C .. $(MAKECMDGOALS) +else +all: + $(MAKE) -C .. all +endif diff --git a/src/plugins/domain-control/client.c b/src/plugins/domain-control/client.c new file mode 100644 index 0000000..c9bbd8d --- /dev/null +++ b/src/plugins/domain-control/client.c @@ -0,0 +1,540 @@ +#include +#include + +#include +#include +#include +#include + +#include "domain-control-types.h" +#include "table.h" +#include "message.h" +#include "client.h" + + +/* + * mark an enforcement point busy (typically while executing a callback) + */ + +#define DOMCTL_MARK_BUSY(dc, ...) do { \ + (dc)->busy++; \ + __VA_ARGS__ \ + (dc)->busy--; \ + check_destroyed(dc); \ + } while (0) + + +/* + * a pending request + */ + +typedef struct { + mrp_list_hook_t hook; /* hook to pending request queue */ + uint32_t seqno; /* sequence number/request id */ + mrp_domctl_status_cb_t cb; /* callback to call upon completion */ + void *user_data; /* opaque callback data */ +} pending_request_t; + + +static void recv_cb(mrp_transport_t *t, mrp_msg_t *msg, void *user_data); +static void recvfrom_cb(mrp_transport_t *t, mrp_msg_t *msg, + mrp_sockaddr_t *addr, socklen_t addrlen, + void *user_data); +static void closed_cb(mrp_transport_t *t, int error, void *user_data); + + +static int queue_pending(mrp_domctl_t *dc, uint32_t seq, + mrp_domctl_status_cb_t cb, void *user_data); +static int notify_pending(mrp_domctl_t *dc, uint32_t seq, int error, + const char *msg); +static void purge_pending(mrp_domctl_t *dc); + + + + +mrp_domctl_t *mrp_domctl_create(const char *name, mrp_mainloop_t *ml, + mrp_domctl_table_t *tables, int ntable, + mrp_domctl_watch_t *watches, int nwatch, + mrp_domctl_connect_cb_t connect_cb, + mrp_domctl_watch_cb_t watch_cb, void *user_data) +{ + mrp_domctl_t *dc; + mrp_domctl_table_t *st, *dt; + mrp_domctl_watch_t *sw, *dw; + int i; + + dc = mrp_allocz(sizeof(*dc)); + + if (dc != NULL) { + mrp_list_init(&dc->pending); + dc->ml = ml; + + dc->name = mrp_strdup(name); + dc->tables = mrp_allocz_array(typeof(*dc->tables) , ntable); + dc->watches = mrp_allocz_array(typeof(*dc->watches), nwatch); + + if (dc->name != NULL && dc->tables != NULL && dc->watches != NULL) { + for (i = 0; i < ntable; i++) { + st = tables + i; + dt = dc->tables + i; + + dt->table = mrp_strdup(st->table); + dt->mql_columns = mrp_strdup(st->mql_columns); + dt->mql_index = mrp_strdup(st->mql_index ? st->mql_index:""); + + if (!dt->table || !dt->mql_columns || !dt->mql_index) + break; + + dc->ntable++; + } + + for (i = 0; i < nwatch; i++) { + sw = watches + i; + dw = dc->watches + i; + + dw->table = mrp_strdup(sw->table); + dw->mql_columns = mrp_strdup(sw->mql_columns); + dw->mql_where = mrp_strdup(sw->mql_where ? sw->mql_where:""); + dw->max_rows = sw->max_rows; + + if (!dw->table || !dw->mql_columns || !dw->mql_where) + break; + + dc->nwatch++; + } + + dc->connect_cb = connect_cb; + dc->watch_cb = watch_cb; + dc->user_data = user_data; + dc->seqno = 1; + + return dc; + } + + mrp_domctl_destroy(dc); + } + + return NULL; +} + + +static void destroy_domctl(mrp_domctl_t *dc) +{ + int i; + + mrp_free(dc->name); + + for (i = 0; i < dc->ntable; i++) { + mrp_free((char *)dc->tables[i].table); + mrp_free((char *)dc->tables[i].mql_columns); + mrp_free((char *)dc->tables[i].mql_index); + } + mrp_free(dc->tables); + + for (i = 0; i < dc->nwatch; i++) { + mrp_free((char *)dc->watches[i].table); + mrp_free((char *)dc->watches[i].mql_columns); + mrp_free((char *)dc->watches[i].mql_where); + } + mrp_free(dc->watches); + + mrp_free(dc); +} + + +static inline void check_destroyed(mrp_domctl_t *dc) +{ + if (dc->destroyed && dc->busy <= 0) { + destroy_domctl(dc); + } +} + + +void mrp_domctl_destroy(mrp_domctl_t *dc) +{ + if (dc != NULL) { + mrp_domctl_disconnect(dc); + + if (dc->busy <= 0) + destroy_domctl(dc); + else + dc->destroyed = TRUE; + } +} + + +static void notify_disconnect(mrp_domctl_t *dc, uint32_t errcode, + const char *errmsg) +{ + DOMCTL_MARK_BUSY(dc, { + dc->connected = FALSE; + dc->connect_cb(dc, FALSE, errcode, errmsg, dc->user_data); + }); +} + + +static void notify_connect(mrp_domctl_t *dc) +{ + DOMCTL_MARK_BUSY(dc, { + dc->connected = TRUE; + dc->connect_cb(dc, TRUE, 0, NULL, dc->user_data); + }); +} + + +static int domctl_register(mrp_domctl_t *dc) +{ + mrp_msg_t *msg; + int success; + + msg = create_register_message(dc); + + if (msg != NULL) { + success = mrp_transport_send(dc->t, msg); + mrp_msg_unref(msg); + } + else + success = FALSE; + + return success; +} + + +int mrp_domctl_connect(mrp_domctl_t *dc, const char *address) +{ + static mrp_transport_evt_t evt = { + .closed = closed_cb, + .recvmsg = recv_cb, + .recvmsgfrom = recvfrom_cb, + }; + + mrp_sockaddr_t addr; + socklen_t addrlen; + const char *type; + + if (dc == NULL) + return FALSE; + + addrlen = mrp_transport_resolve(NULL, address, &addr, sizeof(addr), &type); + + if (addrlen > 0) { + dc->t = mrp_transport_create(dc->ml, type, &evt, dc, 0); + + if (dc->t != NULL) { + if (mrp_transport_connect(dc->t, &addr, addrlen)) + if (domctl_register(dc)) + return TRUE; + + mrp_transport_destroy(dc->t); + dc->t = NULL; + } + } + + return FALSE; +} + + +void mrp_domctl_disconnect(mrp_domctl_t *dc) +{ + if (dc->t != NULL) { + mrp_transport_destroy(dc->t); + dc->t = NULL; + dc->connected = FALSE; + } +} + + +int mrp_domctl_set_data(mrp_domctl_t *dc, mrp_domctl_data_t *tables, int ntable, + mrp_domctl_status_cb_t cb, void *user_data) +{ + mrp_msg_t *msg; + uint32_t seq = dc->seqno++; + int success, i; + + if (!dc->connected) + return FALSE; + + for (i = 0; i < ntable; i++) { + if (tables[i].id < 0 || tables[i].id >= dc->ntable) + return FALSE; + } + + msg = create_set_message(seq, tables, ntable); + + if (msg != NULL) { + /* + mrp_log_info("set data message message:"); + mrp_msg_dump(msg, stdout); + */ + + success = mrp_transport_send(dc->t, msg); + mrp_msg_unref(msg); + + if (success) + queue_pending(dc, seq, cb, user_data); + + return success; + } + else + return FALSE; +} + + +static void process_ack(mrp_domctl_t *dc, uint32_t seq) +{ + if (seq != 0) + notify_pending(dc, seq, 0, NULL); + else + notify_connect(dc); +} + + +static void process_nak(mrp_domctl_t *dc, uint32_t seq, int32_t err, + const char *msg) +{ + if (seq != 0) + notify_pending(dc, seq, err, msg); + else + notify_disconnect(dc, err, msg); +} + + +static void process_notify(mrp_domctl_t *dc, mrp_msg_t *msg, uint32_t seq) +{ + mrp_domctl_data_t *data, *d; + mrp_domctl_value_t *values, *v; + void *it; + uint16_t ntable, ntotal, nrow, ncol; + uint16_t tblid; + int t, r, c; + uint16_t type; + mrp_msg_value_t value; + + if (!mrp_msg_get(msg, + MRP_PEPMSG_UINT16(NCHANGE, &ntable), + MRP_PEPMSG_UINT16(NTOTAL , &ntotal), + MRP_MSG_END)) + return; + + data = alloca(sizeof(*data) * ntable); + values = alloca(sizeof(*values) * ntotal); + + it = NULL; + d = data; + v = values; + + for (t = 0; t < ntable; t++) { + if (!mrp_msg_iterate_get(msg, &it, + MRP_PEPMSG_UINT16(TBLID, &tblid), + MRP_PEPMSG_UINT16(NROW , &nrow ), + MRP_PEPMSG_UINT16(NCOL , &ncol ), + MRP_MSG_END)) + return; + + if (tblid >= dc->nwatch) + return; + + d->id = tblid; + d->ncolumn = ncol; + d->nrow = nrow; + d->rows = alloca(sizeof(*d->rows) * nrow); + + for (r = 0; r < nrow; r++) { + d->rows[r] = v; + + for (c = 0; c < ncol; c++) { + if (!mrp_msg_iterate_get(msg, &it, + MRP_PEPMSG_ANY(DATA, &type, &value), + MRP_MSG_END)) + return; + + switch (type) { + case MRP_MSG_FIELD_STRING: + v->type = MRP_DOMCTL_STRING; + v->str = value.str; + break; + case MRP_MSG_FIELD_SINT32: + v->type = MRP_DOMCTL_INTEGER; + v->s32 = value.s32; + break; + case MRP_MSG_FIELD_UINT32: + v->type = MRP_DOMCTL_UNSIGNED; + v->u32 = value.u32; + break; + case MRP_MSG_FIELD_DOUBLE: + v->type = MRP_DOMCTL_DOUBLE; + v->dbl = value.dbl; + break; + default: + return; + } + + v++; + } + } + + d++; + } + + dc->watch_cb(dc, data, ntable, dc->user_data); +} + + +static void recv_cb(mrp_transport_t *t, mrp_msg_t *msg, void *user_data) +{ + mrp_domctl_t *dc = (mrp_domctl_t *)user_data; + uint16_t type, nchange, ntotal; + uint32_t seq; + int ntable, ncolumn; + int32_t error; + const char *errmsg; + + MRP_UNUSED(t); + + /* + mrp_log_info("Received message:"); + mrp_msg_dump(msg, stdout); + */ + + if (!mrp_msg_get(msg, + MRP_PEPMSG_UINT16(MSGTYPE, &type), + MRP_PEPMSG_UINT32(MSGSEQ , &seq ), + MRP_MSG_END)) { + mrp_domctl_disconnect(dc); + notify_disconnect(dc, EINVAL, "malformed message from client"); + return; + } + + switch (type) { + case MRP_PEPMSG_ACK: + process_ack(dc, seq); + break; + + case MRP_PEPMSG_NAK: + error = EINVAL; + errmsg = "request failed, unknown error"; + + mrp_msg_get(msg, + MRP_PEPMSG_SINT32(ERRCODE, &error), + MRP_PEPMSG_STRING(ERRMSG , &errmsg), + MRP_MSG_END); + + process_nak(dc, seq, error, errmsg); + break; + + case MRP_PEPMSG_NOTIFY: + if (mrp_msg_get(msg, + MRP_PEPMSG_UINT16(NCHANGE, &nchange), + MRP_PEPMSG_UINT16(NTOTAL , &ntotal), + MRP_MSG_END)) { + ntable = nchange; + ncolumn = ntotal; + + process_notify(dc, msg, seq); + } + break; + + default: + break; + } + +} + + +static void recvfrom_cb(mrp_transport_t *t, mrp_msg_t *msg, + mrp_sockaddr_t *addr, socklen_t addrlen, + void *user_data) +{ + MRP_UNUSED(addr); + MRP_UNUSED(addrlen); + + /* XXX TODO: + * This should neither be called nor be necessary to specify. + * However, currently the transport layer mandates having to + * give both recv and recvfrom event callbacks if no connection + * event callback is given. However this is not correct because + * on a client side one wants to be able to create a connection- + * oriented transport without both connection and recvfrom event + * callbacks. This needs to be fixed in transport by moving the + * appropriate callback checks lower in the stack to the actual + * transport backends. + */ + + mrp_log_error("Whoa... recvfrom called for a connected transport."); + exit(1); +} + + +static void closed_cb(mrp_transport_t *t, int error, void *user_data) +{ + mrp_domctl_t *dc = (mrp_domctl_t *)user_data; + + MRP_UNUSED(t); + MRP_UNUSED(dc); + + if (error) + notify_disconnect(dc, error, strerror(error)); + else + notify_disconnect(dc, ECONNRESET, "server has closed the connection"); +} + + +static int queue_pending(mrp_domctl_t *dc, uint32_t seq, + mrp_domctl_status_cb_t cb, void *user_data) +{ + pending_request_t *pending; + + pending = mrp_allocz(sizeof(*pending)); + + if (pending != NULL) { + mrp_list_init(&pending->hook); + + pending->seqno = seq; + pending->cb = cb; + pending->user_data = user_data; + + mrp_list_append(&dc->pending, &pending->hook); + + return TRUE; + } + else + return FALSE; +} + + +static int notify_pending(mrp_domctl_t *dc, uint32_t seq, int error, + const char *msg) +{ + mrp_list_hook_t *p, *n; + pending_request_t *pending; + + mrp_list_foreach(&dc->pending, p, n) { + pending = mrp_list_entry(p, typeof(*pending), hook); + + if (pending->seqno == seq) { + DOMCTL_MARK_BUSY(dc, { + pending->cb(dc, error, msg, pending->user_data); + mrp_list_delete(&pending->hook); + mrp_free(pending); + }); + + return TRUE; + } + } + + return FALSE; +} + + +static void purge_pending(mrp_domctl_t *dc) +{ + mrp_list_hook_t *p, *n; + pending_request_t *pending; + + mrp_list_foreach(&dc->pending, p, n) { + pending = mrp_list_entry(p, typeof(*pending), hook); + + mrp_list_delete(&pending->hook); + mrp_free(pending); + } +} diff --git a/src/plugins/domain-control/client.h b/src/plugins/domain-control/client.h new file mode 100644 index 0000000..91bbd27 --- /dev/null +++ b/src/plugins/domain-control/client.h @@ -0,0 +1,117 @@ +#ifndef __MURPHY_DOMAIN_CONTROL_CLIENT_H__ +#define __MURPHY_DOMAIN_CONTROL_CLIENT_H__ + +#include +#include + +#define MRP_DEFAULT_DOMCTL_ADDRESS "unxs:@murphy-domctrl" + + +/* + * a table owned by a domain controller + */ + +typedef struct { + const char *table; /* table name */ + const char *mql_columns; /* column definition scriptlet */ + const char *mql_index; /* index column list */ +} mrp_domctl_table_t; + +#define MRP_DOMCTL_TABLE(_table, _columns, _index) \ + { .table = _table, .mql_columns = _columns, .mql_index = _index } + + +/* + * a table tracked by a domain controller + */ + +typedef struct { + const char *table; /* table name */ + const char *mql_columns; /* column list for select */ + const char *mql_where; /* where clause for select */ + int max_rows; /* max number of rows to select */ +} mrp_domctl_watch_t; + +#define MRP_DOMCTL_WATCH(_table, _columns, _where, _max_rows) { \ + .table = _table , \ + .mql_columns = _columns ? _columns : "", \ + .mql_where = _where ? _where : "", \ + .max_rows = _max_rows , \ + } + + +/* + * table column types and values + */ + +typedef enum { + MRP_DOMCTL_STRING = mqi_varchar, + MRP_DOMCTL_INTEGER = mqi_integer, + MRP_DOMCTL_UNSIGNED = mqi_unsignd, + MRP_DOMCTL_DOUBLE = mqi_floating +} mrp_domctl_type_t; + +typedef struct { + mrp_domctl_type_t type; /* data type */ + union { + const char *str; /* MRP_DOMCTL_STRING */ + uint32_t u32; /* MRP_DOMCTL_UNSIGNED */ + int32_t s32; /* MRP_DOMCTL_INTEGER */ + double dbl; /* MRP_DOMCTL_DOUBLE */ + }; +} mrp_domctl_value_t; + + +/* + * table data + */ + +typedef struct { + int id; /* table id */ + mqi_column_def_t *coldefs; /* column definitions */ + int ncolumn; /* columns per row */ + mrp_domctl_value_t **rows; /* row data */ + int nrow; /* number of rows */ +} mrp_domctl_data_t; + + + +/** Opaque policy domain controller type. */ +typedef struct mrp_domctl_s mrp_domctl_t; + +/** Callback type for connection state notifications. */ +typedef void (*mrp_domctl_connect_cb_t)(mrp_domctl_t *dc, int connection, + int errcode, const char *errmsg, + void *user_data); + +/** Callback type for request status notifications. */ +typedef void (*mrp_domctl_status_cb_t)(mrp_domctl_t *dc, int errcode, + const char *errmsg, void *user_data); + +/** Callback type for data change notifications. */ +typedef void (*mrp_domctl_watch_cb_t)(mrp_domctl_t *dc, + mrp_domctl_data_t *tables, int ntable, + void *user_data); + +/** Create a new policy domain controller. */ +mrp_domctl_t *mrp_domctl_create(const char *name, mrp_mainloop_t *ml, + mrp_domctl_table_t *tables, int ntable, + mrp_domctl_watch_t *watches, int nwatch, + mrp_domctl_connect_cb_t connect_cb, + mrp_domctl_watch_cb_t watch_cb, + void *user_data); + +/** Destroy the given policy domain controller. */ +void mrp_domctl_destroy(mrp_domctl_t *dc); + +/** Connect and register the given controller to the server. */ +int mrp_domctl_connect(mrp_domctl_t *dc, const char *address); + +/** Close the connection to the server. */ +void mrp_domctl_disconnect(mrp_domctl_t *dc); + +/** Set the content of the given tables to the provided data. */ +int mrp_domctl_set_data(mrp_domctl_t *dc, mrp_domctl_data_t *tables, int ntable, + mrp_domctl_status_cb_t status_cb, void *user_data); + +#endif /* __MURPHY_DOMAIN_CONTROL_CLIENT_H__ */ diff --git a/src/plugins/domain-control/domain-control-types.h b/src/plugins/domain-control/domain-control-types.h new file mode 100644 index 0000000..c5be022 --- /dev/null +++ b/src/plugins/domain-control/domain-control-types.h @@ -0,0 +1,115 @@ +#ifndef __MURPHY_DOMAIN_CONTROL_TYPES_H__ +#define __MURPHY_DOMAIN_CONTROL_TYPES_H__ + +#include +#include +#include +#include +#include + +#include "client.h" + +typedef struct pep_proxy_s pep_proxy_t; +typedef struct pep_table_s pep_table_t; +typedef struct pep_watch_s pep_watch_t; +typedef struct pdp_s pdp_t; + + +/* + * a domain controller (on the client side) + */ + +struct mrp_domctl_s { + char *name; /* enforcment point name */ + mrp_mainloop_t *ml; /* main loop */ + mrp_transport_t *t; /* transport towards murphy */ + int connected; /* transport is up */ + mrp_domctl_table_t *tables; /* owned tables */ + int ntable; /* number of owned tables */ + mrp_domctl_watch_t *watches; /* watched tables */ + int nwatch; /* number of watched tables */ + mrp_domctl_connect_cb_t connect_cb; /* connection state change callback */ + mrp_domctl_watch_cb_t watch_cb; /* watched table change callback */ + void *user_data; /* opqaue user data for callbacks */ + int busy; /* non-zero if a callback is active */ + int destroyed:1;/* non-zero if destroy pending */ + uint32_t seqno; /* request sequence number */ + mrp_list_hook_t pending; /* queue of outstanding requests */ +}; + + +/* + * a table associated with or tracked by an enforcement point + */ + +struct pep_table_s { + char *name; /* table name */ + char *mql_columns; /* column definition clause */ + char *mql_index; /* index column list */ + mrp_list_hook_t hook; /* to list of tables */ + mqi_handle_t h; /* table handle */ + mqi_column_def_t *columns; /* column definitions */ + mqi_column_desc_t *coldesc; /* column descriptors */ + int ncolumn; /* number of columns */ + int idx_col; /* column index of index column */ + mrp_list_hook_t watches; /* watches for this table */ + int notify_all : 1; /* notify all watches */ +}; + + +/* + * a table watch + */ + +struct pep_watch_s { + pep_table_t *table; /* table being watched */ + char *mql_columns; /* column list to select */ + char *mql_where; /* where clause for select */ + int max_rows; /* max number of rows to select */ + pep_proxy_t *proxy; /* enforcement point */ + int id; /* table id within proxy */ + uint32_t stamp; /* last notified update stamp */ + mrp_list_hook_t tbl_hook; /* hook to table watch list */ + mrp_list_hook_t pep_hook; /* hook to proxy watch list */ +}; + + +/* + * a policy enforcement point (on the server side) + */ + +struct pep_proxy_s { + char *name; /* enforcement point name */ + pdp_t *pdp; /* domain controller context */ + mrp_transport_t *t; /* associated transport */ + mrp_list_hook_t hook; /* to list of all enforcement points */ + pep_table_t *tables; /* tables owned by this */ + int ntable; /* number of tables */ + mrp_list_hook_t watches; /* tables watched by this */ + int notify_update; /* whether needs notification */ + mrp_msg_t *notify_msg; /* notification being built */ + int notify_ntable; /* number of changed tables */ + int notify_ncolumn; /* total columns in notification */ + int notify_fail : 1; /* notification failure */ + int notify_all : 1; /* notify all watches */ +}; + + +/* + * policy domain controller context + */ + +struct pdp_s { + mrp_context_t *ctx; /* murphy context */ + const char *address; /* external transport address */ + mrp_transport_t *ext; /* external transport */ + mrp_list_hook_t proxies; /* list of enforcement points */ + mrp_list_hook_t tables; /* list of tables we track */ + mrp_htbl_t *watched; /* tracked tables by name */ + mrp_deferred_t *notify; /* deferred notification */ + int notify_scheduled; /* is notification scheduled? */ +}; + + + +#endif /* __MURPHY_DOMAIN_CONTROL_TYPES_H__ */ diff --git a/src/plugins/domain-control/domain-control.c b/src/plugins/domain-control/domain-control.c new file mode 100644 index 0000000..5459105 --- /dev/null +++ b/src/plugins/domain-control/domain-control.c @@ -0,0 +1,386 @@ +#include + +#include +#include + +#include "proxy.h" +#include "table.h" +#include "message.h" +#include "notify.h" +#include "domain-control.h" + +static int create_transports(pdp_t *pdp); +static void destroy_transports(pdp_t *pdp); + +pdp_t *create_domain_control(mrp_context_t *ctx, const char *address) +{ + pdp_t *pdp; + + pdp = mrp_allocz(sizeof(*pdp)); + + if (pdp != NULL) { + pdp->ctx = ctx; + pdp->address = address; + + if (init_proxies(pdp) && init_tables(pdp) && create_transports(pdp)) + return pdp; + else + destroy_domain_control(pdp); + } + + return NULL; +} + + +void destroy_domain_control(pdp_t *pdp) +{ + if (pdp != NULL) { + destroy_proxies(pdp); + destroy_tables(pdp); + destroy_transports(pdp); + + mrp_free(pdp); + } +} + + +static void notify_cb(mrp_mainloop_t *ml, mrp_deferred_t *d, void *user_data) +{ + pdp_t *pdp = (pdp_t *)user_data; + + MRP_UNUSED(ml); + + mrp_disable_deferred(d); + pdp->notify_scheduled = FALSE; + notify_table_changes(pdp); +} + + +void schedule_notification(pdp_t *pdp) +{ + + if (pdp->notify == NULL) + pdp->notify = mrp_add_deferred(pdp->ctx->ml, notify_cb, pdp); + + if (!pdp->notify_scheduled) { + mrp_debug("scheduling client notification"); + mrp_enable_deferred(pdp->notify); + } +} + + +static void send_ack_reply(mrp_transport_t *t, uint32_t seq) +{ + mrp_msg_t *msg; + + msg = create_ack_message(seq); + + if (msg != NULL) { + mrp_transport_send(t, msg); + mrp_msg_unref(msg); + } +} + + +static void send_nak_reply(mrp_transport_t *t, uint32_t seq, int error, + const char *errmsg) +{ + mrp_msg_t *msg; + + msg = create_nak_message(seq, error, errmsg); + + if (msg != NULL) { + mrp_transport_send(t, msg); + mrp_msg_unref(msg); + } +} + + +static int process_register_request(pep_proxy_t *proxy, mrp_msg_t *req, + uint32_t seq) +{ + mrp_transport_t *t = proxy->t; + char *name; + mrp_domctl_table_t *tables; + mrp_domctl_watch_t *watches; + uint16_t utable, uwatch; + int ntable, nwatch; + int error; + const char *errmsg; + + if (mrp_msg_get(req, + MRP_PEPMSG_STRING(NAME , &name ), + MRP_PEPMSG_UINT16(NTABLE , &utable), + MRP_PEPMSG_UINT16(NWATCH , &uwatch), + MRP_MSG_END)) { + ntable = utable; + nwatch = uwatch; + tables = alloca(ntable * sizeof(*tables)); + watches = alloca(nwatch * sizeof(*watches)); + + if (decode_register_message(req, tables, ntable, watches, nwatch)) { + if (register_proxy(proxy, name, tables, ntable, watches, nwatch, + &error, &errmsg)) { + send_ack_reply(t, seq); + proxy->notify_all = TRUE; + schedule_notification(proxy->pdp); + + return TRUE; + } + } + else + goto malformed; + } + else { + malformed: + error = EINVAL; + errmsg = "malformed register message"; + } + + send_nak_reply(t, seq, error, errmsg); + + return FALSE; +} + + +static void process_unregister_request(pep_proxy_t *proxy, uint32_t seq) +{ + send_ack_reply(proxy->t, seq); + unregister_proxy(proxy); +} + + +static void process_set_request(pep_proxy_t *proxy, mrp_msg_t *msg, + uint32_t seq) +{ + mrp_domctl_data_t *data, *d; + mrp_domctl_value_t *values, *v; + void *it; + uint16_t ntable, ntotal, nrow, ncol; + uint16_t tblid; + int t, r, c; + uint16_t type; + mrp_msg_value_t value; + int error; + const char *errmsg; + + if (!mrp_msg_get(msg, + MRP_PEPMSG_UINT16(NCHANGE, &ntable), + MRP_PEPMSG_UINT16(NTOTAL , &ntotal), + MRP_MSG_END)) + return; + + data = alloca(sizeof(*data) * ntable); + values = alloca(sizeof(*values) * ntotal); + + it = NULL; + d = data; + v = values; + + for (t = 0; t < ntable; t++) { + if (!mrp_msg_iterate_get(msg, &it, + MRP_PEPMSG_UINT16(TBLID, &tblid), + MRP_PEPMSG_UINT16(NROW , &nrow ), + MRP_PEPMSG_UINT16(NCOL , &ncol ), + MRP_MSG_END)) + goto reply_nak; + + if (tblid >= proxy->ntable) + goto reply_nak; + + d->id = tblid; + d->ncolumn = ncol; + d->nrow = nrow; + d->rows = alloca(sizeof(*d->rows) * nrow); + + for (r = 0; r < nrow; r++) { + d->rows[r] = v; + + for (c = 0; c < ncol; c++) { + if (!mrp_msg_iterate_get(msg, &it, + MRP_PEPMSG_ANY(DATA, &type, &value), + MRP_MSG_END)) + goto reply_nak; + + switch (type) { + case MRP_MSG_FIELD_STRING: + v->type = MRP_DOMCTL_STRING; + v->str = value.str; + break; + case MRP_MSG_FIELD_SINT32: + v->type = MRP_DOMCTL_INTEGER; + v->s32 = value.s32; + break; + case MRP_MSG_FIELD_UINT32: + v->type = MRP_DOMCTL_UNSIGNED; + v->u32 = value.u32; + break; + case MRP_MSG_FIELD_DOUBLE: + v->type = MRP_DOMCTL_DOUBLE; + v->dbl = value.dbl; + break; + default: + goto reply_nak; + } + + v++; + } + } + + d++; + } + + if (set_proxy_tables(proxy, data, ntable, &error, &errmsg)) { + send_ack_reply(proxy->t, seq); + } + else { + reply_nak: + send_nak_reply(proxy->t, seq, error, errmsg); + } +} + + +static void recv_cb(mrp_transport_t *t, mrp_msg_t *msg, void *user_data) +{ + pep_proxy_t *proxy = (pep_proxy_t *)user_data; + char *name = proxy && proxy->name ? proxy->name : ""; + uint16_t type; + uint32_t seq; + + /* + mrp_log_info("Message from client %p:", proxy); + mrp_msg_dump(msg, stdout); + */ + + if (!mrp_msg_get(msg, + MRP_PEPMSG_UINT16(MSGTYPE, &type), + MRP_PEPMSG_UINT32(MSGSEQ , &seq ), + MRP_MSG_END)) { + mrp_log_error("Malformed message from client %s.", name); + send_nak_reply(t, 0, EINVAL, "malformed message"); + } + else { + switch (type) { + case MRP_PEPMSG_REGISTER: + if (!process_register_request(proxy, msg, seq)) + destroy_proxy(proxy); + break; + + case MRP_PEPMSG_UNREGISTER: + process_unregister_request(proxy, seq); + break; + + case MRP_PEPMSG_SET: + process_set_request(proxy, msg, seq); + break; + + default: + break; + } + } +} + + +static void connect_cb(mrp_transport_t *ext, void *user_data) +{ + pdp_t *pdp = (pdp_t *)user_data; + pep_proxy_t *proxy; + int flags; + + proxy = create_proxy(pdp); + + if (proxy != NULL) { + flags = MRP_TRANSPORT_REUSEADDR | MRP_TRANSPORT_NONBLOCK; + proxy->t = mrp_transport_accept(ext, proxy, flags); + + if (proxy->t != NULL) + mrp_log_info("Accepted new client connection."); + else { + mrp_log_error("Failed to accept new client connection."); + destroy_proxy(proxy); + } + } +} + + +static void closed_cb(mrp_transport_t *t, int error, void *user_data) +{ + pep_proxy_t *proxy = (pep_proxy_t *)user_data; + char *name = proxy && proxy->name ? proxy->name : ""; + + MRP_UNUSED(t); + + if (error) + mrp_log_error("Transport to client %s closed (%d: %s).", + name, error, strerror(error)); + else + mrp_log_info("Transport to client %s closed.", name); + + mrp_log_info("Destroying client %s.", name); + destroy_proxy(proxy); +} + + +static int create_ext_transport(pdp_t *pdp) +{ + static mrp_transport_evt_t evt = { + .closed = closed_cb, + .recvmsg = recv_cb, + .recvmsgfrom = NULL, + .connection = connect_cb, + }; + + mrp_transport_t *t; + mrp_sockaddr_t addr; + socklen_t addrlen; + int flags; + const char *type; + + t = NULL; + addrlen = mrp_transport_resolve(NULL, pdp->address, + &addr, sizeof(addr), &type); + + if (addrlen > 0) { + flags = MRP_TRANSPORT_REUSEADDR; + t = mrp_transport_create(pdp->ctx->ml, type, &evt, pdp, flags); + + if (t != NULL) { + if (mrp_transport_bind(t, &addr, addrlen) && + mrp_transport_listen(t, 4)) { + mrp_log_info("Listening on transport %s...", pdp->address); + pdp->ext = t; + + return TRUE; + } + else + mrp_log_error("Failed to bind transport to %s.", pdp->address); + } + else + mrp_log_error("Failed to create transport for %s.", pdp->address); + } + else + mrp_log_error("Invalid transport address %s.", pdp->address); + + return FALSE; +} + + +static void destroy_ext_transport(pdp_t *pdp) +{ + if (pdp != NULL) { + mrp_transport_destroy(pdp->ext); + pdp->ext = NULL; + } +} + + +static int create_transports(pdp_t *pdp) +{ + return create_ext_transport(pdp); +} + + +static void destroy_transports(pdp_t *pdp) +{ + destroy_ext_transport(pdp); +} diff --git a/src/plugins/domain-control/domain-control.h b/src/plugins/domain-control/domain-control.h new file mode 100644 index 0000000..27641ce --- /dev/null +++ b/src/plugins/domain-control/domain-control.h @@ -0,0 +1,11 @@ +#ifndef __MURPHY_DOMAIN_CONTROL_H__ +#define __MURPHY_DOMAIN_CONTROL_H__ + +#include "domain-control-types.h" + +pdp_t *create_domain_control(mrp_context_t *ctx, const char *address); +void destroy_domain_control(pdp_t *pdp); + +void schedule_notification(pdp_t *pdp); + +#endif /* __MURPHY_DOMAIN_CONTROL_H__ */ diff --git a/src/plugins/domain-control/message.c b/src/plugins/domain-control/message.c new file mode 100644 index 0000000..d26e0f0 --- /dev/null +++ b/src/plugins/domain-control/message.c @@ -0,0 +1,235 @@ +#include "message.h" + +static int append_one_row(mrp_msg_t *msg, uint16_t tag, mqi_column_def_t *col, + int ncolumn, mrp_domctl_value_t *data); + +mrp_msg_t *create_register_message(mrp_domctl_t *dc) +{ + mrp_msg_t *msg; + mrp_domctl_table_t *t; + mrp_domctl_watch_t *w; + int i; + + msg = mrp_msg_create(MRP_PEPMSG_UINT16(MSGTYPE, MRP_PEPMSG_REGISTER), + MRP_PEPMSG_UINT32(MSGSEQ , 0), + MRP_PEPMSG_STRING(NAME , dc->name), + MRP_PEPMSG_UINT16(NTABLE , dc->ntable), + MRP_PEPMSG_UINT16(NWATCH , dc->nwatch), + MRP_MSG_END); + + for (i = 0, t = dc->tables; i < dc->ntable; i++, t++) { + mrp_msg_append(msg, MRP_PEPMSG_STRING(TBLNAME, t->table)); + mrp_msg_append(msg, MRP_PEPMSG_STRING(COLUMNS, t->mql_columns)); + mrp_msg_append(msg, MRP_PEPMSG_STRING(INDEX , t->mql_index)); + } + + for (i = 0, w = dc->watches; i < dc->nwatch; i++, w++) { + mrp_msg_append(msg, MRP_PEPMSG_STRING(TBLNAME, w->table)); + mrp_msg_append(msg, MRP_PEPMSG_STRING(COLUMNS, w->mql_columns)); + mrp_msg_append(msg, MRP_PEPMSG_STRING(WHERE , w->mql_where)); + mrp_msg_append(msg, MRP_PEPMSG_UINT16(MAXROWS, w->max_rows)); + } + + + return msg; +} + + +int decode_register_message(mrp_msg_t *msg, + mrp_domctl_table_t *tables, int ntable, + mrp_domctl_watch_t *watches, int nwatch) +{ + mrp_domctl_table_t *t; + mrp_domctl_watch_t *w; + void *it; + char *table, *columns, *index, *where; + uint16_t ntbl, nwch, max_rows; + int i; + + it = NULL; + + if (!mrp_msg_iterate_get(msg, &it, + MRP_PEPMSG_UINT16(NTABLE , &ntbl), + MRP_PEPMSG_UINT16(NWATCH , &nwch), + MRP_MSG_END)) + return FALSE; + + if (ntbl > ntable || nwch > nwatch) + return FALSE; + + for (i = 0, t = tables; i < ntable; i++, t++) { + if (mrp_msg_iterate_get(msg, &it, + MRP_PEPMSG_STRING(TBLNAME, &table), + MRP_PEPMSG_STRING(COLUMNS, &columns), + MRP_PEPMSG_STRING(INDEX , &index), + MRP_MSG_END)) { + t->table = table; + t->mql_columns = columns; + t->mql_index = index; + } + else + return FALSE; + } + + for (i = 0, w = watches; i < nwatch; i++, w++) { + if (mrp_msg_iterate_get(msg, &it, + MRP_PEPMSG_STRING(TBLNAME, &table), + MRP_PEPMSG_STRING(COLUMNS, &columns), + MRP_PEPMSG_STRING(WHERE , &where), + MRP_PEPMSG_UINT16(MAXROWS, &max_rows), + MRP_MSG_END)) { + w->table = table; + w->mql_columns = columns; + w->mql_where = where; + w->max_rows = max_rows; + } + else + return FALSE; + } + + return TRUE; +} + + +mrp_msg_t *create_ack_message(uint32_t seq) +{ + return mrp_msg_create(MRP_PEPMSG_UINT16(MSGTYPE, MRP_PEPMSG_ACK), + MRP_PEPMSG_UINT32(MSGSEQ , seq), + MRP_MSG_END); +} + + +mrp_msg_t *create_nak_message(uint32_t seq, int error, const char *errmsg) +{ + return mrp_msg_create(MRP_PEPMSG_UINT16(MSGTYPE, MRP_PEPMSG_NAK), + MRP_PEPMSG_UINT32(MSGSEQ , seq), + MRP_PEPMSG_SINT32(ERRCODE, error), + MRP_PEPMSG_STRING(ERRMSG , errmsg), + MRP_MSG_END); +} + + +mrp_msg_t *create_notify_message(void) +{ + return mrp_msg_create(MRP_PEPMSG_UINT16(MSGTYPE, MRP_PEPMSG_NOTIFY), + MRP_PEPMSG_UINT32(MSGSEQ , 0), + MRP_PEPMSG_UINT16(NCHANGE, 0), + MRP_PEPMSG_UINT16(NTOTAL , 0), + MRP_MSG_END); +} + + +int update_notify_message(mrp_msg_t *msg, int id, mqi_column_def_t *columns, + int ncolumn, mrp_domctl_value_t *data, int nrow) +{ + mrp_domctl_value_t *v; + uint16_t tid, nr; + int i; + + nr = nrow; + tid = id; + + if (!mrp_msg_append(msg, MRP_PEPMSG_UINT16(TBLID, tid)) || + !mrp_msg_append(msg, MRP_PEPMSG_UINT16(NROW , nr ))) + return FALSE; + + for (i = 0, v = data; i < nrow; i++, v += ncolumn) { + if (!append_one_row(msg, MRP_PEPTAG_DATA, columns, ncolumn, v)) + return FALSE; + } + + return TRUE; +} + + +mrp_msg_t *create_set_message(uint32_t seq, mrp_domctl_data_t *tables, + int ntable) +{ + mrp_msg_t *msg; + mrp_domctl_value_t *rows, *col; + uint16_t utable, utotal, tid, ncol, nrow; + int i, r, c; + + utable = ntable; + utotal = 0; + + msg = mrp_msg_create(MRP_PEPMSG_UINT16(MSGTYPE, MRP_PEPMSG_SET), + MRP_PEPMSG_UINT32(MSGSEQ , seq), + MRP_PEPMSG_UINT16(NCHANGE, utable), + MRP_PEPMSG_UINT16(NTOTAL , 0), + MRP_MSG_END); + + if (msg != NULL) { + for (i = 0; i < ntable; i++) { + tid = tables[i].id; + ncol = tables[i].ncolumn; + nrow = tables[i].nrow; + + if (!mrp_msg_append(msg, MRP_PEPMSG_UINT16(TBLID, tid)) || + !mrp_msg_append(msg, MRP_PEPMSG_UINT16(NROW , nrow)) || + !mrp_msg_append(msg, MRP_PEPMSG_UINT16(NCOL , ncol))) + goto fail; + + for (r = 0; r < nrow; r++) { + rows = tables[i].rows[r]; + + for (c = 0; c < ncol; c++) { + col = rows + c; +#define HANDLE_TYPE(pt, t, m) \ + case MRP_DOMCTL_##pt: \ + if (!mrp_msg_append(msg, MRP_PEPMSG_##t(DATA,col->m))) \ + goto fail; \ + break; + + switch (col->type) { + HANDLE_TYPE(STRING , STRING, str); + HANDLE_TYPE(INTEGER , SINT32, s32); + HANDLE_TYPE(UNSIGNED, UINT32, u32); + HANDLE_TYPE(DOUBLE , DOUBLE, dbl); + default: + goto fail; + } +#undef HANDLE_TYPE + } + } + utotal += nrow * ncol; + } + + mrp_msg_set(msg, MRP_PEPMSG_UINT16(NTOTAL, utotal)); + + return msg; + } + + fail: + mrp_msg_unref(msg); + return NULL; +} + + +static int append_one_row(mrp_msg_t *msg, uint16_t tag, mqi_column_def_t *col, + int ncolumn, mrp_domctl_value_t *data) +{ +#define HANDLE_TYPE(dbtype, type, member) \ + case mqi_##dbtype: \ + if (!mrp_msg_append(msg, MRP_MSG_TAG_##type(tag, data->member))) \ + return FALSE; \ + break + + int i; + + for (i = 0; i < ncolumn; i++, data++, col++) { + switch (col->type) { + HANDLE_TYPE(integer , SINT32, s32); + HANDLE_TYPE(unsignd , UINT32, u32); + HANDLE_TYPE(floating, DOUBLE, dbl); + HANDLE_TYPE(string , STRING, str); + case mqi_blob: + default: + return FALSE; + } + } + + return TRUE; + +#undef HANDLE_TYPE +} diff --git a/src/plugins/domain-control/message.h b/src/plugins/domain-control/message.h new file mode 100644 index 0000000..41f37f2 --- /dev/null +++ b/src/plugins/domain-control/message.h @@ -0,0 +1,100 @@ +#ifndef __MURPHY_DOMAIN_CONTROL_MESSAGE_H__ +#define __MURPHY_DOMAIN_CONTROL_MESSAGE_H__ + +#include + +#include "domain-control-types.h" +#include "client.h" + + +#define MRP_PEPMSG_UINT16(tag, value) \ + MRP_MSG_TAG_UINT16(MRP_PEPTAG_##tag, value) + +#define MRP_PEPMSG_SINT16(tag, value) \ + MRP_MSG_TAG_SINT16(MRP_PEPTAG_##tag, value) + +#define MRP_PEPMSG_UINT32(tag, value) \ + MRP_MSG_TAG_UINT32(MRP_PEPTAG_##tag, value) + +#define MRP_PEPMSG_SINT32(tag, value) \ + MRP_MSG_TAG_SINT32(MRP_PEPTAG_##tag, value) + +#define MRP_PEPMSG_DOUBLE(tag, value) \ + MRP_MSG_TAG_DOUBLE(MRP_PEPTAG_##tag, value) + +#define MRP_PEPMSG_STRING(tag, value) \ + MRP_MSG_TAG_STRING(MRP_PEPTAG_##tag, value) + +#define MRP_PEPMSG_ANY(tag, typep, valuep) \ + MRP_MSG_TAG_ANY(MRP_PEPTAG_##tag, typep, valuep) + +/* + * message types + */ + +typedef enum { + MRP_PEPMSG_REGISTER = 0x1, /* client: register me */ + MRP_PEPMSG_UNREGISTER = 0x2, /* client: unregister me */ + MRP_PEPMSG_SET = 0x3, /* client: set table data */ + MRP_PEPMSG_NOTIFY = 0x4, /* server: table changes */ + MRP_PEPMSG_ACK = 0x5, /* server: ok */ + MRP_PEPMSG_NAK = 0x6, /* server: request failed */ +} mrp_pepmsg_type_t; + + +/* + * message-specific tags + */ + +typedef enum { + /* + * fixed common tags + */ + MRP_PEPTAG_MSGTYPE = 0x1, /* message type */ + MRP_PEPTAG_MSGSEQ = 0x2, /* sequence number */ + + /* + * fixed tags in registration messages + */ + MRP_PEPTAG_NAME = 0x3, /* enforcement point name */ + MRP_PEPTAG_NTABLE = 0x4, /* number of owned tables */ + MRP_PEPTAG_NWATCH = 0x5, /* number of watched tables */ + MRP_PEPTAG_TBLNAME = 0x6, /* table name */ + MRP_PEPTAG_COLUMNS = 0x8, /* column definitions/list */ + MRP_PEPTAG_INDEX = 0x9, /* index definition */ + MRP_PEPTAG_WHERE = 0xa, /* where clause for select */ + MRP_PEPTAG_MAXROWS = 0xb, /* max number of rows to select */ + + /* + * fixed tags in NAKs + */ + MRP_PEPTAG_ERRCODE = 0x3, /* error code */ + MRP_PEPTAG_ERRMSG = 0x4, /* error message */ + + /* + * fixed tags in data notification messages + */ + MRP_PEPTAG_NCHANGE = 0x3, /* number of tables in notification */ + MRP_PEPTAG_NTOTAL = 0x4, /* total columns in notification */ + MRP_PEPTAG_TBLID = 0x5, /* table id */ + MRP_PEPTAG_NROW = 0x6, /* number of table rows */ + MRP_PEPTAG_NCOL = 0x7, /* number of columns in a row */ + MRP_PEPTAG_DATA = 0x8, /* a data column */ +} mrp_pepmsg_tag_t; + + +mrp_msg_t *create_register_message(mrp_domctl_t *dc); +int decode_register_message(mrp_msg_t *msg, + mrp_domctl_table_t *tables, int ntable, + mrp_domctl_watch_t *watches, int nwatch); + +mrp_msg_t *create_ack_message(uint32_t seq); +mrp_msg_t *create_nak_message(uint32_t seq, int error, const char *errmsg); +mrp_msg_t *create_notify_message(void); +int update_notify_message(mrp_msg_t *msg, int id, mqi_column_def_t *columns, + int ncolumn, mrp_domctl_value_t *data, int nrow); + +mrp_msg_t *create_set_message(uint32_t seq, mrp_domctl_data_t *tables, + int ntable); + +#endif /* __MURPHY_DOMAIN_CONTROL_MESSAGE_H__ */ diff --git a/src/plugins/domain-control/notify.c b/src/plugins/domain-control/notify.c new file mode 100644 index 0000000..b9420e6 --- /dev/null +++ b/src/plugins/domain-control/notify.c @@ -0,0 +1,219 @@ +#include +#include + +#include + +#include "domain-control-types.h" +#include "message.h" +#include "table.h" +#include "notify.h" + + +static void prepare_proxy_notification(pep_proxy_t *proxy) +{ + proxy->notify_update = FALSE; + proxy->notify_ntable = 0; + proxy->notify_ncolumn = 0; + proxy->notify_fail = FALSE; +} + + +static void check_watch_notification(pep_watch_t *w) +{ + pep_proxy_t *proxy = w->proxy; + pep_table_t *t = w->table; + int update; + + if (t->notify_all) { + t->h = mqi_get_table_handle(t->name); + update = TRUE; + } + else { + if (t->h != MQI_HANDLE_INVALID) + update = (w->stamp < mqi_get_table_stamp(t->h)); + else + update = FALSE; + } + + proxy->notify_update |= update; +} + + +static int collect_watch_notification(pep_watch_t *w) +{ + pep_proxy_t *proxy = w->proxy; + pep_table_t *t = w->table; + mql_result_t *r = NULL; + uint16_t tid = w->id; + uint16_t nrow, ncol; + mrp_msg_t *msg; + int i, j; + int types[MQI_COLUMN_MAX]; + const char *str; + uint32_t u32; + int32_t s32; + double dbl; + + mrp_debug("updating %s watch for %s", t->name, proxy->name); + + if (proxy->notify_msg == NULL) { + proxy->notify_msg = create_notify_message(); + + if (proxy->notify_msg == NULL) + goto fail; + } + + if (t->h != MQI_HANDLE_INVALID) { + if (!exec_mql(mql_result_rows, &r, "select %s from %s%s%s", + w->mql_columns, t->name, + w->mql_where[0] ? " where " : "", w->mql_where)) { + mrp_debug("select from table %s failed", t->name); + goto fail; + } + } + + if (r != NULL) { + nrow = mql_result_rows_get_row_count(r); + ncol = mql_result_rows_get_row_column_count(r); + } + else + nrow = ncol = 0; + + msg = proxy->notify_msg; + + if (!mrp_msg_append(msg, MRP_PEPMSG_UINT16(TBLID, tid)) || + !mrp_msg_append(msg, MRP_PEPMSG_UINT16(NROW , nrow)) || + !mrp_msg_append(msg, MRP_PEPMSG_UINT16(NCOL , ncol))) + goto fail; + + for (i = 0; i < ncol; i++) + types[i] = mql_result_rows_get_row_column_type(r, i); + + for (i = 0; i < nrow; i++) { + for (j = 0; j < ncol; j++) { + switch (types[j]) { + case mqi_string: + str = mql_result_rows_get_string(r, j, i, NULL, 0); + if (!mrp_msg_append(msg, MRP_PEPMSG_STRING(DATA, str))) + goto fail; + break; + case mqi_integer: + s32 = mql_result_rows_get_integer(r, j, i); + if (!mrp_msg_append(msg, MRP_PEPMSG_SINT32(DATA, s32))) + goto fail; + break; + case mqi_unsignd: + u32 = mql_result_rows_get_unsigned(r, j, i); + if (!mrp_msg_append(msg, MRP_PEPMSG_UINT32(DATA, u32))) + goto fail; + break; + + case mqi_floating: + dbl = mql_result_rows_get_floating(r, j, i); + if (!mrp_msg_append(msg, MRP_PEPMSG_DOUBLE(DATA, dbl))) + goto fail; + break; + + default: + goto fail; + } + } + } + + if (r != NULL) + mql_result_free(r); + + proxy->notify_ncolumn += nrow * ncol; + proxy->notify_ntable++; + + return TRUE; + + fail: + if (r != NULL) + mql_result_free(r); + mrp_msg_unref(proxy->notify_msg); + proxy->notify_msg = NULL; + proxy->notify_fail = TRUE; + + return FALSE; +} + + +static int send_proxy_notification(pep_proxy_t *proxy) +{ + uint16_t nchange, ntotal; + + if (proxy->notify_msg == NULL) + return TRUE; + + if (!proxy->notify_fail) { + mrp_debug("notifying client %s", proxy->name); + + nchange = proxy->notify_ntable; + ntotal = proxy->notify_ncolumn; + + mrp_msg_set(proxy->notify_msg, MRP_PEPMSG_UINT16(NCHANGE, nchange)); + mrp_msg_set(proxy->notify_msg, MRP_PEPMSG_UINT16(NTOTAL , ntotal )); + + /* + mrp_log_info("Notification message for client %s:", proxy->name); + mrp_msg_dump(proxy->notify_msg, stdout); + */ + + mrp_transport_send(proxy->t, proxy->notify_msg); + } + else + mrp_log_error("Failed to generate/send notification to %s.", + proxy->name); + + mrp_msg_unref(proxy->notify_msg); + + proxy->notify_msg = NULL; + proxy->notify_ntable = 0; + proxy->notify_ncolumn = 0; + proxy->notify_fail = FALSE; + proxy->notify_all = FALSE; + + return TRUE; +} + + +void notify_table_changes(pdp_t *pdp) +{ + mrp_list_hook_t *p, *n, *wp, *wn; + pep_proxy_t *proxy; + pep_table_t *t; + pep_watch_t *w; + + mrp_debug("notifying clients about table changes"); + + mrp_list_foreach(&pdp->proxies, p, n) { + proxy = mrp_list_entry(p, typeof(*proxy), hook); + prepare_proxy_notification(proxy); + } + + mrp_list_foreach(&pdp->tables, p, n) { + t = mrp_list_entry(p, typeof(*t), hook); + + mrp_list_foreach(&t->watches, wp, wn) { + w = mrp_list_entry(wp, typeof(*w), tbl_hook); + check_watch_notification(w); + } + + t->notify_all = FALSE; + } + + mrp_list_foreach(&pdp->proxies, p, n) { + proxy = mrp_list_entry(p, typeof(*proxy), hook); + + if (proxy->notify_update || proxy->notify_all) { + mrp_list_foreach(&proxy->watches, wp, wn) { + w = mrp_list_entry(wp, typeof(*w), pep_hook); + if (!collect_watch_notification(w)) + break; + } + + send_proxy_notification(proxy); + } + } +} diff --git a/src/plugins/domain-control/notify.h b/src/plugins/domain-control/notify.h new file mode 100644 index 0000000..8493420 --- /dev/null +++ b/src/plugins/domain-control/notify.h @@ -0,0 +1,8 @@ +#ifndef __MURPHY_DOMAIN_CONTROL_NOTIFY_H__ +#define __MURPHY_DOMAIN_CONTROL_NOTIFY_H__ + +#include "domain-control-types.h" + +void notify_table_changes(pdp_t *pdp); + +#endif /* __MURPHY_DOMAIN_CONTROL_NOTIFY_H__ */ diff --git a/src/plugins/domain-control/plugin-domain-control.c b/src/plugins/domain-control/plugin-domain-control.c new file mode 100644 index 0000000..934edc6 --- /dev/null +++ b/src/plugins/domain-control/plugin-domain-control.c @@ -0,0 +1,56 @@ +#include + +#include +#include + +#include "domain-control-types.h" +#include "domain-control.h" +#include "client.h" + + +static int plugin_init(mrp_plugin_t *plugin) +{ + const char *address = MRP_DEFAULT_DOMCTL_ADDRESS; + + plugin->data = create_domain_control(plugin->ctx, address); + + return (plugin->data != NULL); +} + + +static void plugin_exit(mrp_plugin_t *plugin) +{ + pdp_t *pdp = (pdp_t *)plugin->data; + + destroy_domain_control(pdp); +} + + +static void cmd_cb(mrp_console_t *c, void *user_data, int argc, char **argv) +{ + MRP_UNUSED(user_data); + MRP_UNUSED(argc); + MRP_UNUSED(argv); + + mrp_console_printf(c, "domctrl:%s() called...\n", __FUNCTION__); +} + + +#define PLUGIN_DESCRIPTION "Murphy domain control plugin." +#define PLUGIN_VERSION MRP_VERSION_INT(0, 0, 1) +#define PLUGIN_HELP "TODO..." +#define PLUGIN_AUTHORS "Krisztian Litkey " + +MRP_CONSOLE_GROUP(plugin_commands, "domain-control", NULL, NULL, { + MRP_TOKENIZED_CMD("cmd", cmd_cb, TRUE, + "cmd [args]", "a command", "A command..."), +}); + +MURPHY_REGISTER_PLUGIN("domain-control", + PLUGIN_VERSION, PLUGIN_DESCRIPTION, + PLUGIN_AUTHORS, PLUGIN_HELP, MRP_SINGLETON, + plugin_init, plugin_exit, + NULL, 0, /* plugin argument table */ + NULL, 0, /* exported methods */ + NULL, 0, /* imported methods */ + &plugin_commands); diff --git a/src/plugins/domain-control/proxy.c b/src/plugins/domain-control/proxy.c new file mode 100644 index 0000000..705757a --- /dev/null +++ b/src/plugins/domain-control/proxy.c @@ -0,0 +1,128 @@ +#include + +#include +#include +#include + +#include "domain-control-types.h" +#include "table.h" +#include "proxy.h" + + +int init_proxies(pdp_t *pdp) +{ + mrp_list_init(&pdp->proxies); + + return TRUE; +} + + +void destroy_proxies(pdp_t *pdp) +{ + MRP_UNUSED(pdp); + + return; +} + + +pep_proxy_t *create_proxy(pdp_t *pdp) +{ + pep_proxy_t *proxy; + + proxy = mrp_allocz(sizeof(*proxy)); + + if (proxy != NULL) { + mrp_list_init(&proxy->hook); + mrp_list_init(&proxy->watches); + + proxy->pdp = pdp; + + mrp_list_append(&pdp->proxies, &proxy->hook); + } + + return proxy; +} + + +void destroy_proxy(pep_proxy_t *proxy) +{ + int i; + + if (proxy != NULL) { + mrp_list_delete(&proxy->hook); + + for (i = 0; i < proxy->ntable; i++) + destroy_proxy_table(proxy->tables + i); + + destroy_proxy_watches(proxy); + + mrp_free(proxy); + } +} + + +int register_proxy(pep_proxy_t *proxy, char *name, + mrp_domctl_table_t *tables, int ntable, + mrp_domctl_watch_t *watches, int nwatch, + int *error, const char **errmsg) +{ + pep_table_t *t; + mrp_domctl_watch_t *w; + int i; + + proxy->name = mrp_strdup(name); + proxy->tables = mrp_allocz_array(typeof(*proxy->tables) , ntable); + proxy->ntable = ntable; + + if (proxy->name == NULL || (ntable && proxy->tables == NULL)) { + *error = ENOMEM; + *errmsg = "failed to allocate proxy table"; + + return FALSE; + } + + for (i = 0, t = proxy->tables; i < ntable; i++, t++) { + t->h = MQI_HANDLE_INVALID; + t->name = mrp_strdup(tables[i].table); + t->mql_columns = mrp_strdup(tables[i].mql_columns); + t->mql_index = mrp_strdup(tables[i].mql_index); + + if (t->name == NULL || t->mql_columns == NULL || t->mql_index == NULL) { + mrp_log_error("Failed to allocate proxy table %s for %s.", + tables[i].table, name); + *error = ENOMEM; + *errmsg = "failed to allocate proxy table"; + + return FALSE; + } + + if (create_proxy_table(t, error, errmsg)) + mrp_log_info("Client %s created table %s.", proxy->name, + tables[i].table); + else { + mrp_log_error("Client %s failed to create table %s (%d: %s).", + proxy->name, tables[i].table, *error, *errmsg); + return FALSE; + } + } + + for (i = 0, w = watches; i < nwatch; i++, w++) { + if (create_proxy_watch(proxy, i, w->table, w->mql_columns, + w->mql_where, w->max_rows, error, errmsg)) + mrp_log_info("Client %s subscribed for table %s.", proxy->name, + w->table); + else + mrp_log_error("Client %s failed to subscribe for table %s.", + proxy->name, w->table); + } + + return TRUE; +} + + +int unregister_proxy(pep_proxy_t *proxy) +{ + destroy_proxy(proxy); + + return TRUE; +} diff --git a/src/plugins/domain-control/proxy.h b/src/plugins/domain-control/proxy.h new file mode 100644 index 0000000..6308e61 --- /dev/null +++ b/src/plugins/domain-control/proxy.h @@ -0,0 +1,18 @@ +#ifndef __MURPHY_DOMAIN_CONTROL_PROXY_H__ +#define __MURPHY_DOMAIN_CONTROL_PROXY_H__ + +#include "domain-control-types.h" + +int init_proxies(pdp_t *pdp); +void destroy_proxies(pdp_t *pdp); + +pep_proxy_t *create_proxy(pdp_t *pdp); +void destroy_proxy(pep_proxy_t *proxy); + +int register_proxy(pep_proxy_t *proxy, char *name, + mrp_domctl_table_t *tables, int ntable, + mrp_domctl_watch_t *watches, int nwatch, + int *error, const char **errmsg); +int unregister_proxy(pep_proxy_t *proxy); + +#endif /* __MURPHY_DOMAIN_CONTROL_PROXY_H__ */ diff --git a/src/plugins/domain-control/table-common.c b/src/plugins/domain-control/table-common.c new file mode 100644 index 0000000..b0489e6 --- /dev/null +++ b/src/plugins/domain-control/table-common.c @@ -0,0 +1,10 @@ +#include + +#include +#include +#include +#include + +#include + +#include "table.h" diff --git a/src/plugins/domain-control/table.c b/src/plugins/domain-control/table.c new file mode 100644 index 0000000..462119b --- /dev/null +++ b/src/plugins/domain-control/table.c @@ -0,0 +1,501 @@ +#include +#include + +#include +#include +#include +#include + +#include +#include + +#include "domain-control.h" +#include "table.h" + +#define FAIL(ec, msg) do { \ + *errcode = ec; \ + *errmsg = msg; \ + goto fail; \ + } while (0) + +static pep_table_t *lookup_watch_table(pdp_t *pdp, const char *name); + +#include "table-common.c" + +/* + * proxied and tracked tables + */ + + +static void table_event_cb(mqi_event_t *e, void *user_data) +{ + pdp_t *pdp = (pdp_t *)user_data; + const char *name = e->table.table.name; + mqi_handle_t h = e->table.table.handle; + pep_table_t *t; + + switch (e->event) { + case mqi_table_created: + mrp_debug("table %s (0x%x) created", name, h); + break; + + case mqi_table_dropped: + mrp_debug("table %s (0x%x) dropped", name, h); + break; + default: + return; + } + + t = lookup_watch_table(pdp, name); + + if (t != NULL) { + t->notify_all = TRUE; + t->h = h; + } + + schedule_notification(pdp); +} + + +static void transaction_event_cb(mqi_event_t *e, void *user_data) +{ + pdp_t *pdp = (pdp_t *)user_data; + + switch (e->event) { + case mqi_transaction_end: + mrp_debug("transaction ended"); + schedule_notification(pdp); + break; + + case mqi_transaction_start: + mrp_debug("transaction started"); + break; + + default: + break; + } +} + + +static int open_db(pdp_t *pdp) +{ + if (mqi_open() == 0) { + if (mqi_create_transaction_trigger(transaction_event_cb, pdp) == 0) { + if (mqi_create_table_trigger(table_event_cb, pdp) == 0) + return TRUE; + else + mqi_drop_transaction_trigger(transaction_event_cb, pdp); + } + } + + return FALSE; +} + + +static void close_db(pdp_t *pdp) +{ + mqi_drop_table_trigger(table_event_cb, pdp); + mqi_drop_transaction_trigger(transaction_event_cb, pdp); +} + + +static void purge_watch_table_cb(void *key, void *entry); + + + +int init_tables(pdp_t *pdp) +{ + mrp_htbl_config_t hcfg; + + if (open_db(pdp)) { + mrp_list_init(&pdp->tables); + + mrp_clear(&hcfg); + hcfg.comp = mrp_string_comp; + hcfg.hash = mrp_string_hash; + hcfg.free = purge_watch_table_cb; + + pdp->watched = mrp_htbl_create(&hcfg); + } + + return (pdp->watched != NULL); +} + + +void destroy_tables(pdp_t *pdp) +{ + close_db(pdp); + mrp_htbl_destroy(pdp->watched, TRUE); + + pdp->watched = NULL; +} + + +int exec_mql(mql_result_type_t type, mql_result_t **resultp, + const char *format, ...) +{ + mql_result_t *r; + char buf[4096]; + va_list ap; + int success, n; + + va_start(ap, format); + n = vsnprintf(buf, sizeof(buf), format, ap); + va_end(ap); + + if (n < (int)sizeof(buf)) { + r = mql_exec_string(type, buf); + success = (r == NULL || mql_result_is_success(r)); + + if (resultp != NULL) { + *resultp = r; + return success; + } + else { + mql_result_free(r); + return success; + } + } + else { + errno = EOVERFLOW; + if (resultp != NULL) + *resultp = NULL; + + return FALSE; + } +} + + +static int get_table_description(pep_table_t *t) +{ + mqi_column_def_t columns[MQI_COLUMN_MAX]; + mrp_domctl_value_t *values = NULL; + int ncolumn, i; + + if (t->h == MQI_HANDLE_INVALID) + t->h = mqi_get_table_handle((char *)t->name); + + if (t->h != MQI_HANDLE_INVALID) { + ncolumn = mqi_describe(t->h, columns, MRP_ARRAY_SIZE(columns)); + + if (ncolumn > 0) { + t->columns = mrp_allocz_array(typeof(*t->columns), ncolumn); + t->coldesc = mrp_allocz_array(typeof(*t->coldesc), ncolumn + 1); + + if (t->columns != NULL && t->coldesc != NULL) { + memcpy(t->columns, columns, ncolumn * sizeof(*t->columns)); + t->ncolumn = ncolumn; + + for (i = 0; i < t->ncolumn; i++) { + t->coldesc[i].cindex = i; + t->coldesc[i].offset = (int)(ptrdiff_t)&values[i].str; + } + + t->coldesc[i].cindex = -1; + t->coldesc[i].offset = 0; + + return TRUE; + } + } + } + + return FALSE; +} + + +int create_proxy_table(pep_table_t *t, int *errcode, const char **errmsg) +{ + mrp_list_init(&t->hook); + mrp_list_init(&t->watches); + + if (mqi_get_table_handle((char *)t->name) != MQI_HANDLE_INVALID) + FAIL(EEXIST, "DB error: table already exists"); + + if (exec_mql(mql_result_dontcare, NULL, + "create temporary table %s (%s)", t->name, t->mql_columns)) { + if (t->mql_index && t->mql_index[0]) { + if (!exec_mql(mql_result_dontcare, NULL, + "create index on %s (%s)", t->name, t->mql_index)) + FAIL(EINVAL, "failed to table index"); + } + + if (!get_table_description(t)) + FAIL(EINVAL, "DB error: failed to get table description"); + + return TRUE; + } + else + FAIL(ENOMEM, "DB error: failed to create table"); + + fail: + return FALSE; +} + + +void destroy_proxy_table(pep_table_t *t) +{ + mrp_debug("destroying table %s", t->name ? t->name : ""); + + if (t->h != MQI_HANDLE_INVALID) + mqi_drop_table(t->h); + + mrp_free(t->mql_columns); + mrp_free(t->mql_index); + + mrp_free(t->columns); + mrp_free(t->coldesc); + mrp_free(t->name); + + t->name = NULL; + t->h = MQI_HANDLE_INVALID; + t->columns = NULL; + t->ncolumn = 0; +} + + +void destroy_proxy_tables(pep_proxy_t *proxy) +{ + mqi_handle_t tx; + int i; + + mrp_debug("destroying tables of client %s", proxy->name); + + tx = mqi_begin_transaction(); + for (i = 0; i < proxy->ntable; i++) + destroy_proxy_table(proxy->tables + i); + mqi_commit_transaction(tx); + + proxy->tables = NULL; + proxy->ntable = 0; +} + + +pep_table_t *create_watch_table(pdp_t *pdp, const char *name) +{ + pep_table_t *t; + + t = mrp_allocz(sizeof(*t)); + + if (t != NULL) { + mrp_list_init(&t->hook); + mrp_list_init(&t->watches); + + t->h = MQI_HANDLE_INVALID; + t->name = mrp_strdup(name); + + if (t->name == NULL) + goto fail; + + get_table_description(t); + + if (!mrp_htbl_insert(pdp->watched, t->name, t)) + goto fail; + + mrp_list_append(&pdp->tables, &t->hook); + } + + return t; + + fail: + destroy_watch_table(pdp, t); + + return FALSE; +} + + +static void destroy_table_watches(pep_table_t *t) +{ + pep_watch_t *w; + mrp_list_hook_t *p, *n; + + if (t != NULL) { + mrp_list_foreach(&t->watches, p, n) { + w = mrp_list_entry(p, typeof(*w), tbl_hook); + + mrp_list_delete(&w->tbl_hook); + mrp_list_delete(&w->pep_hook); + + mrp_free(w->mql_columns); + mrp_free(w->mql_where); + mrp_free(w); + } + } +} + + +void destroy_watch_table(pdp_t *pdp, pep_table_t *t) +{ + mrp_list_delete(&t->hook); + t->h = MQI_HANDLE_INVALID; + + if (pdp != NULL) + mrp_htbl_remove(pdp->watched, t->name, FALSE); + + destroy_table_watches(t); +} + + +static pep_table_t *lookup_watch_table(pdp_t *pdp, const char *name) +{ + return mrp_htbl_lookup(pdp->watched, (void *)name); +} + + +static void purge_watch_table_cb(void *key, void *entry) +{ + pep_table_t *t = (pep_table_t *)entry; + + MRP_UNUSED(key); + + destroy_watch_table(NULL, t); +} + + +int create_proxy_watch(pep_proxy_t *proxy, int id, + const char *table, const char *mql_columns, + const char *mql_where, int max_rows, + int *error, const char **errmsg) +{ + pdp_t *pdp = proxy->pdp; + pep_table_t *t; + pep_watch_t *w; + + t = lookup_watch_table(pdp, table); + + if (t == NULL) { + t = create_watch_table(pdp, table); + + if (t == NULL) { + *error = EINVAL; + *errmsg = "failed to watch table"; + } + } + + w = mrp_allocz(sizeof(*w)); + + if (w != NULL) { + mrp_list_init(&w->tbl_hook); + mrp_list_init(&w->pep_hook); + + w->table = t; + w->mql_columns = mrp_strdup(mql_columns); + w->mql_where = mrp_strdup(mql_where ? mql_where : ""); + w->max_rows = max_rows; + w->proxy = proxy; + w->id = id; + + if (w->mql_columns == NULL || w->mql_where == NULL) + goto fail; + + mrp_list_append(&t->watches, &w->tbl_hook); + mrp_list_append(&proxy->watches, &w->pep_hook); + + return TRUE; + } + else { + *error = ENOMEM; + *errmsg = "failed to allocate table watch"; + } + + fail: + if (w != NULL) { + mrp_free(w->mql_columns); + mrp_free(w->mql_where); + mrp_free(w); + } + + return FALSE; +} + + +void destroy_proxy_watches(pep_proxy_t *proxy) +{ + pep_watch_t *w; + mrp_list_hook_t *p, *n; + + if (proxy != NULL) { + mrp_list_foreach(&proxy->watches, p, n) { + w = mrp_list_entry(p, typeof(*w), pep_hook); + + mrp_list_delete(&w->tbl_hook); + mrp_list_delete(&w->pep_hook); + + mrp_free(w); + } + } +} + + +static void reset_proxy_tables(pep_proxy_t *proxy) +{ + int i; + + for (i = 0; i < proxy->ntable; i++) + mqi_delete_from(proxy->tables[i].h, NULL); +} + + +static int insert_into_table(pep_table_t *t, + mrp_domctl_value_t **rows, int nrow) +{ + void *data[2]; + int i; + + data[1] = NULL; + + for (i = 0; i < nrow; i++) { + data[0] = rows[i]; + if (mqi_insert_into(t->h, 0, t->coldesc, data) != 1) + return FALSE; + } + + return TRUE; +} + + +int set_proxy_tables(pep_proxy_t *proxy, mrp_domctl_data_t *tables, int ntable, + int *error, const char **errmsg) +{ + mqi_handle_t tx; + pep_table_t *t; + int i, id; + + tx = mqi_begin_transaction(); + + if (tx != MQI_HANDLE_INVALID) { + reset_proxy_tables(proxy); + + for (i = 0; i < ntable; i++) { + id = tables[i].id; + + if (id < 0 || id >= proxy->ntable) + goto fail; + + t = proxy->tables + id; + + if (tables[i].ncolumn != t->ncolumn) + goto fail; + +#if 0 + if (!delete_from_table(t, tables[i].rows, tables[i].nrow)) + goto fail; +#endif + + if (!insert_into_table(t, tables[i].rows, tables[i].nrow)) + goto fail; + + + } + + mqi_commit_transaction(tx); + + return TRUE; + + fail: + *error = EINVAL; + *errmsg = "failed to set tables"; + mqi_rollback_transaction(tx); + } + + return FALSE; +} diff --git a/src/plugins/domain-control/table.h b/src/plugins/domain-control/table.h new file mode 100644 index 0000000..283928f --- /dev/null +++ b/src/plugins/domain-control/table.h @@ -0,0 +1,33 @@ +#ifndef __MURPHY_DOMAIN_CONTROL_TABLE_H__ +#define __MURPHY_DOMAIN_CONTROL_TABLE_H__ + +#include + +#include "client.h" +#include "domain-control-types.h" + +int init_tables(pdp_t *pdp); +void destroy_tables(pdp_t *pdp); + +int create_proxy_table(pep_table_t *t, int *errcode, const char **errmsg); + +int create_proxy_watch(pep_proxy_t *proxy, int id, + const char *table, const char *mql_columns, + const char *mql_where, int max_rows, + int *error, const char **errmsg); + +void destroy_watch_table(pdp_t *pdp, pep_table_t *t); + +void destroy_proxy_table(pep_table_t *t); +void destroy_proxy_tables(pep_proxy_t *proxy); + +void destroy_proxy_watches(pep_proxy_t *proxy); + +int set_proxy_tables(pep_proxy_t *proxy, mrp_domctl_data_t *tables, int ntable, + int *error, const char **errmsg); + +int exec_mql(mql_result_type_t type, mql_result_t **resultp, + const char *format, ...); + + +#endif /* __MURPHY_DOMAIN_CONTROL_TABLE_H__ */ diff --git a/src/plugins/domain-control/test-client.c b/src/plugins/domain-control/test-client.c new file mode 100644 index 0000000..ac9c377 --- /dev/null +++ b/src/plugins/domain-control/test-client.c @@ -0,0 +1,1135 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define _GNU_SOURCE +#include + +#include +#include + +#include +#include "client.h" + +#define DEFAULT_PROMPT "test-controller> " + + +/* + * client context + */ + +typedef struct { + const char *addrstr; /* server address */ + int zone; /* run in zone control mode */ + int verbose; /* verbose mode */ + mrp_mainloop_t *ml; /* murphy mainloop */ + void *dc; /* domain controller */ + int fd; /* fd for terminal input */ + mrp_io_watch_t *iow; /* I/O watch for terminal input */ +} client_t; + + +#define NVALUE 512 + + +/* + * device and stream definitions + */ + +#define NDEVICE (MRP_ARRAY_SIZE(devices) - 1) +#define DEVICE_NCOLUMN 4 + +typedef struct { + const char *name; + const char *type; + int public; + int available; +} device_t; + +static device_t devices[] = { + { "builtin-speaker" , "speaker" , TRUE , TRUE }, + { "builtin-earpiece", "speaker" , FALSE, TRUE }, + { "usb-speaker" , "speaker" , TRUE , FALSE }, + { "a2dp-speaker" , "speaker" , TRUE , FALSE }, + { "wired-headset" , "headset" , FALSE, FALSE }, + { "usb-headphone" , "headphone", FALSE, FALSE }, + { "a2dp-headphone" , "headphone", FALSE, FALSE }, + { "sco-headset" , "headset" , FALSE, FALSE }, + { NULL , NULL , FALSE, FALSE } +}; + +#define NSTREAM (MRP_ARRAY_SIZE(streams) - 1) +#define STREAM_NCOLUMN 4 + +typedef struct { + const char *name; + const char *role; + pid_t owner; + int playing; +} stream_t; + +static stream_t streams[] = { + { "player1", "player" , 1234, FALSE }, + { "player2", "player" , 4321, FALSE }, + { "navit" , "navigator", 5432, FALSE }, + { "phone" , "call" , 6666, FALSE }, + { NULL , NULL , 0 , FALSE } +}; + + +/* + * device and stream descriptors + */ + +#define DEVICE_COLUMNS \ + "name varchar(32), " \ + "type varchar(32), " \ + "public integer , " \ + "available integer" + +#define DEVICE_INDEX "name" + +#define DEVICE_SELECT "*" + +#define DEVICE_WHERE NULL + +#define STREAM_COLUMNS \ + "name varchar(32)," \ + "role varchar(32)," \ + "owner unsigned ," \ + "playing integer" + +#define STREAM_INDEX "name" + +#define STREAM_SELECT "*" +#define STREAM_WHERE NULL + +mrp_domctl_table_t media_tables[] = { + MRP_DOMCTL_TABLE("devices", DEVICE_COLUMNS, DEVICE_INDEX), + MRP_DOMCTL_TABLE("streams", STREAM_COLUMNS, STREAM_INDEX), +}; + +mrp_domctl_watch_t media_watches[] = { + MRP_DOMCTL_WATCH("devices", DEVICE_SELECT, DEVICE_WHERE, 0), + MRP_DOMCTL_WATCH("streams", STREAM_SELECT, STREAM_WHERE, 0), +}; + + +/* + * zone and call definitions + */ + +#define NZONE (MRP_ARRAY_SIZE(zones) - 1) +#define ZONE_NCOLUMN 3 + +typedef struct { + const char *name; + int occupied; + int active; +} zone_t; + +static zone_t zones[] = { + { "driver" , TRUE , FALSE }, + { "fearer" , FALSE, TRUE }, + { "back-left" , TRUE , FALSE }, + { "back-center", FALSE, FALSE }, + { "back-right" , TRUE , TRUE }, + { NULL , FALSE, FALSE } +}; + + +#define NCALL (MRP_ARRAY_SIZE(calls) - 1) +#define CALL_NCOLUMN 3 + +typedef struct { + int id; + const char *state; + const char *modem; +} call_t; + +static call_t calls[] = { + { 1, "active" , "modem1" }, + { 2, "ringing" , "modem1" }, + { 3, "held" , "modem2" }, + { 4, "alerting", "modem2" }, + { 0, NULL , NULL } +}; + + +/* + * zone and call descriptors + */ + +#define ZONE_COLUMNS \ + "name varchar(32), " \ + "occupied integer , " \ + "active integer" + +#define ZONE_INDEX "name" + +#define ZONE_SELECT "*" + +#define ZONE_WHERE NULL + +#define CALL_COLUMNS \ + "id integer , " \ + "state varchar(32), " \ + "modem varchar(32)" + +#define CALL_INDEX "id" + +#define CALL_SELECT "*" + +#define CALL_WHERE NULL + +mrp_domctl_table_t zone_tables[] = { + MRP_DOMCTL_TABLE("zones", ZONE_COLUMNS, ZONE_INDEX), + MRP_DOMCTL_TABLE("calls", CALL_COLUMNS, CALL_INDEX), +}; + +mrp_domctl_watch_t zone_watches[] = { + MRP_DOMCTL_WATCH("zones", ZONE_SELECT, ZONE_WHERE, 0), + MRP_DOMCTL_WATCH("calls", CALL_SELECT, CALL_WHERE, 0) +}; + +mrp_domctl_table_t *exports; +int nexport; +mrp_domctl_watch_t *imports; +int nimport; + + +static client_t *client; + + +static void fatal_msg(int error, const char *format, ...); +static void error_msg(const char *format, ...); +static void info_msg(const char *format, ...); + +static void terminal_input_cb(char *input); + +static void export_data(client_t *c); + + +static void plug_device(client_t *c, const char *name, int plug) +{ + device_t *d; + int changed; + + if (c->zone) { + error_msg("cannot plug/unplug, client is in zone mode"); + return; + } + + changed = FALSE; + + for (d = devices; d->name != NULL; d++) { + if (!strcmp(d->name, name)) { + changed = plug ^ d->available; + d->available = plug; + break; + } + } + + if (changed) { + info_msg("device '%s' is now %splugged", d->name, plug ? "" : "un"); + export_data(c); + } +} + + +static void list_devices(void) +{ + device_t *d; + int n; + + for (d = devices, n = 0; d->name != NULL; d++, n++) { + info_msg("device '%s': (%s, %s), %s", + d->name, d->type, d->public ? "public" : "private", + d->available ? "available" : "currently unplugged"); + } + + if (n == 0) + info_msg("devices: none"); +} + + +static void play_stream(client_t *c, const char *name, int play) +{ + stream_t *s; + int changed; + + if (c->zone) { + error_msg("cannot control streams, client is in zone mode"); + return; + } + + changed = FALSE; + + for (s = streams; s->name != NULL; s++) { + if (!strcmp(s->name, name)) { + changed = play ^ s->playing; + s->playing = play; + break; + } + } + + if (changed) { + info_msg("stream '%s' is now %s", s->name, play ? "playing":"stopped"); + export_data(c); + } +} + + +static void list_streams(void) +{ + stream_t *s; + int n; + + for (s = streams, n = 0; s->name != NULL; s++, n++) { + info_msg("stream '%s': role %s, owner %u, currently %splaying", + s->name, s->role, s->owner, s->playing ? "" : "not "); + } + + if (n == 0) + info_msg("streams: none"); +} + + +static void set_zone_state(client_t *c, char *config) +{ + zone_t *z; + int occupied, active, changed, len; + char name[256], *end; + + if (!c->zone) { + error_msg("cannot control zones, client is not in zone mode"); + return; + } + + while (*config == ' ' || *config == '\t') + config++; + + end = strchr(config, ' '); + if (end == NULL) + return; + + len = end - config; + strncpy(name, config, len); + name[len] = '\0'; + + config = end + 1; + while (*config == ' ' || *config == '\t') + config++; + + occupied = FALSE; + active = FALSE; + changed = FALSE; + + if (strstr(config, "occupied")) + occupied = TRUE; + if (strstr(config, "active")) + active = TRUE; + + for (z = zones; z->name != NULL; z++) { + if (!strcmp(z->name, name)) { + changed = (active ^ z->active) | (occupied ^ z->occupied); + z->active = active; + z->occupied = occupied; + break; + } + } + + if (changed) { + info_msg("zone '%s' is now %s and %s", z->name, + z->occupied ? "occupied" : "free", + z->active ? "active" : "idle"); + export_data(c); + } +} + + +static void list_zones(void) +{ + zone_t *z; + int n; + + for (z = zones, n = 0; z->name != NULL; z++, n++) { + info_msg("zone '%s' is now %s and %s", z->name, + z->occupied ? "occupied" : "free", + z->active ? "active" : "idle"); + } + + if (n == 0) + info_msg("zones: none"); +} + + +static void set_call_state(client_t *c, const char *config) +{ + call_t *call; + char idstr[64], *state, *end; + int id, changed, len; + + if (!c->zone) { + error_msg("cannot control calls, client is not in zone mode"); + return; + } + + while (*config == ' ' || *config == '\t') + config++; + + end = strchr(config, ' '); + if (end == NULL) + return; + + len = end - config; + strncpy(idstr, config, len); + idstr[len] = '\0'; + + config = end + 1; + while (*config == ' ' || *config == '\t') + config++; + state = (char *)config; + + id = strtoul(idstr, &end, 10); + + if (end && *end) { + error_msg("invalid call id '%s'", idstr); + return; + } + + for (call = calls; call->id > 0; call++) { + if (call->id == id) { + if (strcmp(call->state, state)) { + mrp_free((char *)call->state); + call->state = mrp_strdup(state); + changed = TRUE; + break; + } + } + } + + if (changed) { + info_msg("call #%d is now %s", call->id, call->state); + export_data(c); + } +} + + +static void list_calls(void) +{ + call_t *c; + int n; + + for (c = calls, n = 0; c->id > 0; c++, n++) { + info_msg("call #%d: %s (on modem %s)", c->id, c->state, c->modem); + } + + if (n == 0) + info_msg("calls: none"); +} + + +static void reset_devices(void) +{ + mrp_clear(&devices); +} + + +void update_devices(mrp_domctl_data_t *data) +{ + device_t *d; + mrp_domctl_value_t *v; + int i; + + if (data->nrow != 0 && data->ncolumn != DEVICE_NCOLUMN) { + error_msg("incorrect number of columns (%d) in device update", + data->ncolumn); + return; + } + + if (data->nrow > (int)NDEVICE) { + error_msg("too many rows (%d) in device update", data->nrow); + return; + } + + if (data->nrow == 0) + reset_devices(); + else { + d = devices; + + for (i = 0; i < data->nrow; i++) { + mrp_free((char *)d->name); + mrp_free((char *)d->type); + + v = data->rows[i]; + d->name = mrp_strdup(v[0].str); + d->type = mrp_strdup(v[1].str); + d->public = v[2].s32; + d->available = v[3].s32; + + d += 1; + } + } + + list_devices(); +} + + +static void reset_streams(void) +{ + mrp_clear(&streams); +} + + +void update_streams(mrp_domctl_data_t *data) +{ + stream_t *s; + mrp_domctl_value_t *v; + int i; + + if (data->nrow != 0 && data->ncolumn != STREAM_NCOLUMN) { + error_msg("incorrect number of columns (%d) in stream update", + data->ncolumn); + return; + } + + if (data->nrow > (int)NSTREAM) { + error_msg("too many rows (%d) in stream update", data->nrow); + return; + } + + if (data->nrow == 0) + reset_streams(); + else { + s = streams; + + for (i = 0; i < data->nrow; i++) { + mrp_free((char *)s->name); + mrp_free((char *)s->role); + + v = data->rows[i]; + s->name = mrp_strdup(v[0].str); + s->role = mrp_strdup(v[1].str); + s->owner = v[2].u32; + s->playing = v[3].s32; + + s += 1; + } + } + + list_streams(); +} + + +static void reset_zones(void) +{ + mrp_clear(&zones); +} + + +void update_zones(mrp_domctl_data_t *data) +{ + zone_t *z; + mrp_domctl_value_t *v; + int i; + + if (data->nrow != 0 && data->ncolumn != ZONE_NCOLUMN) { + error_msg("incorrect number of columns (%d) in zone update", + data->ncolumn); + return; + } + + if (data->nrow > (int)NZONE) { + error_msg("too many rows (%d) in zone update", data->nrow); + return; + } + + if (data->nrow == 0) + reset_zones(); + else { + z = zones; + + for (i = 0; i < data->nrow; i++) { + mrp_free((char *)z->name); + + v = data->rows[i]; + z->name = mrp_strdup(v[0].str); + z->occupied = v[1].s32; + z->active = v[2].s32; + + z += 1; + } + } + + list_zones(); +} + + +static void reset_calls(void) +{ + mrp_clear(&calls); +} + + +void update_calls(mrp_domctl_data_t *data) +{ + call_t *c; + mrp_domctl_value_t *v; + int i; + + if (data->nrow != 0 && data->ncolumn != CALL_NCOLUMN) { + error_msg("incorrect number of columns (%d) in call update.", + data->ncolumn); + return; + } + + if (data->nrow > (int)NCALL) { + error_msg("too many rows (%d) in call update", data->nrow); + return; + } + + if (data->nrow == 0) + reset_calls(); + else { + c = calls; + + for (i = 0; i < data->nrow; i++) { + mrp_free((char *)c->state); + mrp_free((char *)c->modem); + + v = data->rows[i]; + c->id = v[0].s32; + c->state = mrp_strdup(v[1].str); + c->modem = mrp_strdup(v[2].str); + + c += 1; + } + } + + list_calls(); +} + + +void update_imports(client_t *c, mrp_domctl_data_t *data, int ntable) +{ + int i; + + for (i = 0; i < ntable; i++) { + if (c->zone) { + if (data[i].id == 0) + update_devices(data + i); + else + update_streams(data + i); + } + else { + if (data[i].id == 0) + update_zones(data + i); + else + update_calls(data + i); + } + } +} + + + +static void terminal_prompt_erase(void) +{ + int n = strlen(DEFAULT_PROMPT); + + printf("\r"); + while (n-- > 0) + printf(" "); + printf("\r"); +} + + +static void terminal_prompt_display(void) +{ + rl_callback_handler_remove(); + rl_callback_handler_install(DEFAULT_PROMPT, terminal_input_cb); +} + + +static void show_help(void) +{ +#define P info_msg + + P("Available commands:"); + P(" help show this help"); + P(" list list all data"); + P(" list {devices|streams|zones|calls} list the requested data"); + P(" plug update as plugged"); + P(" unplug update as unplugged"); + P(" play update as playing"); + P(" stop update as stopped"); + P(" call update state of "); + P(" zone [occupied,[active]] update state of "); + +#undef P +} + + +static void terminal_process_input(char *input) +{ + int len; + + add_history(input); + + if (input == NULL || !strcmp(input, "exit")) { + terminal_prompt_erase(); + exit(0); + } + else if (!strcmp(input, "help")) { + show_help(); + } + else if (!strcmp(input, "list")) { + list_devices(); + list_streams(); + list_zones(); + list_calls(); + } + else if (!strcmp(input, "list devices")) + list_devices(); + else if (!strcmp(input, "list streams")) + list_streams(); + else if (!strcmp(input, "list zones")) + list_zones(); + else if (!strcmp(input, "list calls")) + list_calls(); + else if (!strncmp(input, "plug " , len=sizeof("plug ") - 1) || + !strncmp(input, "unplug ", len=sizeof("unplug ") - 1)) { + plug_device(client, input + len, *input == 'p'); + } + else if (!strncmp(input, "play " , len=sizeof("play ") - 1) || + !strncmp(input, "stop ", len=sizeof("stop ") - 1)) { + play_stream(client, input + len, *input == 'p'); + } + else if (!strncmp(input, "call " , len=sizeof("call ") - 1)) { + set_call_state(client, input + len); + } + else if (!strncmp(input, "zone " , len=sizeof("zone ") - 1)) { + set_zone_state(client, input + len); + } +} + + +static void terminal_input_cb(char *input) +{ + terminal_process_input(input); + free(input); +} + + +static void terminal_cb(mrp_mainloop_t *ml, mrp_io_watch_t *w, int fd, + mrp_io_event_t events, void *user_data) +{ + MRP_UNUSED(w); + MRP_UNUSED(fd); + MRP_UNUSED(user_data); + + if (events & MRP_IO_EVENT_IN) + rl_callback_read_char(); + + if (events & MRP_IO_EVENT_HUP) + mrp_mainloop_quit(ml, 0); +} + + +static void terminal_setup(client_t *c) +{ + mrp_io_event_t events; + + c->fd = fileno(stdin); + events = MRP_IO_EVENT_IN | MRP_IO_EVENT_HUP; + c->iow = mrp_add_io_watch(c->ml, c->fd, events, terminal_cb, c); + + if (c->iow == NULL) + fatal_msg(1, "Failed to create terminal input I/O watch."); + else + terminal_prompt_display(); +} + + +static void terminal_cleanup(client_t *c) +{ + mrp_del_io_watch(c->iow); + c->iow = NULL; + + rl_callback_handler_remove(); +} + + +static void fatal_msg(int error, const char *format, ...) +{ + va_list ap; + + terminal_prompt_erase(); + + fprintf(stderr, "fatal error: "); + va_start(ap, format); + vfprintf(stderr, format, ap); + va_end(ap); + fprintf(stderr, "\n"); + fflush(stderr); + + exit(error); +} + + +static void error_msg(const char *format, ...) +{ + va_list ap; + + terminal_prompt_erase(); + + fprintf(stderr, "error: "); + va_start(ap, format); + vfprintf(stderr, format, ap); + va_end(ap); + fprintf(stderr, "\n"); + fflush(stderr); + + terminal_prompt_display(); +} + + +static void info_msg(const char *format, ...) +{ + va_list ap; + + terminal_prompt_erase(); + + va_start(ap, format); + vfprintf(stdout, format, ap); + va_end(ap); + fprintf(stdout, "\n"); + fflush(stdout); + + terminal_prompt_display(); +} + + +static void signal_handler(mrp_mainloop_t *ml, mrp_sighandler_t *h, + int signum, void *user_data) +{ + MRP_UNUSED(h); + MRP_UNUSED(user_data); + + switch (signum) { + case SIGINT: + info_msg("Got SIGINT, stopping..."); + mrp_mainloop_quit(ml, 0); + break; + } +} + + +static void connect_notify(mrp_domctl_t *dc, int connected, int errcode, + const char *errmsg, void *user_data) +{ + MRP_UNUSED(dc); + MRP_UNUSED(user_data); + + if (connected) { + info_msg("Successfully registered to server."); + export_data(client); + } + else + error_msg("No connection to server (%d: %s).", errcode, errmsg); +} + + +static void data_notify(mrp_domctl_t *dc, mrp_domctl_data_t *tables, + int ntable, void *user_data) +{ + client_t *client = (client_t *)user_data; + + MRP_UNUSED(dc); + + update_imports(client, tables, ntable); +} + + +static void export_notify(mrp_domctl_t *dc, int errcode, const char *errmsg, + void *user_data) +{ + MRP_UNUSED(dc); + MRP_UNUSED(user_data); + + if (errcode != 0) { + error_msg("Data set request failed (%d: %s).", errcode, errmsg); + } + else + info_msg("Sucessfully set data."); +} + + +static void export_data(client_t *c) +{ + mrp_domctl_data_t *tables; + int ntable = 2; + mrp_domctl_value_t *values, *v; + int i; + + tables = alloca(sizeof(*tables) * ntable); + values = alloca(sizeof(*values) * NVALUE); + v = values; + + if (!c->zone) { + tables[0].id = 0; + tables[0].ncolumn = 4; + tables[0].nrow = NDEVICE; + tables[0].rows = alloca(sizeof(*tables[0].rows) * tables[0].nrow); + + for (i = 0; i < (int)NDEVICE; i++) { + tables[0].rows[i] = v; + v[0].type = MRP_DOMCTL_STRING ; v[0].str = devices[i].name; + v[1].type = MRP_DOMCTL_STRING ; v[1].str = devices[i].type; + v[2].type = MRP_DOMCTL_INTEGER; v[2].s32 = devices[i].public; + v[3].type = MRP_DOMCTL_INTEGER; v[3].s32 = devices[i].available; + v += 4; + } + + tables[1].id = 1; + tables[1].ncolumn = 4; + tables[1].nrow = NSTREAM; + tables[1].rows = alloca(sizeof(*tables[1].rows) * tables[1].nrow); + + for (i = 0; i < (int)NSTREAM; i++) { + tables[1].rows[i] = v; + v[0].type = MRP_DOMCTL_STRING ; v[0].str = streams[i].name; + v[1].type = MRP_DOMCTL_STRING ; v[1].str = streams[i].role; + v[2].type = MRP_DOMCTL_UNSIGNED; v[2].s32 = streams[i].owner; + v[3].type = MRP_DOMCTL_INTEGER ; v[3].u32 = streams[i].playing; + v += 4; + } + } + else { + tables[0].id = 0; + tables[0].ncolumn = 3; + tables[0].nrow = NZONE; + tables[0].rows = alloca(sizeof(*tables[0].rows) * tables[0].nrow); + + for (i = 0; i < (int)NZONE; i++) { + tables[0].rows[i] = v; + v[0].type = MRP_DOMCTL_STRING ; v[0].str = zones[i].name; + v[1].type = MRP_DOMCTL_INTEGER; v[1].s32 = zones[i].occupied; + v[2].type = MRP_DOMCTL_INTEGER; v[2].s32 = zones[i].active; + v += 3; + } + + tables[1].id = 1; + tables[1].ncolumn = 3; + tables[1].nrow = NCALL; + tables[1].rows = alloca(sizeof(*tables[0].rows) * tables[1].nrow); + + for (i = 0; i < (int)NCALL; i++) { + tables[1].rows[i] = v; + v[0].type = MRP_DOMCTL_INTEGER; v[0].s32 = calls[i].id; + v[1].type = MRP_DOMCTL_STRING ; v[1].str = calls[i].state; + v[2].type = MRP_DOMCTL_STRING ; v[2].str = calls[i].modem; + v += 3; + } + } + + if (!mrp_domctl_set_data(c->dc, tables, ntable, export_notify, c)) + error_msg("Failed to send data set request to server."); +} + + +static void client_setup(client_t *c) +{ + mrp_mainloop_t *ml; + mrp_domctl_t *dc; + + ml = mrp_mainloop_create(); + + if (ml != NULL) { + if (!c->zone) { + exports = media_tables; + nexport = MRP_ARRAY_SIZE(media_tables); + imports = zone_watches; + nimport = MRP_ARRAY_SIZE(zone_watches); + } + else { + exports = zone_tables; + nexport = MRP_ARRAY_SIZE(zone_tables); + imports = media_watches; + nimport = MRP_ARRAY_SIZE(media_watches); + } + + dc = mrp_domctl_create(c->zone ? "zone-ctrl" : "media-ctrl", ml, + exports, nexport, imports, nimport, + connect_notify, data_notify, c); + + if (dc != NULL) { + c->ml = ml; + c->dc = dc; + + mrp_add_sighandler(ml, SIGINT, signal_handler, c); + + if (c->zone) { + zone_t *z; + call_t *call; + + for (z = zones; z->name != NULL; z++) { + z->name = mrp_strdup(z->name); + } + + for (call = calls; call->id > 0; call++) { + call->state = mrp_strdup(call->state); + call->modem = mrp_strdup(call->modem); + } + + reset_devices(); + reset_streams(); + } + else { + device_t *d; + stream_t *s; + + for (d = devices; d->name != NULL; d++) { + d->name = mrp_strdup(d->name); + d->type = mrp_strdup(d->type); + } + + for (s = streams; s->name != NULL; s++) { + s->name = mrp_strdup(s->name); + s->role = mrp_strdup(s->role); + } + + reset_zones(); + reset_calls(); + } + } + else + fatal_msg(1, "Failed to create enforcement point."); + } + else + fatal_msg(1, "Failed to create mainloop."); +} + + +static void client_cleanup(client_t *c) +{ + mrp_mainloop_destroy(c->ml); + mrp_domctl_destroy(c->dc); + + c->ml = NULL; + c->dc = NULL; +} + + +static void client_run(client_t *c) +{ + if (mrp_domctl_connect(c->dc, c->addrstr)) + info_msg("Connected to server at %s.", c->addrstr); + else + error_msg("Failed to connect to server at %s.", c->addrstr); + + mrp_mainloop_run(c->ml); +} + + +static void print_usage(const char *argv0, int exit_code, const char *fmt, ...) +{ + va_list ap; + + if (fmt && *fmt) { + va_start(ap, fmt); + vprintf(fmt, ap); + va_end(ap); + } + + printf("usage: %s [options]\n\n" + "The possible options are:\n" + " -s, --server
connect to murphy at given address\n" + " -z, --zone run as zone controller\n" + " -v, --verbose run in verbose mode\n" + " -h, --help show this help on usage\n", + argv0); + + if (exit_code < 0) + return; + else + exit(exit_code); +} + + +static void client_set_defaults(client_t *c) +{ + mrp_clear(c); + c->addrstr = MRP_DEFAULT_DOMCTL_ADDRESS; + c->zone = FALSE; + c->verbose = FALSE; +} + + +int parse_cmdline(client_t *c, int argc, char **argv) +{ +# define OPTIONS "vzhs:" + struct option options[] = { + { "server" , required_argument, NULL, 's' }, + { "zone" , no_argument , NULL, 'z' }, + { "verbose" , optional_argument, NULL, 'v' }, + { "help" , no_argument , NULL, 'h' }, + { NULL, 0, NULL, 0 } + }; + + int opt; + + client_set_defaults(c); + + while ((opt = getopt_long(argc, argv, OPTIONS, options, NULL)) != -1) { + switch (opt) { + case 'z': + c->zone = TRUE; + break; + + case 'v': + c->verbose = TRUE; + break; + + case 'a': + c->addrstr = optarg; + break; + + case 'h': + print_usage(argv[0], -1, ""); + exit(0); + break; + + default: + print_usage(argv[0], EINVAL, "invalid option '%c'", opt); + } + } + + return TRUE; +} + + +int main(int argc, char *argv[]) +{ + client_t c; + + client_set_defaults(&c); + parse_cmdline(&c, argc, argv); + + client_setup(&c); + terminal_setup(&c); + + client = &c; + client_run(&c); + + terminal_cleanup(&c); + client_cleanup(&c); + + return 0; +} -- 2.7.4