amb: started working on setting properies to AMB.
authorIsmo Puustinen <ismo.puustinen@intel.com>
Wed, 9 Jan 2013 14:26:52 +0000 (16:26 +0200)
committerKrisztian Litkey <krisztian.litkey@intel.com>
Thu, 8 Jan 2015 16:37:08 +0000 (18:37 +0200)
src/plugins/plugin-amb.c

index 7f9ce44..669b134 100644 (file)
  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  */
 
+
+#include <sys/types.h>
+#include <unistd.h>
+#include <sys/stat.h>
+
 #include <murphy/common.h>
 #include <murphy/core.h>
 #include <murphy/common/dbus.h>
 #include <murphy-db/mqi.h>
 #include <murphy-db/mql.h>
 
-#include <murphy/core/lua-bindings/murphy.h>
 #include <murphy/core/lua-utils/object.h>
+#include <murphy/core/lua-bindings/murphy.h>
 #include <murphy/core/lua-utils/funcbridge.h>
 #include <murphy/core/lua-decision/mdb.h>
+#include <murphy/core/lua-decision/element.h>
 
 enum {
     ARG_AMB_DBUS_ADDRESS,
     ARG_AMB_CONFIG_FILE,
     ARG_AMB_ID,
+    ARG_AMB_TPORT_ADDRESS
+};
+
+enum amb_type {
+    amb_string = 's',
+    amb_bool   = 'b',
+    amb_int32  = 'i',
+    amb_int16  = 'n',
+    amb_uint32 = 'u',
+    amb_uint16 = 'q',
+    amb_byte   = 'y',
+    amb_double = 'd',
 };
 
 #define AMB_NAME                "name"
@@ -59,7 +77,6 @@ enum {
 #define AMB_OUTPUTS             "outputs"
 
 
-
 /*
 signal sender=:1.117 -> dest=(null destination) serial=961
 path=/org/automotive/runningstatus/vehicleSpeed;
@@ -111,8 +128,14 @@ typedef struct {
     const char *amb_addr;
     const char *config_file;
     const char *amb_id;
+    const char *tport_addr;
     lua_State *L;
     mrp_list_hook_t lua_properties;
+
+    mrp_process_state_t amb_state;
+
+    mrp_transport_t *lt;
+    mrp_transport_t *t;
 } data_t;
 
 typedef struct {
@@ -213,9 +236,10 @@ static int amb_constructor(lua_State *L)
 
     MRP_LUA_ENTER;
 
-    mrp_log_info("> amb_constructor, stack size: %d", lua_gettop(L));
+    mrp_debug("> amb_constructor, stack size: %d", lua_gettop(L));
 
-    prop = mrp_lua_create_object(L, PROPERTY_CLASS, NULL, 0);
+    prop = (lua_amb_property_t *)
+            mrp_lua_create_object(L, PROPERTY_CLASS, NULL, 0);
 
     prop->handler_ref = LUA_NOREF;
     prop->outputs_ref = LUA_NOREF;
@@ -310,7 +334,7 @@ static int amb_constructor(lua_State *L)
     if (prop->handler_ref == LUA_NOREF && !prop->basic_table_name)
         goto error;
 
-    w = mrp_allocz(sizeof(dbus_property_watch_t));
+    w = (dbus_property_watch_t *) mrp_allocz(sizeof(dbus_property_watch_t));
 
     if (!w)
         goto error;
@@ -382,7 +406,8 @@ error:
 
 static int amb_getfield(lua_State *L)
 {
-    lua_amb_property_t *prop = mrp_lua_check_object(L, PROPERTY_CLASS, 1);
+    lua_amb_property_t *prop = (lua_amb_property_t *)
+            mrp_lua_check_object(L, PROPERTY_CLASS, 1);
     size_t field_name_len;
     const char *field_name = lua_tolstring(L, 2, &field_name_len);
 
@@ -812,7 +837,7 @@ error:
 static int property_signal_handler(mrp_dbus_t *dbus, DBusMessage *msg,
         void *data)
 {
-    dbus_property_watch_t *w = data;
+    dbus_property_watch_t *w = (dbus_property_watch_t *) data;
 
     MRP_UNUSED(dbus);
 
@@ -831,7 +856,7 @@ static int property_signal_handler(mrp_dbus_t *dbus, DBusMessage *msg,
 static void property_reply_handler(mrp_dbus_t *dbus, DBusMessage *msg,
         void *data)
 {
-    dbus_property_watch_t *w = data;
+    dbus_property_watch_t *w = (dbus_property_watch_t *) data;
 
     MRP_UNUSED(dbus);
 
@@ -1010,7 +1035,7 @@ static basic_table_data_t *create_basic_property_table(const char *table_name,
     if (strlen(member) > 64)
         goto error;
 
-    tdata = mrp_allocz(sizeof(basic_table_data_t));
+    tdata = (basic_table_data_t *) mrp_allocz(sizeof(basic_table_data_t));
 
     if (!tdata)
         goto error;
@@ -1107,7 +1132,6 @@ static int load_config(lua_State *L, const char *path)
     }
 }
 
-
 static void amb_watch(const char *id, mrp_process_state_t state, void *data)
 {
     data_t *ctx = (data_t *) data;
@@ -1117,8 +1141,296 @@ static void amb_watch(const char *id, mrp_process_state_t state, void *data)
 
     mrp_log_info("ambd state changed to %s",
             state == MRP_PROCESS_STATE_READY ? "ready" : "not ready");
+
+
+    if (state == MRP_PROCESS_STATE_NOT_READY &&
+            ctx->amb_state == MRP_PROCESS_STATE_READY) {
+        mrp_log_error("lost connection to ambd");
+    }
+
+    ctx->amb_state = state;
 }
 
+/* functions for handling updating the AMB properties */
+
+static int update_amb_property(char *name, enum amb_type type, void *value,
+        data_t *ctx)
+{
+    int ret = -1;
+    mrp_msg_t *msg = mrp_msg_create(
+            MRP_MSG_TAG_STRING(1, name),
+            MRP_MSG_FIELD_END);
+
+    if (!msg)
+        goto end;
+
+    if (!ctx->t)
+        goto end;
+
+    switch(type) {
+        case amb_string:
+            mrp_msg_append(msg, 2, MRP_MSG_FIELD_STRING, value);
+            break;
+        case amb_bool:
+            mrp_msg_append(msg, 2, MRP_MSG_FIELD_BOOL, *(bool *)value);
+            break;
+        case amb_int32:
+            mrp_msg_append(msg, 2, MRP_MSG_FIELD_INT32, *(int32_t *)value);
+            break;
+        case amb_int16:
+            mrp_msg_append(msg, 2, MRP_MSG_FIELD_INT16, *(int16_t *)value);
+            break;
+        case amb_uint32:
+            mrp_msg_append(msg, 2, MRP_MSG_FIELD_UINT32, *(uint32_t *)value);
+            break;
+        case amb_uint16:
+            mrp_msg_append(msg, 2, MRP_MSG_FIELD_UINT16, *(uint16_t *)value);
+            break;
+        case amb_byte:
+            mrp_msg_append(msg, 2, MRP_MSG_FIELD_UINT8, *(uint8_t *)value);
+            break;
+        case amb_double:
+            mrp_msg_append(msg, 2, MRP_MSG_FIELD_DOUBLE, *(double *)value);
+            break;
+    }
+
+    if (!mrp_transport_send(ctx->t, msg))
+        goto end;
+
+    mrp_log_info("Sent message to AMB");
+
+    ret = 0;
+
+end:
+    if (msg)
+        mrp_msg_unref(msg);
+
+    return ret;
+}
+
+
+static bool initiate_func(lua_State *L, void *data,
+                    const char *signature, mrp_funcbridge_value_t *args,
+                    char  *ret_type, mrp_funcbridge_value_t *ret_val)
+{
+    MRP_UNUSED(L);
+    MRP_UNUSED(args);
+    MRP_UNUSED(data);
+
+    if (!signature || signature[0] != 'o') {
+        return false;
+    }
+
+    *ret_type = MRP_FUNCBRIDGE_BOOLEAN;
+    ret_val->boolean = true;
+
+    return true;
+}
+
+
+static bool update_func(lua_State *L, void *data,
+                    const char *signature, mrp_funcbridge_value_t *args,
+                    char  *ret_type, mrp_funcbridge_value_t *ret_val)
+{
+    mrp_lua_sink_t *sink;
+    const char *type, *property;
+
+    const char *s_val;
+    int32_t i_val;
+    uint32_t u_val;
+    double d_val;
+
+    int ret = -1;
+
+    MRP_UNUSED(L);
+
+    if (!signature || signature[0] != 'o') {
+        return false;
+    }
+
+    sink = (mrp_lua_sink_t *) args[0].pointer;
+
+    property = mrp_lua_sink_get_property(sink);
+
+    if (!property || strlen(property) == 0) {
+        goto error;
+    }
+
+    /* ok, for now we only support updates of basic values */
+
+    type = mrp_lua_sink_get_type(sink);
+
+    if (!type || strlen(type) != 1) {
+        goto error;
+    }
+
+    switch (type[0]) {
+        case amb_double:
+            d_val = mrp_lua_sink_get_floating(sink,0,0,0);
+            ret = update_amb_property((char *) property,
+                    (enum amb_type) type[0], &d_val, (data_t *) data);
+            break;
+        case amb_int16:
+        case amb_int32:
+            i_val = mrp_lua_sink_get_integer(sink,0,0,0);
+            ret = update_amb_property((char *) property,
+                    (enum amb_type) type[0], &i_val, (data_t *) data);
+            break;
+        case amb_bool:
+        case amb_byte:
+        case amb_uint16:
+        case amb_uint32:
+            u_val = mrp_lua_sink_get_unsigned(sink,0,0,0);
+            ret = update_amb_property((char *) property,
+                    (enum amb_type) type[0], &u_val, (data_t *) data);
+            break;
+        case amb_string:
+            s_val = mrp_lua_sink_get_string(sink,0,0,0,NULL,0);
+            ret = update_amb_property((char *) property,
+                    (enum amb_type) type[0], (void *) s_val,
+                    (data_t *) data);
+            break;
+    }
+
+    *ret_type = MRP_FUNCBRIDGE_BOOLEAN;
+    ret_val->boolean = true;
+
+    return ret == 0;
+
+error:
+    mrp_log_error("AMB: error processing the property change!");
+
+    *ret_type = MRP_FUNCBRIDGE_BOOLEAN;
+    ret_val->boolean = false;
+
+    return true;
+}
+
+
+static void recvdatafrom_evt(mrp_transport_t *t, void *data, uint16_t tag,
+                     mrp_sockaddr_t *addr, socklen_t addrlen, void *user_data)
+{
+    /* At the moment we are not receiving anything from AMB through this
+     * transport, however that might change */
+
+    MRP_UNUSED(t);
+    MRP_UNUSED(data);
+    MRP_UNUSED(tag);
+    MRP_UNUSED(addr);
+    MRP_UNUSED(addrlen);
+    MRP_UNUSED(user_data);
+}
+
+
+static void recvdata_evt(mrp_transport_t *t, void *data, uint16_t tag, void *user_data)
+{
+    recvdatafrom_evt(t, data, tag, NULL, 0, user_data);
+}
+
+
+static void closed_evt(mrp_transport_t *t, int error, void *user_data)
+{
+    data_t *ctx = (data_t *) user_data;
+
+    MRP_UNUSED(t);
+    MRP_UNUSED(error);
+    MRP_UNUSED(ctx);
+
+    /* TODO: handle not connected case */
+}
+
+static void connection_evt(mrp_transport_t *lt, void *user_data)
+{
+    data_t *ctx = (data_t *) user_data;
+
+    mrp_log_info("AMB connection!");
+
+    if (ctx->t) {
+        mrp_log_error("Already connected");
+    }
+    else {
+        ctx->t = mrp_transport_accept(lt, ctx, 0);
+    }
+
+    /* close the listening socket, since we only have one client */
+
+    mrp_transport_destroy(lt);
+    ctx->lt = NULL;
+}
+
+
+static int create_transport(mrp_mainloop_t *ml, data_t *ctx)
+{
+    socklen_t alen;
+    mrp_sockaddr_t addr;
+    int flags = MRP_TRANSPORT_REUSEADDR | MRP_TRANSPORT_MODE_MSG;
+    const char *atype;
+    struct stat statbuf;
+
+    static mrp_transport_evt_t evt; /* static members are initialized to zero */
+
+    mrp_log_info("> create_transport");
+
+    evt.closed = closed_evt;
+    evt.connection = connection_evt;
+    evt.recvdata = recvdata_evt;
+    evt.recvdatafrom = recvdatafrom_evt;
+
+    alen = mrp_transport_resolve(NULL, ctx->tport_addr, &addr, sizeof(addr),
+            &atype);
+    if (alen <= 0) {
+        mrp_log_error("failed to resolve address");
+        goto error;
+    }
+
+    /* remove the old socket if present */
+
+    if (strcmp(atype, "unxs") == 0) {
+        char *path = addr.unx.sun_path;
+        if (path[0] == '/') {
+            /* if local socket and file exists, remove it */
+            if (stat(path, &statbuf) == 0) {
+                if (S_ISSOCK(statbuf.st_mode)) {
+                    if (unlink(path) < 0) {
+                        mrp_log_error("error removing the socket");
+                        goto error;
+                    }
+                }
+                else {
+                    mrp_log_error("a file where the socket should be created");
+                    goto error;
+                }
+            }
+        }
+    }
+
+
+    ctx->lt = mrp_transport_create(ml, atype, &evt, ctx, flags);
+    if (ctx->lt == NULL) {
+        mrp_log_error("failed to create transport");
+        goto error;
+    }
+
+    if (!mrp_transport_bind(ctx->lt, &addr, alen)) {
+        mrp_log_error("failed to bind transport to address");
+        goto error;
+    }
+
+    if (!mrp_transport_listen(ctx->lt, 1)) {
+        mrp_log_error("failed to listen on transport");
+        goto error;
+    }
+
+    return 0;
+
+error:
+    if (ctx->lt)
+        mrp_transport_destroy(ctx->lt);
+
+    return -1;
+}
+
+/* plugin init and deinit */
 
 static int amb_init(mrp_plugin_t *plugin)
 {
@@ -1128,25 +1440,32 @@ static int amb_init(mrp_plugin_t *plugin)
     ctx = (data_t *) mrp_allocz(sizeof(data_t));
 
     if (!ctx)
-        goto error;
+        return FALSE;
+
+    mrp_list_init(&ctx->lua_properties);
 
     plugin->data = ctx;
 
     ctx->amb_addr = args[ARG_AMB_DBUS_ADDRESS].str;
     ctx->config_file = args[ARG_AMB_CONFIG_FILE].str;
     ctx->amb_id = args[ARG_AMB_ID].str;
+    ctx->tport_addr = args[ARG_AMB_TPORT_ADDRESS].str;
 
     mrp_log_info("amb dbus address: %s", ctx->amb_addr);
     mrp_log_info("amb config file: %s", ctx->config_file);
+    mrp_log_info("amb transport address: %s", ctx->tport_addr);
 
     ctx->dbus = mrp_dbus_connect(plugin->ctx->ml, "system", NULL);
 
     if (!ctx->dbus)
         goto error;
 
-    /* initialize lua support */
+    /* initialize transport towards ambd */
 
-    mrp_list_init(&ctx->lua_properties);
+    if (create_transport(plugin->ctx->ml, ctx) < 0)
+        goto error;
+
+    /* initialize lua support */
 
     global_ctx = ctx;
 
@@ -1155,6 +1474,15 @@ static int amb_init(mrp_plugin_t *plugin)
     if (!ctx->L)
         goto error;
 
+    /* functions to handle the direct property updates */
+
+    mrp_funcbridge_create_cfunc(ctx->L, "amb_initiate", "o",
+                                initiate_func, (void *)ctx);
+    mrp_funcbridge_create_cfunc(ctx->L, "amb_update", "o",
+                                update_func, (void *)ctx);
+
+    /* custom class for configuration */
+
     mrp_lua_create_object_class(ctx->L, MRP_LUA_CLASS(amb, property));
 
     /* TODO: create here a "manager" lua object and put that to the global
@@ -1168,17 +1496,50 @@ static int amb_init(mrp_plugin_t *plugin)
             - destination table
      */
 
-    load_config(ctx->L, ctx->config_file);
+    if (!load_config(ctx->L, ctx->config_file))
+        goto error;
 
     /* TODO: if loading the config failed, go to error */
 
+    mrp_process_set_state("murphy-amb", MRP_PROCESS_STATE_READY);
+
     if (mrp_process_set_watch(ctx->amb_id, plugin->ctx->ml, amb_watch, ctx) < 0)
-        goto error;
+        goto process_watch_failed;
+
+    ctx->amb_state = mrp_process_query_state(ctx->amb_id);
+
+    return TRUE;
 
+process_watch_failed:
+    /* let's not quit yet? */
     return TRUE;
 
 error:
-    /* TODO */
+    {
+        mrp_list_hook_t *p, *n;
+
+        mrp_list_foreach(&ctx->lua_properties, p, n) {
+            dbus_property_watch_t *w =
+                    mrp_list_entry(p, dbus_property_watch_t, hook);
+
+            destroy_prop(ctx, w);
+        }
+    }
+
+    if (ctx->dbus) {
+        mrp_dbus_unref(ctx->dbus);
+        ctx->dbus = NULL;
+    }
+
+    if (ctx->t) {
+        mrp_transport_destroy(ctx->t);
+        ctx->t = NULL;
+    }
+
+    mrp_process_remove_watch(ctx->amb_id);
+
+    mrp_free(ctx);
+
     return FALSE;
 }
 
@@ -1188,6 +1549,8 @@ static void amb_exit(mrp_plugin_t *plugin)
     data_t *ctx = (data_t *) plugin->data;
     mrp_list_hook_t *p, *n;
 
+    mrp_process_set_state("murphy-amb", MRP_PROCESS_STATE_NOT_READY);
+
     /* for all subscribed properties, unsubscribe and free memory */
 
     mrp_list_foreach(&ctx->lua_properties, p, n) {
@@ -1197,7 +1560,12 @@ static void amb_exit(mrp_plugin_t *plugin)
         destroy_prop(ctx, w);
     }
 
+    mrp_transport_destroy(ctx->t);
+    ctx->t = NULL;
+
     global_ctx = NULL;
+
+    mrp_free(ctx);
 }
 
 #define AMB_DESCRIPTION "A plugin for Automotive Message Broker D-Bus API."
@@ -1211,6 +1579,8 @@ static mrp_plugin_arg_t args[] = {
     MRP_PLUGIN_ARGIDX(ARG_AMB_CONFIG_FILE, STRING, "config_file",
             "/etc/murphy/plugins/amb/config.lua"),
     MRP_PLUGIN_ARGIDX(ARG_AMB_ID, STRING, "amb_id", "amb"),
+    MRP_PLUGIN_ARGIDX(ARG_AMB_TPORT_ADDRESS, STRING, "transport_address",
+            "unxs:/tmp/murphy/amb"),
 };