From 597fe59db3f2ff5c0ad75e331cf91d46a2e318f9 Mon Sep 17 00:00:00 2001 From: Ismo Puustinen Date: Fri, 29 Jun 2012 14:21:51 +0300 Subject: [PATCH] signalling: add an initial signalling plugin. --- configure.ac | 27 ++- src/Makefile.am | 39 +++- src/plugins/plugin-signalling.c | 122 ++++++++++ src/plugins/signalling/client.c | 333 +++++++++++++++++++++++++++ src/plugins/signalling/client.h | 33 +++ src/plugins/signalling/plugin.h | 27 +++ src/plugins/signalling/signalling-protocol.h | 56 +++++ src/plugins/signalling/signalling.h | 61 +++++ src/plugins/signalling/transaction.c | 263 +++++++++++++++++++++ src/plugins/signalling/transaction.h | 53 +++++ src/plugins/signalling/util.c | 45 ++++ src/plugins/signalling/util.h | 15 ++ src/plugins/tests/Makefile.am | 8 + src/plugins/tests/signalling-client.c | 158 +++++++++++++ 14 files changed, 1227 insertions(+), 13 deletions(-) create mode 100644 src/plugins/plugin-signalling.c create mode 100644 src/plugins/signalling/client.c create mode 100644 src/plugins/signalling/client.h create mode 100644 src/plugins/signalling/plugin.h create mode 100644 src/plugins/signalling/signalling-protocol.h create mode 100644 src/plugins/signalling/signalling.h create mode 100644 src/plugins/signalling/transaction.c create mode 100644 src/plugins/signalling/transaction.h create mode 100644 src/plugins/signalling/util.c create mode 100644 src/plugins/signalling/util.h create mode 100644 src/plugins/tests/Makefile.am create mode 100644 src/plugins/tests/signalling-client.c diff --git a/configure.ac b/configure.ac index b3a27d9..218c803 100644 --- a/configure.ac +++ b/configure.ac @@ -292,11 +292,13 @@ AM_CONDITIONAL(DISABLED_PLUGIN_TEST, [check_if_disabled test]) AM_CONDITIONAL(DISABLED_PLUGIN_DBUS, [check_if_disabled dbus]) AM_CONDITIONAL(DISABLED_PLUGIN_GLIB, [check_if_disabled glib]) AM_CONDITIONAL(DISABLED_PLUGIN_CONSOLE, [check_if_disabled console]) +AM_CONDITIONAL(DISABLED_PLUGIN_SIGNALLING, [check_if_disabled signalling]) AM_CONDITIONAL(BUILTIN_PLUGIN_TEST, [check_if_internal test]) AM_CONDITIONAL(BUILTIN_PLUGIN_DBUS, [check_if_internal dbus]) AM_CONDITIONAL(BUILTIN_PLUGIN_GLIB, [check_if_internal glib]) AM_CONDITIONAL(BUILTIN_PLUGIN_CONSOLE, [check_if_internal console]) +AM_CONDITIONAL(BUILTIN_PLUGIN_SIGNALLING, [check_if_internal signalling]) # Check for Check (unit test framework). PKG_CHECK_MODULES(CHECK, @@ -380,25 +382,26 @@ AC_CONFIG_FILES([build-aux/shave build-aux/shave-libtool Makefile src/Makefile + src/common/tests/Makefile + src/core/tests/Makefile + src/daemon/tests/Makefile + src/plugins/tests/Makefile src/common/murphy-common.pc src/common/murphy-dbus.pc src/common/murphy-pulse.pc - src/common/tests/Makefile src/core/murphy-core.pc - src/core/tests/Makefile - src/murphy-db/Makefile - src/murphy-db/mdb/Makefile - src/murphy-db/mqi/Makefile - src/murphy-db/mql/Makefile - src/murphy-db/include/Makefile - src/murphy-db/tests/Makefile - src/daemon/tests/Makefile + src/murphy-db/Makefile + src/murphy-db/mdb/Makefile + src/murphy-db/mqi/Makefile + src/murphy-db/mql/Makefile + src/murphy-db/include/Makefile + src/murphy-db/tests/Makefile src/resolver/murphy-resolver.pc src/resolver/tests/Makefile doc/Makefile - doc/plugin-developer-guide/Makefile - doc/plugin-developer-guide/db/Makefile - doc/plugin-developer-guide/doxml/Makefile + doc/plugin-developer-guide/Makefile + doc/plugin-developer-guide/db/Makefile + doc/plugin-developer-guide/doxml/Makefile ]) AC_OUTPUT diff --git a/src/Makefile.am b/src/Makefile.am index ce030c6..719dba0 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1,5 +1,5 @@ SUBDIRS = murphy-db . \ - common/tests core/tests daemon/tests resolver/tests + common/tests core/tests daemon/tests resolver/tests plugins/tests AM_CFLAGS = $(WARNING_CFLAGS) -I$(top_builddir) -DLIBDIR=\"@LIBDIR@\" MURPHY_CFLAGS = pkgconfigdir = ${libdir}/pkgconfig @@ -460,6 +460,43 @@ clean-func-infos:: -rm plugin-console-func-info.c endif +# signalling plugin +SIGNALLING_PLUGIN_SOURCES = plugins/plugin-signalling.c \ + plugins/signalling/transaction.c \ + plugins/signalling/util.c \ + plugins/signalling/client.c \ + plugins/signalling/plugin.h \ + plugins/signalling/transaction.h \ + plugins/signalling/util.h \ + plugins/signalling/client.h +SIGNALLING_PLUGIN_HEADERS = plugins/signalling/signalling.h +SIGNALLING_PLUGIN_CFLAGS = +SIGNALLING_PLUGIN_LIBS = +SIGNALLING_PLUGINdir = $(includedir)/murphy/plugins/signalling + + +if BUILTIN_PLUGIN_SIGNALLING +BUILTIN_PLUGINS += $(SIGNALLING_PLUGIN_SOURCES) +BUILTIN_CFLAGS += $(SIGNALLING_PLUGIN_CFLAGS) +BUILTIN_LIBS += $(SIGNALLING_PLUGIN_LIBS) +else +plugin_signalling_la_SOURCES = $(SIGNALLING_PLUGIN_SOURCES) +plugin_signalling_la_HEADERS = $(SIGNALLING_PLUGIN_HEADERS) +plugin_signalling_la_CFLAGS = $(SIGNALLING_PLUGIN_CFLAGS) \ + $(MURPHY_CFLAGS) $(AM_CFLAGS) +plugin_signalling_la_LDFLAGS = -module -Wl,-version-script=linker-script.plugin-signalling \ + -version-info @MURPHY_VERSION_INFO@ +plugin_signalling_la_LIBADD = $(SIGNALLING_PLUGIN_LIBS) +plugin_signalling_ladir = $(SIGNALLING_PLUGINdir) + +plugin_LTLIBRARIES += plugin-signalling.la +endif + +# linker script generation +linker-script.plugin-signalling: $(plugin_signalling_la_HEADERS) + $(QUIET_GEN)$(top_builddir)/build-aux/gen-linker-script -q -o $@ $^ + +plugin_signalling_la_DEPENDENCIES = linker-script.plugin-signalling ################################### # murphy daemon diff --git a/src/plugins/plugin-signalling.c b/src/plugins/plugin-signalling.c new file mode 100644 index 0000000..2fa2246 --- /dev/null +++ b/src/plugins/plugin-signalling.c @@ -0,0 +1,122 @@ +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "signalling/plugin.h" +#include "signalling/transaction.h" +#include "signalling/client.h" +#include "signalling/util.h" + +mrp_plugin_t *signalling_plugin; + +enum { + ARG_ADDRESS /* signalling socket address, 'address:port' */ +}; + + +static void htbl_free_client(void *key, void *object) +{ + MRP_UNUSED(key); + free_client(object); +} + + +static void htbl_free_transaction(void *key, void *object) +{ + MRP_UNUSED(key); + free_transaction(object); +} + + +static int signalling_init(mrp_plugin_t *plugin) +{ + data_t *data; + mrp_htbl_config_t client_conf, tx_conf; + + signalling_info("> init()"); + + if ((data = mrp_allocz(sizeof(*data))) == NULL) + return FALSE; + + data->ctx = plugin->ctx; + data->address = plugin->args[ARG_ADDRESS].str; + + type_init(); + + client_conf.comp = mrp_string_comp; + client_conf.hash = mrp_string_hash; + client_conf.free = htbl_free_client; + client_conf.nbucket = 0; + client_conf.nentry = 10; + + data->clients = mrp_htbl_create(&client_conf); + + tx_conf.comp = int_comp; + tx_conf.hash = int_hash; + tx_conf.free = htbl_free_transaction; + tx_conf.nbucket = 0; + tx_conf.nentry = 5; + + data->txs = mrp_htbl_create(&tx_conf); + + /* we only support unix domain sockets for the time being */ + + if (!strncmp(data->address, "unxs:", 5)) { + data->path = data->address + 5; + if (socket_setup(data)) { + plugin->data = data; + signalling_info("set up at address '%s'.", data->address); + + signalling_plugin = plugin; + + return TRUE; + } + } + + mrp_free(data); + + signalling_error("failed to set up signalling at address '%s'.", + plugin->args[ARG_ADDRESS].str); + + return FALSE; +} + + +static void signalling_exit(mrp_plugin_t *plugin) +{ + data_t *ctx = plugin->data; + + signalling_info("cleaning up instance '%s'...", plugin->instance); + + /* FIXME: call error callbacks of all active transactions? */ + + mrp_htbl_destroy(ctx->clients, TRUE); + mrp_htbl_destroy(ctx->txs, TRUE); +} + + +#define SIGNALLING_DESCRIPTION "A decision signalling plugin for Murphy." +#define SIGNALLING_HELP \ + "The signalling plugin provides one-to-many communication from Murphy\n" \ + "to enforcement points. The enforcement points are supposed to use\n" \ + "libsignalling to initialize connection to Murphy and receive events\n" \ + "from it." + +#define SIGNALLING_VERSION MRP_VERSION_INT(0, 0, 1) +#define SIGNALLING_AUTHORS "Ismo Puustinen " + + +static mrp_plugin_arg_t signalling_args[] = { + MRP_PLUGIN_ARGIDX(ARG_ADDRESS, STRING, "address", "unxs:/tmp/murphy/signalling"), +}; + +MURPHY_REGISTER_CORE_PLUGIN("signalling", + SIGNALLING_VERSION, SIGNALLING_DESCRIPTION, + SIGNALLING_AUTHORS, SIGNALLING_HELP, MRP_SINGLETON, + signalling_init, signalling_exit, signalling_args, NULL); diff --git a/src/plugins/signalling/client.c b/src/plugins/signalling/client.c new file mode 100644 index 0000000..132a331 --- /dev/null +++ b/src/plugins/signalling/client.c @@ -0,0 +1,333 @@ +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include "client.h" +#include "transaction.h" +#include "util.h" + + +void free_client(client_t *c) +{ + uint i; + + mrp_free(c->name); + + for (i = 0; i < c->ndomains; i++) { + mrp_free(c->domains[i]); + } + mrp_free(c->domains); + + mrp_free(c); +} + + +static void remove_client_from_transactions(client_t *c, data_t *ctx) { + /* TODO: Go through all transactions. Remove the client from the + not-answered lists. Re-allocate the client name to the other lists? */ + + MRP_UNUSED(c); + MRP_UNUSED(ctx); + + return; +} + + +void deregister_and_free_client(client_t *c, data_t *ctx) +{ + remove_client_from_transactions(c, ctx); + mrp_htbl_remove(ctx->clients, c->name, 0); + free_client(c); +} + + +int send_policy_decision(data_t *ctx, client_t *c, transaction_t *tx) +{ + ep_decision_t msg; + + MRP_UNUSED(ctx); + + msg.id = tx->id; + msg.n_rows = tx->data.n_rows; + msg.rows = tx->data.rows; + + if (tx->data.success_cb || tx->data.error_cb) { + signalling_info("Reply required for transaction %u", tx->id); + msg.reply_required = TRUE; + } + else + msg.reply_required = FALSE; + + return mrp_transport_senddata(c->t, &msg, TAG_POLICY_DECISION); +} + + +static int handle_ack(client_t *c, data_t *ctx, ep_ack_t *data) +{ + signalling_info("register message"); + + transaction_t *tx = get_transaction(ctx, data->id); + + if (!tx) { + signalling_warn("no transaction with %d found, maybe already done", + data->id); + return 0; + } + + switch(data->success) { + case EP_ACK: + { + uint i, found = 0; + /* go through the not_answered array */ + for (i = 0; i < tx->n_total; i++) { + if (strcmp(c->name, tx->not_answered[i]) == 0) { + found = 1; + tx->acked[tx->n_acked++] = tx->not_answered[i]; + tx->n_not_answered--; + } + } + if (!found) { + signalling_warn("spurious ACK from %s, ignoring", c->name); + return 0; + } + + break; + } + case EP_NACK: + case EP_NOT_READY: + { + uint i, found = 0; + /* go through the not_answered array */ + for (i = 0; i < tx->n_total; i++) { + if (strcmp(c->name, tx->not_answered[i]) == 0) { + found = 1; + tx->nacked[tx->n_nacked++] = tx->not_answered[i]; + tx->n_not_answered--; + + /* FIXME: handle error here or wait for all EPs to answer? */ + } + } + if (!found) { + signalling_error("spurious NACK from %s", c->name); + return 0; + } + + break; + } + default: + signalling_error("unhandled ACK status!"); + return -1; + } + + if (tx->n_not_answered == 0) { + complete_transaction(ctx, tx); + } + + return 0; +} + + +static int handle_register(client_t *c, data_t *ctx, ep_register_t *data) +{ + uint i; + + signalling_info("register message"); + + signalling_info("ep name: %s", data->ep_name); + signalling_info("number of domains: %d", data->n_domains); + + if (strcmp(data->ep_name, "") == 0) { + signalling_error("EP with an empty name"); + + /* TODO: send an error message back */ + return -1; + } + + if (mrp_htbl_lookup(ctx->clients, data->ep_name)) { + /* there already was a client of similar name */ + signalling_error("EP '%s' already exists in db", data->ep_name); + + /* TODO: send an error message back */ + return -1; + } + + c->name = mrp_strdup(data->ep_name); + c->ndomains = data->n_domains; + c->domains = mrp_alloc_array(char *, data->n_domains); + + for (i = 0; i < data->n_domains; i++) { + c->domains[i] = mrp_strdup(data->domains[i]); + signalling_info("domain: %s", data->domains[i]); + } + + c->registered = TRUE; + mrp_htbl_insert(ctx->clients, c->name, c); + ctx->n_clients++; + + return 0; +} + + +static void recvfrom_evt(mrp_transport_t *t, void *data, uint16_t tag, + mrp_sockaddr_t *addr, socklen_t addrlen, void *user_data) +{ + client_t *c = (client_t *)user_data; + int ret = -1; + + MRP_UNUSED(t); + MRP_UNUSED(addr); + MRP_UNUSED(addrlen); + + signalling_info("Received message (%d)", tag); + + switch(tag) { + + case TAG_REGISTER: + ret = handle_register(c, c->u, data); + break; + case TAG_ACK: + ret = handle_ack(c, c->u, data); + break; + case TAG_UNREGISTER: + break; + default: + signalling_warn("Unhandled message type"); + ret = 0; + break; + } + + if (ret < 0) { + signalling_error("Malformed message"); + } +} + + +static void recv_evt(mrp_transport_t *t, void *data, uint16_t tag, void *user_data) +{ + recvfrom_evt(t, data, tag, NULL, 0, user_data); +} + + +static void closed_evt(mrp_transport_t *t, int error, void *user_data) +{ + client_t *c = (client_t *)user_data; + data_t *ctx = c->u; + + if (error) + mrp_log_error("Connection closed with error %d (%s).", error, + strerror(error)); + else { + mrp_log_info("Peer has closed the connection."); + + mrp_transport_disconnect(t); + mrp_transport_destroy(t); + c->t = NULL; + + if (c->registered) + deregister_and_free_client(c, ctx); + } +} + + +static void connection_evt(mrp_transport_t *lt, void *user_data) +{ + data_t *ctx = (data_t *) user_data; + int flags; + client_t *c; + + signalling_info("Connection from peer."); + + if ((c = mrp_allocz(sizeof(*c))) != NULL) { + flags = MRP_TRANSPORT_REUSEADDR | MRP_TRANSPORT_NONBLOCK; + c->t = mrp_transport_accept(lt, c, flags); + c->u = ctx; + c->registered = FALSE; + + if (c->t != NULL) { + signalling_info("Connection accepted."); + /* TODO: maybe remove the client if no registration in some time */ + return; + } + + mrp_transport_destroy(c->t); + mrp_free(c); + } +} + + +int socket_setup(data_t *data) +{ + static mrp_transport_evt_t evt; /* static members are initialized to zero */ + + evt.connection = connection_evt; + evt.closed = closed_evt; + evt.recvdatafrom = recvfrom_evt; + evt.recvdata = recv_evt; + + mrp_transport_t *t = NULL; + mrp_sockaddr_t addr; + socklen_t addrlen; + int flags; + int ret; + const char *type; + + ret = unlink(data->path); + if (!(ret == 0 || ret == ENOENT)) { + signalling_error("Could not unlink the socket at %s: %s", + data->address, strerror(errno)); + return FALSE; + } + + addrlen = mrp_transport_resolve(NULL, data->address, + &addr, sizeof(addr), &type); + + if (addrlen > 0) { + + signalling_info("Address: %s, type: %s", data->address, type); + + flags = MRP_TRANSPORT_REUSEADDR | MRP_TRANSPORT_MODE_CUSTOM; + t = mrp_transport_create(data->ctx->ml, type, &evt, data, flags); + + if (t != NULL) { + + if (mrp_transport_bind(t, &addr, addrlen)) { + if (mrp_transport_listen(t, 4)) { + data->t = t; + return TRUE; + } + else + signalling_error("Failed to listen on server transport."); + } + else + signalling_error("Failed to bind to address %s.", data->address); + } + else + signalling_error("Failed to create listening socket transport."); + } + else + signalling_error("Invalid address '%s'.", data->address); + + mrp_transport_destroy(t); + + return FALSE; +} + +int type_init(void) +{ + if (!mrp_msg_register_type(&ep_register_descr) || + !mrp_msg_register_type(&ep_decision_descr) || + !mrp_msg_register_type(&ep_ack_descr)) { + mrp_log_error("Failed to register custom data type."); + return -1; + } + return 0; +} diff --git a/src/plugins/signalling/client.h b/src/plugins/signalling/client.h new file mode 100644 index 0000000..974c75a --- /dev/null +++ b/src/plugins/signalling/client.h @@ -0,0 +1,33 @@ +#ifndef __MURPHY_SIGNALLING_CLIENT_H__ +#define __MURPHY_SIGNALLING_CLIENT_H__ + +#include + +#include "plugin.h" +#include "transaction.h" + +typedef struct { + char *name; + uint32_t ndomains; + char **domains; + mrp_transport_t *t; /* associated transport */ + + /* TODO: function pointers for handling the internal/external EP + * case? */ + + bool registered; /* if the client is registered to server */ + data_t *u; +} client_t; + + +void deregister_and_free_client(client_t *c, data_t *ctx); +int send_policy_decision(data_t *ctx, client_t *c, transaction_t *tx); + +void free_client(client_t *c); + + +int socket_setup(data_t *data); +int type_init(void); + + +#endif /* __MURPHY_SIGNALLING_CLIENT_H__ */ diff --git a/src/plugins/signalling/plugin.h b/src/plugins/signalling/plugin.h new file mode 100644 index 0000000..ebadf8c --- /dev/null +++ b/src/plugins/signalling/plugin.h @@ -0,0 +1,27 @@ +#ifndef __MURPHY_SIGNALLING_PLUGIN_H__ +#define __MURPHY_SIGNALLING_PLUGIN_H__ + +#include + +#include +#include + +typedef struct { + const char *address; /* socket address */ + const char *path; /* socket file path */ + mrp_transport_t *t; /* transport we're listening on */ + int sock; /* main socket for new connections */ + mrp_io_watch_t *iow; /* main socket I/O watch */ + mrp_context_t *ctx; /* murphy context */ + mrp_htbl_t *txs; /* active transactions */ + mrp_htbl_t *clients; /* active clients */ + int n_clients; + uint32_t next_id; + mrp_sockaddr_t addr; + socklen_t addrlen; +} data_t; + + + + +#endif /* __MURPHY_SIGNALLING_PLUGIN_H__ */ diff --git a/src/plugins/signalling/signalling-protocol.h b/src/plugins/signalling/signalling-protocol.h new file mode 100644 index 0000000..aecad00 --- /dev/null +++ b/src/plugins/signalling/signalling-protocol.h @@ -0,0 +1,56 @@ +#ifndef __MURPHY_SIGNALLING_PROTOCOL_H__ +#define __MURPHY_SIGNALLING_PROTOCOL_H__ + +#include +#include + + +#define TAG_REGISTER 0x1 +#define TAG_UNREGISTER 0x2 /* implicit with unix domain sockets */ +#define TAG_POLICY_DECISION 0x3 +#define TAG_ACK 0x4 +#define TAG_ERROR 0x5 + +/* decision status */ + +#define EP_ACK 0x1 +#define EP_NACK 0x2 +#define EP_NOT_READY 0x3 + +typedef struct { + char *ep_name; /* EP name */ + uint32_t n_domains; /* number of domains */ + char **domains; /* array of domains */ +} ep_register_t; + +typedef struct { + uint32_t id; /* decision id */ + bool reply_required; /* if the EP must ACK/NACK the message */ + uint32_t n_rows; /* number of rows */ + char **rows; /* murphy-db database rows */ +} ep_decision_t; + +typedef struct { + uint32_t id; /* decision id */ + uint32_t success; /* ACK/NACK/... */ +} ep_ack_t; + + +MRP_DATA_DESCRIPTOR(ep_register_descr, TAG_REGISTER, ep_register_t, + MRP_DATA_MEMBER(ep_register_t, ep_name, MRP_MSG_FIELD_STRING), + MRP_DATA_MEMBER(ep_register_t, n_domains, MRP_MSG_FIELD_UINT32), + MRP_DATA_ARRAY_COUNT(ep_register_t, domains, n_domains, + MRP_MSG_FIELD_STRING)); + +MRP_DATA_DESCRIPTOR(ep_decision_descr, TAG_POLICY_DECISION, ep_decision_t, + MRP_DATA_MEMBER(ep_decision_t, id, MRP_MSG_FIELD_UINT32), + MRP_DATA_MEMBER(ep_decision_t, reply_required, MRP_MSG_FIELD_BOOL), + MRP_DATA_MEMBER(ep_decision_t, n_rows, MRP_MSG_FIELD_UINT32), + MRP_DATA_ARRAY_COUNT(ep_decision_t, rows, n_rows, + MRP_MSG_FIELD_STRING)); + +MRP_DATA_DESCRIPTOR(ep_ack_descr, TAG_ACK, ep_ack_t, + MRP_DATA_MEMBER(ep_ack_t, id, MRP_MSG_FIELD_UINT32), + MRP_DATA_MEMBER(ep_ack_t, success, MRP_MSG_FIELD_UINT32)); + +#endif /* __MURPHY_SIGNALLING_PROTOCOL_H__ */ diff --git a/src/plugins/signalling/signalling.h b/src/plugins/signalling/signalling.h new file mode 100644 index 0000000..256a0c9 --- /dev/null +++ b/src/plugins/signalling/signalling.h @@ -0,0 +1,61 @@ +#ifndef __MURPHY_SIGNALLING_H__ +#define __MURPHY_SIGNALLING_H__ + +#include +#include + +typedef enum { + MRP_TX_ERROR_UNDEFINED, + MRP_TX_ERROR_NOT_ANSWERED, + MRP_TX_ERROR_NACKED, + MRP_TX_ERROR_MAX +} mrp_tx_error_t; + +typedef void (*mrp_tx_success_cb) (uint32_t tx, void *data); +typedef void (*mrp_tx_error_cb) (uint32_t tx, mrp_tx_error_t err, void *data); + +#if 0 +/** Opens a new signal with given 'tx'. */ + +int mrp_tx_open_signal_with_id(uint32_t tx); +#endif + +/** Opens a new signal. Returns the assigned signal 'tx'. */ + +uint32_t mrp_tx_open_signal(); + +/** Adds a policy domain to the signal identified by 'tx'. */ + +int mrp_tx_add_domain(uint32_t tx, const char *domain); + +/** Adds a data row to the signal identified by 'tx'. */ + +int mrp_tx_add_data(uint32_t tx, const char *row); + +/** Adds a success callback to the signal identified by 'tx'. The callback + will be called if the signal is successfully ACKed by all enforcement + points registered to listen for the domains. If the success or error + callback isn't set, the enforcement points are not required to reply + to the signal.*/ + +void mrp_tx_add_success_cb(uint32_t tx, mrp_tx_success_cb cb, void *data); + +/** Adds an error callback to the signal identified by 'tx'. The callback + will be called if the signal is NACKed or not answered to by one or more + enforcement points registered to listen for the domains. If the success + or error callback isn't set, the enforcement points are not required to + reply to the signal.*/ + +void mrp_tx_add_error_cb(uint32_t tx, mrp_tx_error_cb cb, void *data); + +/** Closes the signal identified by 'tx' and sends it onward to the enforcement + points.*/ + +void mrp_tx_close_signal(uint32_t tx); + +/** Cancels a signal identified by 'tx'.*/ + +void mrp_tx_cancel_signal(uint32_t tx); + + +#endif /* __MURPHY_SIGNALLING_H__ */ diff --git a/src/plugins/signalling/transaction.c b/src/plugins/signalling/transaction.c new file mode 100644 index 0000000..79ef360 --- /dev/null +++ b/src/plugins/signalling/transaction.c @@ -0,0 +1,263 @@ +#include +#include +#include +#include +#include + +#include +#include +#include + +#include "plugin.h" +#include "util.h" +#include "client.h" +#include "transaction.h" + +extern mrp_plugin_t *signalling_plugin; + + +void free_transaction(transaction_t *tx) +{ + uint i; + + for (i = 0; i < tx->data.n_rows; i++) { + mrp_free(tx->data.rows[i]); + } + mrp_free(tx->data.rows); + + for (i = 0; i < tx->data.n_domains; i++) { + mrp_free(tx->data.domains[i]); + } + mrp_free(tx->data.domains); + + mrp_free(tx->acked); + mrp_free(tx->nacked); + mrp_free(tx->not_answered); + + mrp_free(tx); +} + + +transaction_t *get_transaction(data_t *ctx, uint32_t id) +{ + return mrp_htbl_lookup(ctx->txs, u_to_p(id)); +} + + +void put_transaction(data_t *ctx, transaction_t *tx) +{ + mrp_htbl_insert(ctx->txs, u_to_p(tx->id), tx); +} + + +void remove_transaction(data_t *ctx, transaction_t *tx) +{ + mrp_htbl_remove(ctx->txs, u_to_p(tx->id), TRUE); +} + + +static uint32_t assign_id(data_t *ctx) +{ + return ctx->next_id++; +} + + +static bool domain_match(client_t *c, transaction_t *tx) +{ + uint i, j; + + for (i = 0; i < tx->data.n_domains; i++) { + for (j = 0; j < c->ndomains; j++) { + if (strcmp(tx->data.domains[i], c->domains[j]) == 0) { + return TRUE; + } + } + } + return FALSE; +} + + +static int is_interested(void *key, void *entry, void *user_data) +{ + transaction_t *tx = user_data; + client_t *c = entry; + + MRP_UNUSED(key); + + if (!tx || !c) + return MRP_HTBL_ITER_STOP; + + if (domain_match(c, tx)) { + tx->not_answered[tx->n_not_answered++] = c->name; + } + + return MRP_HTBL_ITER_MORE; +} + + +int fire_transaction(data_t *ctx, transaction_t *tx) +{ + /* TODO: make proper queuing */ + + uint i; + + for (i = 0; i < tx->n_not_answered; i++) { + client_t *c = mrp_htbl_lookup(ctx->clients, tx->not_answered[i]); + if (send_policy_decision(ctx, c, tx) < 0) { + signalling_error("Failed to send policy decision to %s", c->name); + } + } + + return 0; +} + + +void complete_transaction(data_t *ctx, transaction_t *tx) +{ + /* call the transaction callbacks */ + + if (tx->n_not_answered == 0 && tx->n_acked == tx->n_total) { + tx->data.success_cb(tx->id, tx->data.success_data); + } + else if (tx->n_nacked > 0 ){ + tx->data.error_cb(tx->id, MRP_TX_ERROR_NACKED, tx->data.error_data); + } + else { + tx->data.error_cb(tx->id, MRP_TX_ERROR_NOT_ANSWERED, tx->data.error_data); + } + + /* remove the transaction from the list */ + remove_transaction(ctx, tx); +} + + +static uint32_t mrp_tx_open_signal_with_id(uint32_t id) +{ + data_t *ctx = signalling_plugin->data; + transaction_t *tx; + + tx = mrp_allocz(sizeof(transaction_t)); + tx->id = id; + tx->data.row_array_size = 32; + tx->data.rows = mrp_allocz(tx->data.row_array_size); + + tx->data.domain_array_size = 8; + tx->data.rows = mrp_allocz(tx->data.domain_array_size); + + put_transaction(ctx, tx); + + return id; +} + + +uint32_t mrp_tx_open_signal() +{ + data_t *ctx = signalling_plugin->data; + + return mrp_tx_open_signal_with_id(assign_id(ctx)); +} + + +int mrp_tx_add_domain(uint32_t id, const char *domain) +{ + data_t *ctx = signalling_plugin->data; + transaction_t *tx; + + tx = get_transaction(ctx, id); + + if (!tx) + return -1; + + tx->data.domains[tx->data.n_domains++] = mrp_strdup(domain); + + if (tx->data.n_domains == tx->data.domain_array_size) { + tx->data.n_domains *= 2; + tx->data.domains = mrp_realloc(tx->data.domains, tx->data.n_domains); + } + + return 0; +} + + +int mrp_tx_add_data(uint32_t id, const char *row) +{ + data_t *ctx = signalling_plugin->data; + transaction_t *tx; + + tx = get_transaction(ctx, id); + + if (!tx) + return -1; + + tx->data.rows[tx->data.n_rows++] = mrp_strdup(row); + + if (tx->data.n_rows == tx->data.row_array_size) { + tx->data.n_rows *= 2; + tx->data.rows = mrp_realloc(tx->data.rows, tx->data.n_rows); + } + + return 0; +} + + +void mrp_tx_add_success_cb(uint32_t id, mrp_tx_success_cb cb, void *data) +{ + data_t *ctx = signalling_plugin->data; + transaction_t *tx; + + tx = get_transaction(ctx, id); + + if (tx) { + tx->data.success_cb = cb; + tx->data.success_data = data; + } +} + + +void mrp_tx_add_error_cb(uint32_t id, mrp_tx_error_cb cb, void *data) +{ + data_t *ctx = signalling_plugin->data; + transaction_t *tx; + + tx = get_transaction(ctx, id); + + if (tx) { + tx->data.error_cb = cb; + tx->data.error_data = data; + } +} + + +void mrp_tx_close_signal(uint32_t id) +{ + data_t *ctx = signalling_plugin->data; + transaction_t *tx; + + tx = get_transaction(ctx, id); + + if (tx) { + /* allocate the client arrays */ + + tx->acked = mrp_allocz_array(char *, ctx->n_clients); + tx->nacked = mrp_allocz_array(char *, ctx->n_clients); + tx->not_answered = mrp_allocz_array(char *, ctx->n_clients); + + mrp_htbl_foreach(ctx->clients, is_interested, tx); + + tx->n_total = tx->n_not_answered; + fire_transaction(ctx, tx); + } +} + + +void mrp_tx_cancel_signal(uint32_t id) +{ + data_t *ctx = signalling_plugin->data; + transaction_t *tx; + + tx = get_transaction(ctx, id); + + if (tx) { + remove_transaction(ctx, tx); + } +} diff --git a/src/plugins/signalling/transaction.h b/src/plugins/signalling/transaction.h new file mode 100644 index 0000000..db1c5ff --- /dev/null +++ b/src/plugins/signalling/transaction.h @@ -0,0 +1,53 @@ +#ifndef __MURPHY_SIGNALLING_TRANSACTION_H__ +#define __MURPHY_SIGNALLING_TRANSACTION_H__ + +#include + +#include +#include + +#include "plugin.h" + +typedef struct { + char **domains; + uint32_t n_domains; + uint32_t domain_array_size; + char **rows; + uint32_t n_rows; + uint32_t row_array_size; + + mrp_tx_success_cb success_cb; + void *success_data; + mrp_tx_error_cb error_cb; + void *error_data; +} transaction_data_t; + +/* an ongoing transaction */ +typedef struct { + uint32_t id; /* The real ID. */ + uint32_t caller_id; /* Id assigned by caller. */ + char **acked; + char **nacked; + char **not_answered; + + uint n_acked; + uint n_nacked; + uint n_not_answered; + + uint n_total; + transaction_data_t data; +} transaction_t; + + +int fire_transaction(data_t *ctx, transaction_t *tx); + +void complete_transaction(data_t *ctx, transaction_t *tx); + +void free_transaction(transaction_t *tx); +transaction_t *get_transaction(data_t *ctx, uint32_t id); +void put_transaction(data_t *ctx, transaction_t *tx); +void remove_transaction(data_t *ctx, transaction_t *tx); + + + +#endif /* __MURPHY_SIGNALLING_TRANSACTION_H__ */ diff --git a/src/plugins/signalling/util.c b/src/plugins/signalling/util.c new file mode 100644 index 0000000..db0b248 --- /dev/null +++ b/src/plugins/signalling/util.c @@ -0,0 +1,45 @@ +#include + +#include "util.h" + +void *u_to_p(uint32_t u) +{ +#ifdef __SIZEOF_POINTER__ +#if __SIZEOF_POINTER__ == 8 + uint64_t o = u; +#else + uint32_t o = u; +#endif +#else + uint32_t o = o; +#endif + return (void *) o; +} + +uint32_t p_to_u(const void *p) +{ +#ifdef __SIZEOF_POINTER__ +#if __SIZEOF_POINTER__ == 8 + uint32_t o = 0; + uint64_t big = (uint64_t) p; + o = big & 0xffffffff; +#else + uint32_t o = (uint32_t) p; +#endif +#else + uint32_t o = p; +#endif + return o; +} + + +int int_comp(const void *key1, const void *key2) +{ + return key1 != key2; +} + + +uint32_t int_hash(const void *key) +{ + return p_to_u(key); +} diff --git a/src/plugins/signalling/util.h b/src/plugins/signalling/util.h new file mode 100644 index 0000000..eea1d75 --- /dev/null +++ b/src/plugins/signalling/util.h @@ -0,0 +1,15 @@ +#ifndef __MURPHY_SIGNALLING_UTIL_H__ +#define __MURPHY_SIGNALLING_UTIL_H__ + +#include + +#define signalling_info(fmt, args...) mrp_log_info("signalling: "fmt , ## args) +#define signalling_warn(fmt, args...) mrp_log_warning("signalling: "fmt , ## args) +#define signalling_error(fmt, args...) mrp_log_error("signalling: "fmt , ## args) + +void *u_to_p(uint32_t u); +uint32_t p_to_u(const void *p); +int int_comp(const void *key1, const void *key2); +uint32_t int_hash(const void *key); + +#endif /* __MURPHY_SIGNALLING_UTIL_H__ */ diff --git a/src/plugins/tests/Makefile.am b/src/plugins/tests/Makefile.am new file mode 100644 index 0000000..791081e --- /dev/null +++ b/src/plugins/tests/Makefile.am @@ -0,0 +1,8 @@ +AM_CFLAGS = $(WARNING_CFLAGS) -I$(top_builddir) + +noinst_PROGRAMS = signalling-test + +# signalling test +signalling_test_SOURCES = signalling-client.c +signalling_test_CFLAGS = $(AM_CFLAGS) +signalling_test_LDADD = ../../libmurphy-common.la diff --git a/src/plugins/tests/signalling-client.c b/src/plugins/tests/signalling-client.c new file mode 100644 index 0000000..ffa108c --- /dev/null +++ b/src/plugins/tests/signalling-client.c @@ -0,0 +1,158 @@ +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +typedef struct { + mrp_transport_t *t; + mrp_mainloop_t *ml; +} client_t; + + +static int send_registration(client_t *c) +{ + char *name = "test ep"; + char *domains[] = { "domain1", "domain2" }; + ep_register_t msg; + int ret; + + msg.ep_name = name; + msg.domains = domains; + msg.n_domains = 2; + + ret = mrp_transport_senddata(c->t, &msg, TAG_REGISTER); + + if (!ret) { + printf("failed to send register message\n"); + } + + return ret; +} + + +static int send_reply(client_t *c, ep_decision_t *msg, uint32_t success) +{ + ep_ack_t reply; + int ret; + + reply.id = msg->id; + reply.success = success; + + ret = mrp_transport_senddata(c->t, &reply, TAG_ACK); + + if (!ret) { + printf("failed to send reply\n"); + } + + return ret; +} + + +static void handle_decision(client_t *c, ep_decision_t *msg) +{ + printf("Handle decision\n"); + send_reply(c, msg, EP_ACK); + + return; +} + + +static void closed_evt(mrp_transport_t *t, int error, void *user_data) +{ + (void) t; + (void) error; + (void) user_data; + printf("Received closed event\n"); +} + + +static void recvfrom_evt(mrp_transport_t *t, void *data, uint16_t tag, + mrp_sockaddr_t *addr, socklen_t addrlen, void *user_data) +{ + client_t *c = user_data; + + MRP_UNUSED(addr); + MRP_UNUSED(addrlen); + MRP_UNUSED(user_data); + MRP_UNUSED(t); + + printf("Received message (0x%02x)\n", tag); + + switch (tag) { + case TAG_POLICY_DECISION: + handle_decision(c, data); + break; + case TAG_ERROR: + printf("Server sends an error message!\n"); + break; + default: + /* no other messages supported ATM */ + break; + } + +} + + +static void recv_evt(mrp_transport_t *t, void *data, uint16_t tag, void *user_data) +{ + recvfrom_evt(t, data, tag, NULL, 0, user_data); +} + + +int main() +{ + socklen_t alen; + mrp_sockaddr_t addr; + int ret, flags; + + static client_t client; + static mrp_transport_evt_t evt; /* static members are initialized to zero */ + + evt.closed = closed_evt; + evt.recvdatafrom = recvfrom_evt; + evt.recvdata = recv_evt; + + if (!mrp_msg_register_type(&ep_register_descr) || + !mrp_msg_register_type(&ep_decision_descr) || + !mrp_msg_register_type(&ep_ack_descr)) { + printf("Registering data type failed!\n"); + exit(1); + } + + client.ml = mrp_mainloop_create(); + + flags = MRP_TRANSPORT_REUSEADDR | MRP_TRANSPORT_MODE_CUSTOM; + + client.t = mrp_transport_create(client.ml, "unxs", &evt, &client, flags); + if (client.t == NULL) { + printf("Error creating a new transport!\n"); + exit(1); + } + + alen = mrp_transport_resolve(NULL, "unxs:/tmp/murphy/signalling", &addr, sizeof(addr), NULL); + if (alen <= 0) { + printf("Error resolving address! Maybe the host is not running?\n"); + exit(1); + } + + + ret = mrp_transport_connect(client.t, &addr, alen); + if (ret == 0) { + printf("Connect failed!\n"); + exit(1); + } + + send_registration(&client); + + mrp_mainloop_run(client.ml); + + return 0; +} -- 2.7.4