console: for the sake of testing, a UDP-capable console/console-client.
authorKrisztian Litkey <krisztian.litkey@intel.com>
Mon, 7 May 2012 13:42:33 +0000 (16:42 +0300)
committerKrisztian Litkey <krisztian.litkey@intel.com>
Mon, 7 May 2012 13:42:33 +0000 (16:42 +0300)
No, it does not need to always make sense... I just wanted to check how
big chunk of code you need to write in addition to handling the connection-
oriented/connectionless differences.

src/console-client/client.c
src/core/console-command.c
src/core/console.c
src/core/console.h
src/plugins/plugin-console.c

index d71831b..82ec307 100644 (file)
@@ -17,7 +17,7 @@
 #define client_error mrp_log_error
 
 #define DEFAULT_PROMPT  "murphy> "
-#define DEFAULT_CONSOLE "tcp:127.0.0.1:3000"
+#define DEFAULT_ADDRESS "tcp:127.0.0.1:3000"
 
 static void input_process_cb(char *input);
 
@@ -178,7 +178,7 @@ static void input_cleanup(client_t *c)
 void recv_evt(mrp_transport_t *t, mrp_msg_t *msg, void *user_data)
 {
     client_t *c = (client_t *)user_data;
-    char     *prompt, *output, buf[128];
+    char     *prompt, *output, buf[128], *dummy;
     size_t    size;
     
     MRP_UNUSED(t);
@@ -192,11 +192,11 @@ void recv_evt(mrp_transport_t *t, mrp_msg_t *msg, void *user_data)
        snprintf(buf, sizeof(buf), "%.*s> ", (int)size, prompt);
        prompt_set(c, buf);
     }
+    else if ((dummy = mrp_msg_find(msg, "bye", &size)) != NULL) {
+       mrp_mainloop_quit(c->ml, 0);
+       return;
+    }
     
-#if 0 /* done by the transport layer... */
-    mrp_msg_destroy(msg);
-#endif
-
     prompt_display(c);
 }
 
@@ -220,7 +220,7 @@ void closed_evt(mrp_transport_t *t, int error, void *user_data)
 }
 
 
-int client_setup(client_t *c, char *addr)
+int client_setup(client_t *c, const char *addrstr)
 {
     static mrp_transport_evt_t evt = {
        .closed   = closed_evt,
@@ -228,20 +228,37 @@ int client_setup(client_t *c, char *addr)
        .recvfrom = NULL,
     };
 
-    c->t = mrp_transport_create(c->ml, "tcp", &evt, c);
+    struct sockaddr  addr;
+    socklen_t        addrlen;
+    const char      *type;
+
+    if (!strncmp(addrstr, "udp:", 4))
+       type = "udp";
+    else
+       type = "tcp";
+
+    addrlen = mrp_transport_resolve(NULL, addrstr, &addr, sizeof(addr));
     
-    if (c->t == NULL) {
-       mrp_log_error("Failed to create new transport.");
-       return FALSE;
-    }
+    if (addrlen > 0) {
+       c->t = mrp_transport_create(c->ml, type, &evt, c, 0);
+    
+       if (c->t == NULL) {
+           mrp_log_error("Failed to create new transport.");
+           return FALSE;
+       }
 
-    if (!mrp_transport_connect(c->t, addr)) {
-       mrp_log_error("Failed to connect to %s.", addr);
-       mrp_transport_destroy(c->t);
-       return FALSE;
-    }
+       if (!mrp_transport_connect(c->t, &addr, addrlen)) {
+           mrp_log_error("Failed to connect to %s.", addrstr);
+           mrp_transport_destroy(c->t);
+           return FALSE;
+       }
 
-    return TRUE;
+       return TRUE;
+    }
+    else
+       mrp_log_error("Failed to resolve address '%s'.", addrstr);
+    
+    return FALSE;
 }
 
 
@@ -269,7 +286,8 @@ static void signal_handler(mrp_mainloop_t *ml, mrp_sighandler_t *h,
 
 int main(int argc, char *argv[])
 {
-    client_t c;
+    client_t    c;
+    const char *addr;
 
     MRP_UNUSED(argc);
     MRP_UNUSED(argv);
@@ -288,7 +306,12 @@ int main(int argc, char *argv[])
 
     mrp_add_sighandler(c.ml, SIGINT, signal_handler, &c);
 
-    if (!input_setup(&c) || !client_setup(&c, DEFAULT_CONSOLE))
+    if (argc == 2)
+       addr = argv[1];
+    else
+       addr = DEFAULT_ADDRESS;
+
+    if (!input_setup(&c) || !client_setup(&c, addr))
        exit(1);
     
     rl_client = &c;                      /* readline has not user_data */
index 8a76486..5842706 100644 (file)
@@ -144,8 +144,6 @@ static void cmd_exit(mrp_console_t *mc, void *user_data, int argc, char **argv)
     console_t *c = (console_t *)mc;
     char      *ha[2];
 
-    MRP_UNUSED(c);
-
     switch (argc) {
     case 1:
        if (c->grp != NULL)
index 1f5bb54..e500f4f 100644 (file)
@@ -178,8 +178,9 @@ void mrp_destroy_console(mrp_console_t *mc)
            fflush(mc->stdout);
        if (mc->stderr != NULL)
            fflush(mc->stderr);
-       
-       mc->destroyed = TRUE;
+
+       if (!mc->preserve)            /* the Kludge of Death... */
+           mc->destroyed = TRUE;
 
        if (mc->backend_data != NULL) {
            MRP_CONSOLE_BUSY(mc, {
index 8ac4e75..fb850d1 100644 (file)
@@ -60,7 +60,8 @@ typedef struct {
     FILE                  *stderr;                                     \
     void                  *backend_data;                               \
     int                    busy;                                       \
-    int                    destroyed : 1
+    int                    destroyed : 1;                              \
+    int                    preserve : 1 /* the Kludge of Death, Sir Robin... */
 
 struct mrp_console_s {
     MRP_CONSOLE_PUBLIC_FIELDS;
index 588bde9..1f84bd9 100644 (file)
 
 #define MRP_CFG_MAXLINE 4096             /* input line length limit */
 
+typedef struct console_s console_t;
+
 /*
  * console plugin data
  */
 
 typedef struct {
     const char      *address;            /* console address */
+    mrp_transport_t *t;                  /* transport we're listening on */
     int              sock;               /* main socket for new connections */
     mrp_io_watch_t  *iow;                /* main socket I/O watch */
     mrp_context_t   *ctx;                /* murphy context */
     mrp_list_hook_t  consoles;           /* active consoles */
+    struct sockaddr  addr;
+    socklen_t        addrlen;
+    console_t       *c;
 } data_t;
 
 
@@ -33,54 +39,13 @@ typedef struct {
  * a console instance
  */
 
-typedef struct {
+struct console_s {
     mrp_console_t   *mc;                 /* associated murphy console */
     mrp_transport_t *t;                  /* associated transport */
-} console_t;
-
-
-static int console_listen(const char *address)
-{
-    struct sockaddr addr;
-    socklen_t       addrlen;
-    int             sock, on;
-
-    addrlen = mrp_transport_resolve(NULL, address, &addr, sizeof(addr));
-
-    if (!addrlen) {
-       console_error("invalid console address '%s'.", address);
-       return FALSE;
-    }
-    
-    if ((sock = socket(addr.sa_family, SOCK_STREAM, 0)) < 0) {
-       console_error("failed to create console socket (%d: %s).",
-                     errno, strerror(errno));
-       goto fail;
-    }
-    
-    on = 1;
-    setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
-
-    if (bind(sock, &addr, addrlen) != 0) {
-       console_error("failed to bind to address '%s' (%d: %s).",
-                     address, errno, strerror(errno));
-       goto fail;
-    }
-
-    if (listen(sock, 4) < 0) {
-       console_error("failed to listen for connections (%d: %s).",
-                     errno, strerror(errno));
-       goto fail;
-    }
-
-    return sock;
+    struct sockaddr  addr;
+    socklen_t        addrlen;
+};
 
- fail:
-    if (sock >= 0)
-       close(sock);
-    
-    return FALSE;
-}
 
 
 static ssize_t write_req(mrp_console_t *mc, void *buf, size_t size)
@@ -99,7 +64,7 @@ static ssize_t write_req(mrp_console_t *mc, void *buf, size_t size)
 }
 
 
-static void close_req(mrp_console_t *mc)
+static void tcp_close_req(mrp_console_t *mc)
 {
     console_t *c = (console_t *)mc->backend_data;
     
@@ -109,6 +74,19 @@ static void close_req(mrp_console_t *mc)
 }
 
 
+static void udp_close_req(mrp_console_t *mc)
+{
+    console_t *c = (console_t *)mc->backend_data;
+    mrp_msg_t *msg;
+    int        dummy = TRUE;
+
+    if ((msg = mrp_msg_create("bye", &dummy, sizeof(dummy), NULL)) != NULL)
+       mrp_transport_send(c->t, msg);
+    
+    mrp_transport_disconnect(c->t);
+}
+
+
 static void set_prompt_req(mrp_console_t *mc, const char *prompt)
 {
     console_t *c = (console_t *)mc->backend_data;
@@ -148,10 +126,47 @@ static void recv_evt(mrp_transport_t *t, mrp_msg_t *msg, void *user_data)
     }
     else
        mrp_log_error("Received malformed console message.");
+}
+
+
+static void recvfrom_evt(mrp_transport_t *t, mrp_msg_t *msg,
+                        void *addrptr, socklen_t addrlen, void *user_data)
+{
+    console_t       *c    = (console_t *)user_data;
+    struct sockaddr *addr = (struct sockaddr *)addrptr;
+    char            *input;
+    size_t           size;
+
+    MRP_UNUSED(t);
     
-#if 0 /* done by the transport layer... */
-    mrp_msg_unref(msg);         /* XXX TODO change to refcounting */
-#endif
+    mrp_debug("got new message...");
+
+    input = mrp_msg_find(msg, "input", &size);
+
+    if (input != NULL) {
+       struct sockaddr  a = c->addr;
+       socklen_t        l = c->addrlen;
+
+       c->addr    = *addr;
+       c->addrlen = addrlen;
+       
+       mrp_transport_connect(t, addr, addrlen);
+       MRP_CONSOLE_BUSY(c->mc, {
+               c->mc->evt.input(c->mc, input, size);
+           });
+
+       c->mc->check_destroy(c->mc);
+
+       mrp_transport_disconnect(t);
+
+       c->addr    = a;
+       c->addrlen = l;
+
+       if (l)
+           mrp_transport_connect(t, &a, l);
+    }
+    else
+       mrp_log_error("Received malformed console message.");
 }
 
 
@@ -172,8 +187,7 @@ static void closed_evt(mrp_transport_t *t, int error, void *user_data)
 }
 
 
-static void accept_cb(mrp_mainloop_t *ml, mrp_io_watch_t *iow, int fd,
-                     mrp_io_event_t events, void *user_data)
+void connection_evt(mrp_transport_t *lt, void *user_data)
 {
     static mrp_transport_evt_t evt = {
        .recv     = recv_evt,
@@ -183,92 +197,163 @@ static void accept_cb(mrp_mainloop_t *ml, mrp_io_watch_t *iow, int fd,
     
     static mrp_console_req_t req = {
        .write      = write_req,
-       .close      = close_req,
+       .close      = tcp_close_req,
        .free       = free_req,
        .set_prompt = set_prompt_req,
     };
-    
-    data_t    *data = (data_t *)user_data;
+
+
+    data_t    *data  = (data_t *)user_data;
+    int        flags;
     console_t *c;
 
-    MRP_UNUSED(iow);
+    if ((c = mrp_allocz(sizeof(*c))) != NULL) {
+       flags = MRP_TRANSPORT_REUSEADDR | MRP_TRANSPORT_NONBLOCK;
+       c->t  = mrp_transport_accept(lt, &evt, c, flags);
+       
+       if (c->t != NULL) {
+           c->mc = mrp_create_console(data->ctx, &req, c);
 
-    c = NULL;
+           if (c->mc != NULL)
+               return;
+       }
 
-    if (events & MRP_IO_EVENT_IN) {
-       if ((c = mrp_allocz(sizeof(*c))) != NULL) {
-           c->t = mrp_transport_accept(ml, "tcp", &fd, &evt, c);
-
-           if (c->t != NULL) {
-               c->mc = mrp_create_console(data->ctx, &req, c);
-               
-               if (c->mc != NULL) {
-                   console_info("accepted new console connection.");
-                   return;
+       mrp_transport_destroy(c->t);
+       mrp_free(c);
+    }
+}
+
+
+enum {
+    ARG_ADDRESS                          /* console address, 'address:port' */
+};
+
+
+static int tcp_setup(data_t *data)
+{
+    static mrp_transport_evt_t evt = {
+       .closed     = NULL,
+       .recv       = NULL,
+       .recvfrom   = NULL,
+       .connection = connection_evt,
+    };
+
+    mrp_transport_t *t;
+    struct sockaddr  addr;
+    socklen_t        addrlen;
+    int              flags;
+    
+    t       = NULL;
+    addrlen = mrp_transport_resolve(NULL, data->address, &addr, sizeof(addr));
+
+    if (addrlen > 0) {
+       flags = MRP_TRANSPORT_REUSEADDR;
+       t     = mrp_transport_create(data->ctx->ml, "tcp", &evt, data, flags);
+       
+       if (t != NULL) {
+           if (mrp_transport_bind(t, &addr, addrlen)) {
+               if (mrp_transport_listen(t, 4)) {
+                   data->t = t;
+                   return TRUE;
                }
                else
-                   console_error("failed to create new console.");
+                   console_error("Failed to listen on server transport.");
            }
            else
-               console_error("failed to accept console connection.");
+               console_error("Failed to bind to address %s.", data->address);
        }
        else
-           console_error("failed to allocate new console.");
+           console_error("Failed to create main console transport.");
+    }
+    else
+       console_error("Invalid console address '%s'.", data->address);
+    
+    mrp_transport_destroy(t);
+    
+    return FALSE;
+}
+
+
+static int udp_setup(data_t *data)
+{
+    static mrp_transport_evt_t evt = {
+       .recv     = recv_evt,
+       .recvfrom = recvfrom_evt,
+       .closed   = NULL,
+    };
+
+    static mrp_console_req_t req = {
+       .write      = write_req,
+       .close      = udp_close_req,
+       .free       = free_req,
+       .set_prompt = set_prompt_req,
+    };
+
+    console_t       *c;
+    mrp_transport_t *t;
+    struct sockaddr  addr;
+    socklen_t        addrlen;
+    int              f;
+    
+    t       = NULL;
+    addrlen = mrp_transport_resolve(NULL, data->address, &addr, sizeof(addr));
+
+    if (addrlen > 0) {
+       if ((c = mrp_allocz(sizeof(*c))) != NULL) {
+           f = MRP_TRANSPORT_REUSEADDR;
+           t = mrp_transport_create(data->ctx->ml, "udp", &evt, c, f);
 
-       if (c != NULL) {
-           if (c->t != NULL)
-               mrp_transport_destroy(c->t);
+           if (t != NULL) {
+               if (mrp_transport_bind(t, &addr, addrlen)) {
+                   c->t  = t;
+                   c->mc = mrp_create_console(data->ctx, &req, c);
+       
+                   if (c->mc != NULL){
+                       data->c         = c;
+                       c->mc->preserve = TRUE;
+                       return TRUE;
+                   }
+               }
+
+               mrp_transport_destroy(t);
+           }
            
            mrp_free(c);
        }
     }
-}
-
+    else
+       console_error("Invalid console address '%s'.", data->address);
 
-enum {
-    ARG_ADDRESS                          /* console address, 'address:port' */
-};
+    return FALSE;
+}
 
 
 static int console_init(mrp_plugin_t *plugin)
 {
     data_t *data;
-    mrp_mainloop_t *ml;
-    mrp_io_event_t  events;
+    int     ok;
 
     if ((data = mrp_allocz(sizeof(*data))) != NULL) {
        mrp_list_init(&data->consoles);
 
        data->ctx     = plugin->ctx;
        data->address = plugin->args[ARG_ADDRESS].str;
-       data->sock    = console_listen(data->address);
-
-       if (data->sock < 0)
-           goto fail;
-
-       ml        = data->ctx->ml;
-       events    = MRP_IO_EVENT_IN | MRP_IO_EVENT_HUP | MRP_IO_EVENT_ERR;
-       data->iow = mrp_add_io_watch(ml, data->sock, events, accept_cb, data);
        
-       if (data->iow == NULL) {
-           console_error("failed to set up console I/O watch.");
-           goto fail;
-       }
-
-       plugin->data = data;
-       console_info("set up at address '%s'.", data->address);
+       if (!strncmp(data->address, "tcp:", 4))
+           ok = tcp_setup(data);
+       else
+           ok = udp_setup(data);
 
-       return TRUE;
+       if (ok) {
+           plugin->data = data;
+           console_info("set up at address '%s'.", data->address);
+           
+           return TRUE;
+       }
     }
     
- fail:
-    if (data != NULL) {
-       if (data->sock >= 0)
-           close(data->sock);
-       
-       mrp_free(data);
-    }
-
+    mrp_free(data);
+    
     console_error("failed to set up console at address '%s'.",
                  plugin->args[ARG_ADDRESS].str);