domain-control: support for automatic reconnection.
authorKrisztian Litkey <krisztian.litkey@intel.com>
Wed, 14 Nov 2012 11:34:55 +0000 (13:34 +0200)
committerKrisztian Litkey <krisztian.litkey@intel.com>
Wed, 14 Nov 2012 12:23:59 +0000 (14:23 +0200)
src/plugins/domain-control/client.c
src/plugins/domain-control/client.h
src/plugins/domain-control/domain-control-types.h
src/plugins/domain-control/test-client.c

index c9bbd8d..198b3ff 100644 (file)
@@ -122,7 +122,7 @@ static void destroy_domctl(mrp_domctl_t *dc)
 {
     int i;
 
-    mrp_free(dc->name);
+    purge_pending(dc);
 
     for (i = 0; i < dc->ntable; i++) {
         mrp_free((char *)dc->tables[i].table);
@@ -138,6 +138,7 @@ static void destroy_domctl(mrp_domctl_t *dc)
     }
     mrp_free(dc->watches);
 
+    mrp_free(dc->name);
     mrp_free(dc);
 }
 
@@ -200,7 +201,7 @@ static int domctl_register(mrp_domctl_t *dc)
 }
 
 
-int mrp_domctl_connect(mrp_domctl_t *dc, const char *address)
+static int try_connect(mrp_domctl_t *dc)
 {
     static mrp_transport_evt_t evt = {
         .closed      = closed_cb,
@@ -208,6 +209,55 @@ int mrp_domctl_connect(mrp_domctl_t *dc, const char *address)
         .recvmsgfrom = recvfrom_cb,
     };
 
+    dc->t = mrp_transport_create(dc->ml, dc->ttype, &evt, dc, 0);
+
+    if (dc->t != NULL) {
+        if (mrp_transport_connect(dc->t, &dc->addr, dc->addrlen))
+            if (domctl_register(dc))
+                return TRUE;
+
+        mrp_transport_destroy(dc->t);
+        dc->t = NULL;
+    }
+
+    return FALSE;
+}
+
+
+static void stop_reconnect(mrp_domctl_t *dc)
+{
+    mrp_del_timer(dc->ctmr);
+    dc->ctmr = NULL;
+}
+
+
+static void reconnect_cb(mrp_mainloop_t *ml, mrp_timer_t *t, void *user_data)
+{
+    mrp_domctl_t *dc = (mrp_domctl_t *)user_data;
+
+    if (try_connect(dc))
+        stop_reconnect(dc);
+}
+
+
+static int start_reconnect(mrp_domctl_t *dc)
+{
+    int interval;
+
+    if (dc->ctmr == NULL && dc->cival >= 0) {
+        interval = dc->cival ? 1000 * dc->cival : 5000;
+        dc->ctmr = mrp_add_timer(dc->ml, interval, reconnect_cb, dc);
+
+        if (dc->ctmr == NULL)
+            return FALSE;
+    }
+
+    return TRUE;
+}
+
+
+int mrp_domctl_connect(mrp_domctl_t *dc, const char *address, int interval)
+{
     mrp_sockaddr_t  addr;
     socklen_t       addrlen;
     const char     *type;
@@ -218,16 +268,16 @@ int mrp_domctl_connect(mrp_domctl_t *dc, const char *address)
     addrlen = mrp_transport_resolve(NULL, address, &addr, sizeof(addr), &type);
 
     if (addrlen > 0) {
-        dc->t = mrp_transport_create(dc->ml, type, &evt, dc, 0);
+        dc->addr    = addr;
+        dc->addrlen = addrlen;
+        dc->cival   = interval;
+        dc->ttype   = type;
 
-        if (dc->t != NULL) {
-            if (mrp_transport_connect(dc->t, &addr, addrlen))
-                if (domctl_register(dc))
-                    return TRUE;
+        if (try_connect(dc))
+            return TRUE;
 
-            mrp_transport_destroy(dc->t);
-            dc->t = NULL;
-        }
+        if (interval >= 0)
+            return start_reconnect(dc);
     }
 
     return FALSE;
@@ -237,6 +287,7 @@ int mrp_domctl_connect(mrp_domctl_t *dc, const char *address)
 void mrp_domctl_disconnect(mrp_domctl_t *dc)
 {
     if (dc->t != NULL) {
+        stop_reconnect(dc);
         mrp_transport_destroy(dc->t);
         dc->t         = NULL;
         dc->connected = FALSE;
@@ -383,9 +434,8 @@ static void process_notify(mrp_domctl_t *dc, mrp_msg_t *msg, uint32_t seq)
 static void recv_cb(mrp_transport_t *t, mrp_msg_t *msg, void *user_data)
 {
     mrp_domctl_t  *dc = (mrp_domctl_t *)user_data;
-    uint16_t       type, nchange, ntotal;
+    uint16_t       type;
     uint32_t       seq;
-    int            ntable, ncolumn;
     int32_t        error;
     const char    *errmsg;
 
@@ -423,15 +473,7 @@ static void recv_cb(mrp_transport_t *t, mrp_msg_t *msg, void *user_data)
         break;
 
     case MRP_PEPMSG_NOTIFY:
-        if (mrp_msg_get(msg,
-                        MRP_PEPMSG_UINT16(NCHANGE, &nchange),
-                        MRP_PEPMSG_UINT16(NTOTAL , &ntotal),
-                        MRP_MSG_END)) {
-            ntable  = nchange;
-            ncolumn = ntotal;
-
-            process_notify(dc, msg, seq);
-        }
+        process_notify(dc, msg, seq);
         break;
 
     default:
@@ -474,8 +516,10 @@ static void closed_cb(mrp_transport_t *t, int error, void *user_data)
 
     if (error)
         notify_disconnect(dc, error, strerror(error));
-    else
+    else {
         notify_disconnect(dc, ECONNRESET, "server has closed the connection");
+        start_reconnect(dc);
+    }
 }
 
 
index 91bbd27..57cfc3a 100644 (file)
@@ -104,8 +104,12 @@ mrp_domctl_t *mrp_domctl_create(const char *name, mrp_mainloop_t *ml,
 /** Destroy the given policy domain controller. */
 void mrp_domctl_destroy(mrp_domctl_t *dc);
 
-/** Connect and register the given controller to the server. */
-int mrp_domctl_connect(mrp_domctl_t *dc, const char *address);
+/**
+ * Connect and register the given controller to the server. If timeout
+ * is non-negative, it will be used to automatically attempt re-connecting
+ * to the server this often (in seconds) whenever the connection goes down.
+ */
+int mrp_domctl_connect(mrp_domctl_t *dc, const char *address, int timeout);
 
 /** Close the connection to the server. */
 void mrp_domctl_disconnect(mrp_domctl_t *dc);
index c5be022..a1054c3 100644 (file)
@@ -22,6 +22,11 @@ typedef struct pdp_s       pdp_t;
 struct mrp_domctl_s {
     char                    *name;       /* enforcment point name */
     mrp_mainloop_t          *ml;         /* main loop */
+    mrp_sockaddr_t           addr;       /* server address */
+    socklen_t                addrlen;    /* address length */
+    mrp_timer_t             *ctmr;       /* connection timer */
+    int                      cival;      /* connection attempt interval */
+    const char              *ttype;      /* transport type */
     mrp_transport_t         *t;          /* transport towards murphy */
     int                      connected;  /* transport is up */
     mrp_domctl_table_t      *tables;     /* owned tables */
index ac9c377..a04ac96 100644 (file)
@@ -449,8 +449,8 @@ void update_devices(mrp_domctl_data_t *data)
     int                 i;
 
     if (data->nrow != 0 && data->ncolumn != DEVICE_NCOLUMN) {
-        error_msg("incorrect number of columns (%d) in device update",
-                  data->ncolumn);
+        error_msg("incorrect number of columns in device update (%d != %d)",
+                  data->ncolumn, DEVICE_NCOLUMN);
         return;
     }
 
@@ -495,8 +495,8 @@ void update_streams(mrp_domctl_data_t *data)
     int                 i;
 
     if (data->nrow != 0 && data->ncolumn != STREAM_NCOLUMN) {
-        error_msg("incorrect number of columns (%d) in stream update",
-                  data->ncolumn);
+        error_msg("incorrect number of columns in stream update (%d != %d)",
+                  data->ncolumn, STREAM_NCOLUMN);
         return;
     }
 
@@ -541,8 +541,8 @@ void update_zones(mrp_domctl_data_t *data)
     int                 i;
 
     if (data->nrow != 0 && data->ncolumn != ZONE_NCOLUMN) {
-        error_msg("incorrect number of columns (%d) in zone update",
-                  data->ncolumn);
+        error_msg("incorrect number of columns in zone update (%d != %d)",
+                  data->ncolumn, ZONE_NCOLUMN);
         return;
     }
 
@@ -585,8 +585,8 @@ void update_calls(mrp_domctl_data_t *data)
     int                 i;
 
     if (data->nrow != 0 && data->ncolumn != CALL_NCOLUMN) {
-        error_msg("incorrect number of columns (%d) in call update.",
-               data->ncolumn);
+        error_msg("incorrect number of columns in call update (%d != %d)",
+                  data->ncolumn, CALL_NCOLUMN);
         return;
     }
 
@@ -846,6 +846,57 @@ static void connect_notify(mrp_domctl_t *dc, int connected, int errcode,
 }
 
 
+static void dump_data(mrp_domctl_data_t *table)
+{
+    mrp_domctl_value_t *row;
+    int                 i, j;
+    char                buf[1024], *p;
+    const char         *t;
+    int                 n, l;
+
+    info_msg("Table #%d: %d rows x %d columns", table->id,
+             table->nrow, table->ncolumn);
+
+    for (i = 0; i < table->nrow; i++) {
+        row = table->rows[i];
+        p   = buf;
+        n   = sizeof(buf);
+
+        for (j = 0, t = ""; j < table->ncolumn; j++, t = ", ") {
+            switch (row[j].type) {
+            case MRP_DOMCTL_STRING:
+                l  = snprintf(p, n, "%s'%s'", t, row[j].str);
+                p += l;
+                n -= l;
+                break;
+            case MRP_DOMCTL_INTEGER:
+                l  = snprintf(p, n, "%s%d", t, row[j].s32);
+                p += l;
+                n -= l;
+                break;
+            case MRP_DOMCTL_UNSIGNED:
+                l  = snprintf(p, n, "%s%u", t, row[j].u32);
+                p += l;
+                n -= l;
+                break;
+            case MRP_DOMCTL_DOUBLE:
+                l  = snprintf(p, n, "%s%f", t, row[j].dbl);
+                p += l;
+                n -= l;
+                break;
+            default:
+                l  = snprintf(p, n, "%s<invalid column 0x%x>",
+                              t, row[j].type);
+                p += l;
+                n -= l;
+            }
+        }
+
+        info_msg("row #%d: { %s }", i, buf);
+    }
+}
+
+
 static void data_notify(mrp_domctl_t *dc, mrp_domctl_data_t *tables,
                         int ntable, void *user_data)
 {
@@ -853,6 +904,14 @@ static void data_notify(mrp_domctl_t *dc, mrp_domctl_data_t *tables,
 
     MRP_UNUSED(dc);
 
+    if (client->verbose) {
+        int i;
+
+        for (i = 0; i < ntable; i++) {
+            dump_data(tables + i);
+        }
+    }
+
     update_imports(client, tables, ntable);
 }
 
@@ -1029,7 +1088,7 @@ static void client_cleanup(client_t *c)
 
 static void client_run(client_t *c)
 {
-    if (mrp_domctl_connect(c->dc, c->addrstr))
+    if (mrp_domctl_connect(c->dc, c->addrstr, 0))
         info_msg("Connected to server at %s.", c->addrstr);
     else
         error_msg("Failed to connect to server at %s.", c->addrstr);