signalling: add an initial signalling plugin.
authorIsmo Puustinen <ismo.puustinen@intel.com>
Fri, 29 Jun 2012 11:21:51 +0000 (14:21 +0300)
committerKrisztian Litkey <krisztian.litkey@intel.com>
Fri, 26 Oct 2012 16:03:49 +0000 (19:03 +0300)
14 files changed:
configure.ac
src/Makefile.am
src/plugins/plugin-signalling.c [new file with mode: 0644]
src/plugins/signalling/client.c [new file with mode: 0644]
src/plugins/signalling/client.h [new file with mode: 0644]
src/plugins/signalling/plugin.h [new file with mode: 0644]
src/plugins/signalling/signalling-protocol.h [new file with mode: 0644]
src/plugins/signalling/signalling.h [new file with mode: 0644]
src/plugins/signalling/transaction.c [new file with mode: 0644]
src/plugins/signalling/transaction.h [new file with mode: 0644]
src/plugins/signalling/util.c [new file with mode: 0644]
src/plugins/signalling/util.h [new file with mode: 0644]
src/plugins/tests/Makefile.am [new file with mode: 0644]
src/plugins/tests/signalling-client.c [new file with mode: 0644]

index b3a27d9..218c803 100644 (file)
@@ -292,11 +292,13 @@ AM_CONDITIONAL(DISABLED_PLUGIN_TEST,    [check_if_disabled test])
 AM_CONDITIONAL(DISABLED_PLUGIN_DBUS,    [check_if_disabled dbus])
 AM_CONDITIONAL(DISABLED_PLUGIN_GLIB,    [check_if_disabled glib])
 AM_CONDITIONAL(DISABLED_PLUGIN_CONSOLE, [check_if_disabled console])
+AM_CONDITIONAL(DISABLED_PLUGIN_SIGNALLING, [check_if_disabled signalling])
 
 AM_CONDITIONAL(BUILTIN_PLUGIN_TEST,     [check_if_internal test])
 AM_CONDITIONAL(BUILTIN_PLUGIN_DBUS,     [check_if_internal dbus])
 AM_CONDITIONAL(BUILTIN_PLUGIN_GLIB,     [check_if_internal glib])
 AM_CONDITIONAL(BUILTIN_PLUGIN_CONSOLE,  [check_if_internal console])
+AM_CONDITIONAL(BUILTIN_PLUGIN_SIGNALLING, [check_if_internal signalling])
 
 # Check for Check (unit test framework).
 PKG_CHECK_MODULES(CHECK, 
@@ -380,25 +382,26 @@ AC_CONFIG_FILES([build-aux/shave
                 build-aux/shave-libtool
                 Makefile
                 src/Makefile
+                src/common/tests/Makefile
+                src/core/tests/Makefile
+                src/daemon/tests/Makefile
+                src/plugins/tests/Makefile
                 src/common/murphy-common.pc
                 src/common/murphy-dbus.pc
                 src/common/murphy-pulse.pc
-                src/common/tests/Makefile
                 src/core/murphy-core.pc
-                src/core/tests/Makefile
-                 src/murphy-db/Makefile
-                 src/murphy-db/mdb/Makefile
-                 src/murphy-db/mqi/Makefile
-                 src/murphy-db/mql/Makefile
-                 src/murphy-db/include/Makefile
-                 src/murphy-db/tests/Makefile
-                src/daemon/tests/Makefile
+         src/murphy-db/Makefile
+         src/murphy-db/mdb/Makefile
+         src/murphy-db/mqi/Makefile
+         src/murphy-db/mql/Makefile
+         src/murphy-db/include/Makefile
+         src/murphy-db/tests/Makefile
                 src/resolver/murphy-resolver.pc
                 src/resolver/tests/Makefile
                 doc/Makefile
-                 doc/plugin-developer-guide/Makefile
-                 doc/plugin-developer-guide/db/Makefile
-                 doc/plugin-developer-guide/doxml/Makefile
+         doc/plugin-developer-guide/Makefile
+         doc/plugin-developer-guide/db/Makefile
+         doc/plugin-developer-guide/doxml/Makefile
                 ])
 AC_OUTPUT
 
index ce030c6..719dba0 100644 (file)
@@ -1,5 +1,5 @@
 SUBDIRS         = murphy-db . \
-                 common/tests core/tests daemon/tests resolver/tests
+                 common/tests core/tests daemon/tests resolver/tests plugins/tests
 AM_CFLAGS       = $(WARNING_CFLAGS) -I$(top_builddir) -DLIBDIR=\"@LIBDIR@\"
 MURPHY_CFLAGS   = 
 pkgconfigdir    = ${libdir}/pkgconfig
@@ -460,6 +460,43 @@ clean-func-infos::
        -rm plugin-console-func-info.c
 endif
 
+# signalling plugin
+SIGNALLING_PLUGIN_SOURCES = plugins/plugin-signalling.c \
+                            plugins/signalling/transaction.c \
+                            plugins/signalling/util.c \
+                            plugins/signalling/client.c \
+                            plugins/signalling/plugin.h \
+                            plugins/signalling/transaction.h \
+                            plugins/signalling/util.h \
+                            plugins/signalling/client.h
+SIGNALLING_PLUGIN_HEADERS = plugins/signalling/signalling.h
+SIGNALLING_PLUGIN_CFLAGS  =
+SIGNALLING_PLUGIN_LIBS    =
+SIGNALLING_PLUGINdir      = $(includedir)/murphy/plugins/signalling
+
+
+if BUILTIN_PLUGIN_SIGNALLING
+BUILTIN_PLUGINS += $(SIGNALLING_PLUGIN_SOURCES)
+BUILTIN_CFLAGS  += $(SIGNALLING_PLUGIN_CFLAGS)
+BUILTIN_LIBS    += $(SIGNALLING_PLUGIN_LIBS)
+else
+plugin_signalling_la_SOURCES = $(SIGNALLING_PLUGIN_SOURCES)
+plugin_signalling_la_HEADERS = $(SIGNALLING_PLUGIN_HEADERS)
+plugin_signalling_la_CFLAGS  = $(SIGNALLING_PLUGIN_CFLAGS) \
+                $(MURPHY_CFLAGS) $(AM_CFLAGS)
+plugin_signalling_la_LDFLAGS = -module -Wl,-version-script=linker-script.plugin-signalling \
+                               -version-info @MURPHY_VERSION_INFO@
+plugin_signalling_la_LIBADD  = $(SIGNALLING_PLUGIN_LIBS)
+plugin_signalling_ladir      = $(SIGNALLING_PLUGINdir)
+
+plugin_LTLIBRARIES    += plugin-signalling.la
+endif
+
+# linker script generation
+linker-script.plugin-signalling: $(plugin_signalling_la_HEADERS)
+       $(QUIET_GEN)$(top_builddir)/build-aux/gen-linker-script -q -o $@ $^
+
+plugin_signalling_la_DEPENDENCIES = linker-script.plugin-signalling
 
 ###################################
 # murphy daemon
diff --git a/src/plugins/plugin-signalling.c b/src/plugins/plugin-signalling.c
new file mode 100644 (file)
index 0000000..2fa2246
--- /dev/null
@@ -0,0 +1,122 @@
+#include <stdio.h>
+#include <unistd.h>
+#include <string.h>
+#include <errno.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+
+#include <murphy/common.h>
+#include <murphy/core.h>
+
+#include "signalling/plugin.h"
+#include "signalling/transaction.h"
+#include "signalling/client.h"
+#include "signalling/util.h"
+
+mrp_plugin_t *signalling_plugin;
+
+enum {
+    ARG_ADDRESS /* signalling socket address, 'address:port' */
+};
+
+
+static void htbl_free_client(void *key, void *object)
+{
+    MRP_UNUSED(key);
+    free_client(object);
+}
+
+
+static void htbl_free_transaction(void *key, void *object)
+{
+    MRP_UNUSED(key);
+    free_transaction(object);
+}
+
+
+static int signalling_init(mrp_plugin_t *plugin)
+{
+    data_t *data;
+    mrp_htbl_config_t client_conf, tx_conf;
+
+    signalling_info("> init()");
+
+    if ((data = mrp_allocz(sizeof(*data))) == NULL)
+        return FALSE;
+
+    data->ctx     = plugin->ctx;
+    data->address = plugin->args[ARG_ADDRESS].str;
+
+    type_init();
+
+    client_conf.comp = mrp_string_comp;
+    client_conf.hash = mrp_string_hash;
+    client_conf.free = htbl_free_client;
+    client_conf.nbucket = 0;
+    client_conf.nentry = 10;
+
+    data->clients = mrp_htbl_create(&client_conf);
+
+    tx_conf.comp = int_comp;
+    tx_conf.hash = int_hash;
+    tx_conf.free = htbl_free_transaction;
+    tx_conf.nbucket = 0;
+    tx_conf.nentry = 5;
+
+    data->txs = mrp_htbl_create(&tx_conf);
+
+    /* we only support unix domain sockets for the time being */
+
+    if (!strncmp(data->address, "unxs:", 5)) {
+        data->path = data->address + 5;
+        if (socket_setup(data)) {
+            plugin->data = data;
+            signalling_info("set up at address '%s'.", data->address);
+
+            signalling_plugin = plugin;
+
+            return TRUE;
+        }
+    }
+
+    mrp_free(data);
+
+    signalling_error("failed to set up signalling at address '%s'.",
+            plugin->args[ARG_ADDRESS].str);
+
+    return FALSE;
+}
+
+
+static void signalling_exit(mrp_plugin_t *plugin)
+{
+    data_t *ctx = plugin->data;
+
+    signalling_info("cleaning up instance '%s'...", plugin->instance);
+
+    /* FIXME: call error callbacks of all active transactions? */
+
+    mrp_htbl_destroy(ctx->clients, TRUE);
+    mrp_htbl_destroy(ctx->txs, TRUE);
+}
+
+
+#define SIGNALLING_DESCRIPTION "A decision signalling plugin for Murphy."
+#define SIGNALLING_HELP \
+    "The signalling plugin provides one-to-many communication from Murphy\n"  \
+    "to enforcement points. The enforcement points are supposed to use\n"     \
+    "libsignalling to initialize connection to Murphy and receive events\n"   \
+    "from it."
+
+#define SIGNALLING_VERSION MRP_VERSION_INT(0, 0, 1)
+#define SIGNALLING_AUTHORS "Ismo Puustinen <ismo.puustinen@intel.com>"
+
+
+static mrp_plugin_arg_t signalling_args[] = {
+    MRP_PLUGIN_ARGIDX(ARG_ADDRESS, STRING, "address", "unxs:/tmp/murphy/signalling"),
+};
+
+MURPHY_REGISTER_CORE_PLUGIN("signalling",
+        SIGNALLING_VERSION, SIGNALLING_DESCRIPTION,
+        SIGNALLING_AUTHORS, SIGNALLING_HELP, MRP_SINGLETON,
+        signalling_init, signalling_exit, signalling_args, NULL);
diff --git a/src/plugins/signalling/client.c b/src/plugins/signalling/client.c
new file mode 100644 (file)
index 0000000..132a331
--- /dev/null
@@ -0,0 +1,333 @@
+#include <stdio.h>
+#include <unistd.h>
+#include <string.h>
+#include <errno.h>
+#include <stdarg.h>
+#include <netdb.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+
+#include <murphy/common.h>
+#include <murphy/core.h>
+#include <murphy/plugins/signalling/signalling-protocol.h>
+
+#include "client.h"
+#include "transaction.h"
+#include "util.h"
+
+
+void free_client(client_t *c)
+{
+    uint i;
+
+    mrp_free(c->name);
+
+    for (i = 0; i < c->ndomains; i++) {
+        mrp_free(c->domains[i]);
+    }
+    mrp_free(c->domains);
+
+    mrp_free(c);
+}
+
+
+static void remove_client_from_transactions(client_t *c, data_t *ctx) {
+    /* TODO: Go through all transactions. Remove the client from the
+       not-answered lists. Re-allocate the client name to the other lists? */
+
+    MRP_UNUSED(c);
+    MRP_UNUSED(ctx);
+
+    return;
+}
+
+
+void deregister_and_free_client(client_t *c, data_t *ctx)
+{
+    remove_client_from_transactions(c, ctx);
+    mrp_htbl_remove(ctx->clients, c->name, 0);
+    free_client(c);
+}
+
+
+int send_policy_decision(data_t *ctx, client_t *c, transaction_t *tx)
+{
+    ep_decision_t msg;
+
+    MRP_UNUSED(ctx);
+
+    msg.id = tx->id;
+    msg.n_rows = tx->data.n_rows;
+    msg.rows = tx->data.rows;
+
+    if (tx->data.success_cb || tx->data.error_cb) {
+        signalling_info("Reply required for transaction %u", tx->id);
+        msg.reply_required = TRUE;
+    }
+    else
+        msg.reply_required = FALSE;
+
+    return mrp_transport_senddata(c->t, &msg, TAG_POLICY_DECISION);
+}
+
+
+static int handle_ack(client_t *c, data_t *ctx, ep_ack_t *data)
+{
+    signalling_info("register message");
+
+    transaction_t *tx = get_transaction(ctx, data->id);
+
+    if (!tx) {
+        signalling_warn("no transaction with %d found, maybe already done",
+            data->id);
+        return 0;
+    }
+
+    switch(data->success) {
+        case EP_ACK:
+        {
+            uint i, found = 0;
+            /* go through the not_answered array */
+            for (i = 0; i < tx->n_total; i++) {
+                if (strcmp(c->name, tx->not_answered[i]) == 0) {
+                    found = 1;
+                    tx->acked[tx->n_acked++] = tx->not_answered[i];
+                    tx->n_not_answered--;
+                }
+            }
+            if (!found) {
+                signalling_warn("spurious ACK from %s, ignoring", c->name);
+                return 0;
+            }
+
+            break;
+        }
+        case EP_NACK:
+        case EP_NOT_READY:
+        {
+            uint i, found = 0;
+            /* go through the not_answered array */
+            for (i = 0; i < tx->n_total; i++) {
+                if (strcmp(c->name, tx->not_answered[i]) == 0) {
+                    found = 1;
+                    tx->nacked[tx->n_nacked++] = tx->not_answered[i];
+                    tx->n_not_answered--;
+
+                    /* FIXME: handle error here or wait for all EPs to answer? */
+                }
+            }
+            if (!found) {
+                signalling_error("spurious NACK from %s", c->name);
+                return 0;
+            }
+
+            break;
+        }
+        default:
+            signalling_error("unhandled ACK status!");
+            return -1;
+    }
+
+    if (tx->n_not_answered == 0) {
+        complete_transaction(ctx, tx);
+    }
+
+    return 0;
+}
+
+
+static int handle_register(client_t *c, data_t *ctx, ep_register_t *data)
+{
+    uint i;
+
+    signalling_info("register message");
+
+    signalling_info("ep name: %s", data->ep_name);
+    signalling_info("number of domains: %d", data->n_domains);
+
+    if (strcmp(data->ep_name, "") == 0) {
+        signalling_error("EP with an empty name");
+
+        /* TODO: send an error message back */
+        return -1;
+    }
+
+    if (mrp_htbl_lookup(ctx->clients, data->ep_name)) {
+        /* there already was a client of similar name */
+        signalling_error("EP '%s' already exists in db", data->ep_name);
+
+        /* TODO: send an error message back */
+        return -1;
+    }
+
+    c->name = mrp_strdup(data->ep_name);
+    c->ndomains = data->n_domains;
+    c->domains = mrp_alloc_array(char *, data->n_domains);
+
+    for (i = 0; i < data->n_domains; i++) {
+        c->domains[i] = mrp_strdup(data->domains[i]);
+        signalling_info("domain: %s", data->domains[i]);
+    }
+
+    c->registered = TRUE;
+    mrp_htbl_insert(ctx->clients, c->name, c);
+    ctx->n_clients++;
+
+    return 0;
+}
+
+
+static void recvfrom_evt(mrp_transport_t *t, void *data, uint16_t tag,
+             mrp_sockaddr_t *addr, socklen_t addrlen, void *user_data)
+{
+    client_t *c = (client_t *)user_data;
+    int ret = -1;
+
+    MRP_UNUSED(t);
+    MRP_UNUSED(addr);
+    MRP_UNUSED(addrlen);
+
+    signalling_info("Received message (%d)", tag);
+
+    switch(tag) {
+
+        case TAG_REGISTER:
+            ret = handle_register(c, c->u, data);
+            break;
+        case TAG_ACK:
+            ret = handle_ack(c, c->u, data);
+            break;
+        case TAG_UNREGISTER:
+            break;
+        default:
+            signalling_warn("Unhandled message type");
+            ret = 0;
+            break;
+    }
+
+    if (ret < 0) {
+        signalling_error("Malformed message");
+    }
+}
+
+
+static void recv_evt(mrp_transport_t *t, void *data, uint16_t tag, void *user_data)
+{
+    recvfrom_evt(t, data, tag, NULL, 0, user_data);
+}
+
+
+static void closed_evt(mrp_transport_t *t, int error, void *user_data)
+{
+    client_t *c = (client_t *)user_data;
+    data_t *ctx = c->u;
+
+    if (error)
+        mrp_log_error("Connection closed with error %d (%s).", error,
+                strerror(error));
+    else {
+        mrp_log_info("Peer has closed the connection.");
+
+        mrp_transport_disconnect(t);
+        mrp_transport_destroy(t);
+        c->t = NULL;
+
+        if (c->registered)
+            deregister_and_free_client(c, ctx);
+    }
+}
+
+
+static void connection_evt(mrp_transport_t *lt, void *user_data)
+{
+    data_t *ctx = (data_t *) user_data;
+    int flags;
+    client_t *c;
+
+    signalling_info("Connection from peer.");
+
+    if ((c = mrp_allocz(sizeof(*c))) != NULL) {
+        flags = MRP_TRANSPORT_REUSEADDR | MRP_TRANSPORT_NONBLOCK;
+        c->t  = mrp_transport_accept(lt, c, flags);
+        c->u = ctx;
+        c->registered = FALSE;
+
+        if (c->t != NULL) {
+            signalling_info("Connection accepted.");
+            /* TODO: maybe remove the client if no registration in some time */
+            return;
+        }
+
+        mrp_transport_destroy(c->t);
+        mrp_free(c);
+    }
+}
+
+
+int socket_setup(data_t *data)
+{
+    static mrp_transport_evt_t evt; /* static members are initialized to zero */
+
+    evt.connection = connection_evt;
+    evt.closed = closed_evt;
+    evt.recvdatafrom = recvfrom_evt;
+    evt.recvdata = recv_evt;
+
+    mrp_transport_t *t = NULL;
+    mrp_sockaddr_t addr;
+    socklen_t addrlen;
+    int flags;
+    int ret;
+    const char *type;
+
+    ret = unlink(data->path);
+    if (!(ret == 0 || ret == ENOENT)) {
+        signalling_error("Could not unlink the socket at %s: %s",
+            data->address, strerror(errno));
+        return FALSE;
+    }
+
+    addrlen = mrp_transport_resolve(NULL, data->address,
+            &addr, sizeof(addr), &type);
+
+    if (addrlen > 0) {
+
+        signalling_info("Address: %s, type: %s", data->address, type);
+
+        flags = MRP_TRANSPORT_REUSEADDR | MRP_TRANSPORT_MODE_CUSTOM;
+        t = mrp_transport_create(data->ctx->ml, type, &evt, data, flags);
+
+        if (t != NULL) {
+
+            if (mrp_transport_bind(t, &addr, addrlen)) {
+                if (mrp_transport_listen(t, 4)) {
+                    data->t = t;
+                    return TRUE;
+                }
+                else
+                    signalling_error("Failed to listen on server transport.");
+            }
+            else
+                signalling_error("Failed to bind to address %s.", data->address);
+        }
+        else
+            signalling_error("Failed to create listening socket transport.");
+    }
+    else
+        signalling_error("Invalid address '%s'.", data->address);
+
+    mrp_transport_destroy(t);
+
+    return FALSE;
+}
+
+int type_init(void)
+{
+    if (!mrp_msg_register_type(&ep_register_descr) ||
+        !mrp_msg_register_type(&ep_decision_descr) ||
+        !mrp_msg_register_type(&ep_ack_descr)) {
+        mrp_log_error("Failed to register custom data type.");
+        return -1;
+    }
+    return 0;
+}
diff --git a/src/plugins/signalling/client.h b/src/plugins/signalling/client.h
new file mode 100644 (file)
index 0000000..974c75a
--- /dev/null
@@ -0,0 +1,33 @@
+#ifndef __MURPHY_SIGNALLING_CLIENT_H__
+#define __MURPHY_SIGNALLING_CLIENT_H__
+
+#include <stdint.h>
+
+#include "plugin.h"
+#include "transaction.h"
+
+typedef struct {
+    char *name;
+    uint32_t ndomains;
+    char **domains;
+    mrp_transport_t *t;   /* associated transport */
+
+    /* TODO: function pointers for handling the internal/external EP
+     * case? */
+
+    bool registered;      /* if the client is registered to server */
+    data_t *u;
+} client_t;
+
+
+void deregister_and_free_client(client_t *c, data_t *ctx);
+int send_policy_decision(data_t *ctx, client_t *c, transaction_t *tx);
+
+void free_client(client_t *c);
+
+
+int socket_setup(data_t *data);
+int type_init(void);
+
+
+#endif /* __MURPHY_SIGNALLING_CLIENT_H__ */
diff --git a/src/plugins/signalling/plugin.h b/src/plugins/signalling/plugin.h
new file mode 100644 (file)
index 0000000..ebadf8c
--- /dev/null
@@ -0,0 +1,27 @@
+#ifndef __MURPHY_SIGNALLING_PLUGIN_H__
+#define __MURPHY_SIGNALLING_PLUGIN_H__
+
+#include <stdint.h>
+
+#include <murphy/common.h>
+#include <murphy/core.h>
+
+typedef struct {
+    const char *address;     /* socket address */
+    const char *path;        /* socket file path */
+    mrp_transport_t *t;      /* transport we're listening on */
+    int sock;                /* main socket for new connections */
+    mrp_io_watch_t *iow;     /* main socket I/O watch */
+    mrp_context_t *ctx;      /* murphy context */
+    mrp_htbl_t *txs;         /* active transactions */
+    mrp_htbl_t *clients;     /* active clients */
+    int n_clients;
+    uint32_t next_id;
+    mrp_sockaddr_t addr;
+    socklen_t addrlen;
+} data_t;
+
+
+
+
+#endif /* __MURPHY_SIGNALLING_PLUGIN_H__ */
diff --git a/src/plugins/signalling/signalling-protocol.h b/src/plugins/signalling/signalling-protocol.h
new file mode 100644 (file)
index 0000000..aecad00
--- /dev/null
@@ -0,0 +1,56 @@
+#ifndef __MURPHY_SIGNALLING_PROTOCOL_H__
+#define __MURPHY_SIGNALLING_PROTOCOL_H__
+
+#include <stdint.h>
+#include <murphy/common.h>
+
+
+#define TAG_REGISTER         0x1
+#define TAG_UNREGISTER       0x2 /* implicit with unix domain sockets */
+#define TAG_POLICY_DECISION  0x3
+#define TAG_ACK              0x4
+#define TAG_ERROR            0x5
+
+/* decision status */
+
+#define EP_ACK               0x1
+#define EP_NACK              0x2
+#define EP_NOT_READY         0x3
+
+typedef struct {
+    char *ep_name;       /* EP name */
+    uint32_t n_domains;   /* number of domains */
+    char **domains;      /* array of domains */
+} ep_register_t;
+
+typedef struct {
+    uint32_t id;         /* decision id */
+    bool reply_required; /* if the EP must ACK/NACK the message */
+    uint32_t n_rows;      /* number of rows */
+    char **rows;         /* murphy-db database rows */
+} ep_decision_t;
+
+typedef struct {
+    uint32_t id;         /* decision id */
+    uint32_t success;    /* ACK/NACK/... */
+} ep_ack_t;
+
+
+MRP_DATA_DESCRIPTOR(ep_register_descr, TAG_REGISTER, ep_register_t,
+        MRP_DATA_MEMBER(ep_register_t, ep_name, MRP_MSG_FIELD_STRING),
+        MRP_DATA_MEMBER(ep_register_t, n_domains, MRP_MSG_FIELD_UINT32),
+        MRP_DATA_ARRAY_COUNT(ep_register_t, domains, n_domains,
+            MRP_MSG_FIELD_STRING));
+
+MRP_DATA_DESCRIPTOR(ep_decision_descr, TAG_POLICY_DECISION, ep_decision_t,
+        MRP_DATA_MEMBER(ep_decision_t, id, MRP_MSG_FIELD_UINT32),
+        MRP_DATA_MEMBER(ep_decision_t, reply_required, MRP_MSG_FIELD_BOOL),
+        MRP_DATA_MEMBER(ep_decision_t, n_rows, MRP_MSG_FIELD_UINT32),
+        MRP_DATA_ARRAY_COUNT(ep_decision_t, rows, n_rows,
+            MRP_MSG_FIELD_STRING));
+
+MRP_DATA_DESCRIPTOR(ep_ack_descr, TAG_ACK, ep_ack_t,
+        MRP_DATA_MEMBER(ep_ack_t, id, MRP_MSG_FIELD_UINT32),
+        MRP_DATA_MEMBER(ep_ack_t, success, MRP_MSG_FIELD_UINT32));
+
+#endif /* __MURPHY_SIGNALLING_PROTOCOL_H__ */
diff --git a/src/plugins/signalling/signalling.h b/src/plugins/signalling/signalling.h
new file mode 100644 (file)
index 0000000..256a0c9
--- /dev/null
@@ -0,0 +1,61 @@
+#ifndef __MURPHY_SIGNALLING_H__
+#define __MURPHY_SIGNALLING_H__
+
+#include <stdint.h>
+#include <murphy/common.h>
+
+typedef enum {
+    MRP_TX_ERROR_UNDEFINED,
+    MRP_TX_ERROR_NOT_ANSWERED,
+    MRP_TX_ERROR_NACKED,
+    MRP_TX_ERROR_MAX
+} mrp_tx_error_t;
+
+typedef void (*mrp_tx_success_cb) (uint32_t tx, void *data);
+typedef void (*mrp_tx_error_cb) (uint32_t tx, mrp_tx_error_t err, void *data);
+
+#if 0
+/** Opens a new signal with given 'tx'. */
+
+int mrp_tx_open_signal_with_id(uint32_t tx);
+#endif
+
+/** Opens a new signal. Returns the assigned signal 'tx'. */
+
+uint32_t mrp_tx_open_signal();
+
+/** Adds a policy domain to the signal identified by 'tx'. */
+
+int mrp_tx_add_domain(uint32_t tx, const char *domain);
+
+/** Adds a data row to the signal identified by 'tx'. */
+
+int mrp_tx_add_data(uint32_t tx, const char *row);
+
+/** Adds a success callback to the signal identified by 'tx'. The callback
+    will be called if the signal is successfully ACKed by all enforcement
+    points registered to listen for the domains. If the success or error
+    callback isn't set, the enforcement points are not required to reply
+    to the signal.*/
+
+void mrp_tx_add_success_cb(uint32_t tx, mrp_tx_success_cb cb, void *data);
+
+/** Adds an error callback to the signal identified by 'tx'. The callback
+    will be called if the signal is NACKed or not answered to by one or more
+    enforcement points registered to listen for the domains. If the success
+    or error callback isn't set, the enforcement points are not required to
+    reply to the signal.*/
+
+void mrp_tx_add_error_cb(uint32_t tx, mrp_tx_error_cb cb, void *data);
+
+/** Closes the signal identified by 'tx' and sends it onward to the enforcement
+    points.*/
+
+void mrp_tx_close_signal(uint32_t tx);
+
+/** Cancels a signal identified by 'tx'.*/
+
+void mrp_tx_cancel_signal(uint32_t tx);
+
+
+#endif /* __MURPHY_SIGNALLING_H__ */
diff --git a/src/plugins/signalling/transaction.c b/src/plugins/signalling/transaction.c
new file mode 100644 (file)
index 0000000..79ef360
--- /dev/null
@@ -0,0 +1,263 @@
+#include <stdio.h>
+#include <unistd.h>
+#include <string.h>
+#include <errno.h>
+#include <sys/types.h>
+
+#include <murphy/common.h>
+#include <murphy/core.h>
+#include <murphy/plugins/signalling/signalling.h>
+
+#include "plugin.h"
+#include "util.h"
+#include "client.h"
+#include "transaction.h"
+
+extern mrp_plugin_t *signalling_plugin;
+
+
+void free_transaction(transaction_t *tx)
+{
+    uint i;
+
+    for (i = 0; i < tx->data.n_rows; i++) {
+        mrp_free(tx->data.rows[i]);
+    }
+    mrp_free(tx->data.rows);
+
+    for (i = 0; i < tx->data.n_domains; i++) {
+        mrp_free(tx->data.domains[i]);
+    }
+    mrp_free(tx->data.domains);
+
+    mrp_free(tx->acked);
+    mrp_free(tx->nacked);
+    mrp_free(tx->not_answered);
+
+    mrp_free(tx);
+}
+
+
+transaction_t *get_transaction(data_t *ctx, uint32_t id)
+{
+    return mrp_htbl_lookup(ctx->txs, u_to_p(id));
+}
+
+
+void put_transaction(data_t *ctx, transaction_t *tx)
+{
+    mrp_htbl_insert(ctx->txs, u_to_p(tx->id), tx);
+}
+
+
+void remove_transaction(data_t *ctx, transaction_t *tx)
+{
+    mrp_htbl_remove(ctx->txs, u_to_p(tx->id), TRUE);
+}
+
+
+static uint32_t assign_id(data_t *ctx)
+{
+    return ctx->next_id++;
+}
+
+
+static bool domain_match(client_t *c, transaction_t *tx)
+{
+    uint i, j;
+
+    for (i = 0; i < tx->data.n_domains; i++) {
+        for (j = 0; j < c->ndomains; j++) {
+            if (strcmp(tx->data.domains[i], c->domains[j]) == 0) {
+                return TRUE;
+            }
+        }
+    }
+    return FALSE;
+}
+
+
+static int is_interested(void *key, void *entry, void *user_data)
+{
+    transaction_t *tx = user_data;
+    client_t *c = entry;
+
+    MRP_UNUSED(key);
+
+    if (!tx || !c)
+        return MRP_HTBL_ITER_STOP;
+
+    if (domain_match(c, tx)) {
+        tx->not_answered[tx->n_not_answered++] = c->name;
+    }
+
+    return MRP_HTBL_ITER_MORE;
+}
+
+
+int fire_transaction(data_t *ctx, transaction_t *tx)
+{
+    /* TODO: make proper queuing */
+
+    uint i;
+
+    for (i = 0; i < tx->n_not_answered; i++) {
+        client_t *c = mrp_htbl_lookup(ctx->clients, tx->not_answered[i]);
+        if (send_policy_decision(ctx, c, tx) < 0) {
+            signalling_error("Failed to send policy decision to %s", c->name);
+        }
+    }
+
+    return 0;
+}
+
+
+void complete_transaction(data_t *ctx, transaction_t *tx)
+{
+    /* call the transaction callbacks */
+
+    if (tx->n_not_answered == 0 && tx->n_acked == tx->n_total) {
+        tx->data.success_cb(tx->id, tx->data.success_data);
+    }
+    else if (tx->n_nacked > 0 ){
+        tx->data.error_cb(tx->id, MRP_TX_ERROR_NACKED, tx->data.error_data);
+    }
+    else {
+        tx->data.error_cb(tx->id, MRP_TX_ERROR_NOT_ANSWERED, tx->data.error_data);
+    }
+
+    /* remove the transaction from the list */
+    remove_transaction(ctx, tx);
+}
+
+
+static uint32_t mrp_tx_open_signal_with_id(uint32_t id)
+{
+    data_t *ctx = signalling_plugin->data;
+    transaction_t *tx;
+
+    tx = mrp_allocz(sizeof(transaction_t));
+    tx->id = id;
+    tx->data.row_array_size = 32;
+    tx->data.rows = mrp_allocz(tx->data.row_array_size);
+
+    tx->data.domain_array_size = 8;
+    tx->data.rows = mrp_allocz(tx->data.domain_array_size);
+
+    put_transaction(ctx, tx);
+
+    return id;
+}
+
+
+uint32_t mrp_tx_open_signal()
+{
+    data_t *ctx = signalling_plugin->data;
+
+    return mrp_tx_open_signal_with_id(assign_id(ctx));
+}
+
+
+int mrp_tx_add_domain(uint32_t id, const char *domain)
+{
+    data_t *ctx = signalling_plugin->data;
+    transaction_t *tx;
+
+    tx = get_transaction(ctx, id);
+
+    if (!tx)
+        return -1;
+
+    tx->data.domains[tx->data.n_domains++] = mrp_strdup(domain);
+
+    if (tx->data.n_domains == tx->data.domain_array_size) {
+        tx->data.n_domains *= 2;
+        tx->data.domains = mrp_realloc(tx->data.domains, tx->data.n_domains);
+    }
+
+    return 0;
+}
+
+
+int mrp_tx_add_data(uint32_t id, const char *row)
+{
+    data_t *ctx = signalling_plugin->data;
+    transaction_t *tx;
+
+    tx = get_transaction(ctx, id);
+
+    if (!tx)
+        return -1;
+
+    tx->data.rows[tx->data.n_rows++] = mrp_strdup(row);
+
+    if (tx->data.n_rows == tx->data.row_array_size) {
+        tx->data.n_rows *= 2;
+        tx->data.rows = mrp_realloc(tx->data.rows, tx->data.n_rows);
+    }
+
+    return 0;
+}
+
+
+void mrp_tx_add_success_cb(uint32_t id, mrp_tx_success_cb cb, void *data)
+{
+    data_t *ctx = signalling_plugin->data;
+    transaction_t *tx;
+
+    tx = get_transaction(ctx, id);
+
+    if (tx) {
+        tx->data.success_cb = cb;
+        tx->data.success_data = data;
+    }
+}
+
+
+void mrp_tx_add_error_cb(uint32_t id, mrp_tx_error_cb cb, void *data)
+{
+    data_t *ctx = signalling_plugin->data;
+    transaction_t *tx;
+
+    tx = get_transaction(ctx, id);
+
+    if (tx) {
+        tx->data.error_cb = cb;
+        tx->data.error_data = data;
+    }
+}
+
+
+void mrp_tx_close_signal(uint32_t id)
+{
+    data_t *ctx = signalling_plugin->data;
+    transaction_t *tx;
+
+    tx = get_transaction(ctx, id);
+
+    if (tx) {
+        /* allocate the client arrays */
+
+        tx->acked = mrp_allocz_array(char *, ctx->n_clients);
+        tx->nacked = mrp_allocz_array(char *, ctx->n_clients);
+        tx->not_answered = mrp_allocz_array(char *, ctx->n_clients);
+
+        mrp_htbl_foreach(ctx->clients, is_interested, tx);
+
+        tx->n_total = tx->n_not_answered;
+        fire_transaction(ctx, tx);
+    }
+}
+
+
+void mrp_tx_cancel_signal(uint32_t id)
+{
+    data_t *ctx = signalling_plugin->data;
+    transaction_t *tx;
+
+    tx = get_transaction(ctx, id);
+
+    if (tx) {
+        remove_transaction(ctx, tx);
+    }
+}
diff --git a/src/plugins/signalling/transaction.h b/src/plugins/signalling/transaction.h
new file mode 100644 (file)
index 0000000..db1c5ff
--- /dev/null
@@ -0,0 +1,53 @@
+#ifndef __MURPHY_SIGNALLING_TRANSACTION_H__
+#define __MURPHY_SIGNALLING_TRANSACTION_H__
+
+#include <stdint.h>
+
+#include <murphy/common.h>
+#include <murphy/plugins/signalling/signalling.h>
+
+#include "plugin.h"
+
+typedef struct {
+    char **domains;
+    uint32_t n_domains;
+    uint32_t domain_array_size;
+    char **rows;
+    uint32_t n_rows;
+    uint32_t row_array_size;
+
+    mrp_tx_success_cb success_cb;
+    void *success_data;
+    mrp_tx_error_cb error_cb;
+    void *error_data;
+} transaction_data_t;
+
+/* an ongoing transaction */
+typedef struct {
+    uint32_t id;         /* The real ID. */
+    uint32_t caller_id;  /* Id assigned by caller. */
+    char **acked;
+    char **nacked;
+    char **not_answered;
+
+    uint n_acked;
+    uint n_nacked;
+    uint n_not_answered;
+
+    uint n_total;
+    transaction_data_t data;
+} transaction_t;
+
+
+int fire_transaction(data_t *ctx, transaction_t *tx);
+
+void complete_transaction(data_t *ctx, transaction_t *tx);
+
+void free_transaction(transaction_t *tx);
+transaction_t *get_transaction(data_t *ctx, uint32_t id);
+void put_transaction(data_t *ctx, transaction_t *tx);
+void remove_transaction(data_t *ctx, transaction_t *tx);
+
+
+
+#endif /* __MURPHY_SIGNALLING_TRANSACTION_H__ */
diff --git a/src/plugins/signalling/util.c b/src/plugins/signalling/util.c
new file mode 100644 (file)
index 0000000..db0b248
--- /dev/null
@@ -0,0 +1,45 @@
+#include <sys/types.h>
+
+#include "util.h"
+
+void *u_to_p(uint32_t u)
+{
+#ifdef __SIZEOF_POINTER__
+#if __SIZEOF_POINTER__ == 8
+    uint64_t o = u;
+#else
+    uint32_t o = u;
+#endif
+#else
+    uint32_t o = o;
+#endif
+    return (void *) o;
+}
+
+uint32_t p_to_u(const void *p)
+{
+#ifdef __SIZEOF_POINTER__
+#if __SIZEOF_POINTER__ == 8
+    uint32_t o = 0;
+    uint64_t big = (uint64_t) p;
+    o = big & 0xffffffff;
+#else
+    uint32_t o = (uint32_t) p;
+#endif
+#else
+    uint32_t o = p;
+#endif
+    return o;
+}
+
+
+int int_comp(const void *key1, const void *key2)
+{
+    return key1 != key2;
+}
+
+
+uint32_t int_hash(const void *key)
+{
+    return p_to_u(key);
+}
diff --git a/src/plugins/signalling/util.h b/src/plugins/signalling/util.h
new file mode 100644 (file)
index 0000000..eea1d75
--- /dev/null
@@ -0,0 +1,15 @@
+#ifndef __MURPHY_SIGNALLING_UTIL_H__
+#define __MURPHY_SIGNALLING_UTIL_H__
+
+#include <stdint.h>
+
+#define signalling_info(fmt, args...)  mrp_log_info("signalling: "fmt , ## args)
+#define signalling_warn(fmt, args...)  mrp_log_warning("signalling: "fmt , ## args)
+#define signalling_error(fmt, args...) mrp_log_error("signalling: "fmt , ## args)
+
+void *u_to_p(uint32_t u);
+uint32_t p_to_u(const void *p);
+int int_comp(const void *key1, const void *key2);
+uint32_t int_hash(const void *key);
+
+#endif /* __MURPHY_SIGNALLING_UTIL_H__ */
diff --git a/src/plugins/tests/Makefile.am b/src/plugins/tests/Makefile.am
new file mode 100644 (file)
index 0000000..791081e
--- /dev/null
@@ -0,0 +1,8 @@
+AM_CFLAGS = $(WARNING_CFLAGS) -I$(top_builddir)
+
+noinst_PROGRAMS  = signalling-test
+
+# signalling test
+signalling_test_SOURCES = signalling-client.c
+signalling_test_CFLAGS  = $(AM_CFLAGS)
+signalling_test_LDADD   = ../../libmurphy-common.la
diff --git a/src/plugins/tests/signalling-client.c b/src/plugins/tests/signalling-client.c
new file mode 100644 (file)
index 0000000..ffa108c
--- /dev/null
@@ -0,0 +1,158 @@
+#include <stdio.h>
+#include <unistd.h>
+#include <string.h>
+#include <errno.h>
+#include <stdarg.h>
+#include <netdb.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+
+#include <murphy/common.h>
+#include <murphy/core.h>
+#include <murphy/plugins/signalling/signalling-protocol.h>
+
+typedef struct {
+    mrp_transport_t *t;
+    mrp_mainloop_t *ml;
+} client_t;
+
+
+static int send_registration(client_t *c)
+{
+    char *name = "test ep";
+    char *domains[] = { "domain1", "domain2" };
+    ep_register_t msg;
+    int ret;
+
+    msg.ep_name = name;
+    msg.domains = domains;
+    msg.n_domains = 2;
+
+    ret = mrp_transport_senddata(c->t, &msg, TAG_REGISTER);
+
+    if (!ret) {
+        printf("failed to send register message\n");
+    }
+
+    return ret;
+}
+
+
+static int send_reply(client_t *c, ep_decision_t *msg, uint32_t success)
+{
+    ep_ack_t reply;
+    int ret;
+
+    reply.id = msg->id;
+    reply.success = success;
+
+    ret = mrp_transport_senddata(c->t, &reply, TAG_ACK);
+
+    if (!ret) {
+        printf("failed to send reply\n");
+    }
+
+    return ret;
+}
+
+
+static void handle_decision(client_t *c, ep_decision_t *msg)
+{
+    printf("Handle decision\n");
+    send_reply(c, msg, EP_ACK);
+
+    return;
+}
+
+
+static void closed_evt(mrp_transport_t *t, int error, void *user_data)
+{
+    (void) t;
+    (void) error;
+    (void) user_data;
+    printf("Received closed event\n");
+}
+
+
+static void recvfrom_evt(mrp_transport_t *t, void *data, uint16_t tag,
+             mrp_sockaddr_t *addr, socklen_t addrlen, void *user_data)
+{
+    client_t *c = user_data;
+
+    MRP_UNUSED(addr);
+    MRP_UNUSED(addrlen);
+    MRP_UNUSED(user_data);
+    MRP_UNUSED(t);
+
+    printf("Received message (0x%02x)\n", tag);
+
+    switch (tag) {
+        case TAG_POLICY_DECISION:
+            handle_decision(c, data);
+            break;
+        case TAG_ERROR:
+            printf("Server sends an error message!\n");
+            break;
+        default:
+            /* no other messages supported ATM */
+            break;
+    }
+
+}
+
+
+static void recv_evt(mrp_transport_t *t, void *data, uint16_t tag, void *user_data)
+{
+    recvfrom_evt(t, data, tag, NULL, 0, user_data);
+}
+
+
+int main()
+{
+    socklen_t alen;
+    mrp_sockaddr_t addr;
+    int ret, flags;
+
+    static client_t client;
+    static mrp_transport_evt_t evt; /* static members are initialized to zero */
+
+    evt.closed = closed_evt;
+    evt.recvdatafrom = recvfrom_evt;
+    evt.recvdata = recv_evt;
+
+    if (!mrp_msg_register_type(&ep_register_descr) ||
+        !mrp_msg_register_type(&ep_decision_descr) ||
+        !mrp_msg_register_type(&ep_ack_descr)) {
+        printf("Registering data type failed!\n");
+        exit(1);
+    }
+
+    client.ml = mrp_mainloop_create();
+
+    flags = MRP_TRANSPORT_REUSEADDR | MRP_TRANSPORT_MODE_CUSTOM;
+
+    client.t = mrp_transport_create(client.ml, "unxs", &evt, &client, flags);
+    if (client.t == NULL) {
+        printf("Error creating a new transport!\n");
+        exit(1);
+    }
+
+    alen = mrp_transport_resolve(NULL, "unxs:/tmp/murphy/signalling", &addr, sizeof(addr), NULL);
+    if (alen <= 0) {
+        printf("Error resolving address! Maybe the host is not running?\n");
+        exit(1);
+    }
+
+
+    ret = mrp_transport_connect(client.t, &addr, alen);
+    if (ret == 0) {
+        printf("Connect failed!\n");
+        exit(1);
+    }
+
+    send_registration(&client);
+
+    mrp_mainloop_run(client.ml);
+
+    return 0;
+}