#define client_error mrp_log_error
#define DEFAULT_PROMPT "murphy> "
-#define DEFAULT_CONSOLE "tcp:127.0.0.1:3000"
+#define DEFAULT_ADDRESS "tcp:127.0.0.1:3000"
static void input_process_cb(char *input);
void recv_evt(mrp_transport_t *t, mrp_msg_t *msg, void *user_data)
{
client_t *c = (client_t *)user_data;
- char *prompt, *output, buf[128];
+ char *prompt, *output, buf[128], *dummy;
size_t size;
MRP_UNUSED(t);
snprintf(buf, sizeof(buf), "%.*s> ", (int)size, prompt);
prompt_set(c, buf);
}
+ else if ((dummy = mrp_msg_find(msg, "bye", &size)) != NULL) {
+ mrp_mainloop_quit(c->ml, 0);
+ return;
+ }
-#if 0 /* done by the transport layer... */
- mrp_msg_destroy(msg);
-#endif
-
prompt_display(c);
}
}
-int client_setup(client_t *c, char *addr)
+int client_setup(client_t *c, const char *addrstr)
{
static mrp_transport_evt_t evt = {
.closed = closed_evt,
.recvfrom = NULL,
};
- c->t = mrp_transport_create(c->ml, "tcp", &evt, c);
+ struct sockaddr addr;
+ socklen_t addrlen;
+ const char *type;
+
+ if (!strncmp(addrstr, "udp:", 4))
+ type = "udp";
+ else
+ type = "tcp";
+
+ addrlen = mrp_transport_resolve(NULL, addrstr, &addr, sizeof(addr));
- if (c->t == NULL) {
- mrp_log_error("Failed to create new transport.");
- return FALSE;
- }
+ if (addrlen > 0) {
+ c->t = mrp_transport_create(c->ml, type, &evt, c, 0);
+
+ if (c->t == NULL) {
+ mrp_log_error("Failed to create new transport.");
+ return FALSE;
+ }
- if (!mrp_transport_connect(c->t, addr)) {
- mrp_log_error("Failed to connect to %s.", addr);
- mrp_transport_destroy(c->t);
- return FALSE;
- }
+ if (!mrp_transport_connect(c->t, &addr, addrlen)) {
+ mrp_log_error("Failed to connect to %s.", addrstr);
+ mrp_transport_destroy(c->t);
+ return FALSE;
+ }
- return TRUE;
+ return TRUE;
+ }
+ else
+ mrp_log_error("Failed to resolve address '%s'.", addrstr);
+
+ return FALSE;
}
int main(int argc, char *argv[])
{
- client_t c;
+ client_t c;
+ const char *addr;
MRP_UNUSED(argc);
MRP_UNUSED(argv);
mrp_add_sighandler(c.ml, SIGINT, signal_handler, &c);
- if (!input_setup(&c) || !client_setup(&c, DEFAULT_CONSOLE))
+ if (argc == 2)
+ addr = argv[1];
+ else
+ addr = DEFAULT_ADDRESS;
+
+ if (!input_setup(&c) || !client_setup(&c, addr))
exit(1);
rl_client = &c; /* readline has not user_data */
#define MRP_CFG_MAXLINE 4096 /* input line length limit */
+typedef struct console_s console_t;
+
/*
* console plugin data
*/
typedef struct {
const char *address; /* console address */
+ mrp_transport_t *t; /* transport we're listening on */
int sock; /* main socket for new connections */
mrp_io_watch_t *iow; /* main socket I/O watch */
mrp_context_t *ctx; /* murphy context */
mrp_list_hook_t consoles; /* active consoles */
+ struct sockaddr addr;
+ socklen_t addrlen;
+ console_t *c;
} data_t;
* a console instance
*/
-typedef struct {
+struct console_s {
mrp_console_t *mc; /* associated murphy console */
mrp_transport_t *t; /* associated transport */
-} console_t;
-
-
-static int console_listen(const char *address)
-{
- struct sockaddr addr;
- socklen_t addrlen;
- int sock, on;
-
- addrlen = mrp_transport_resolve(NULL, address, &addr, sizeof(addr));
-
- if (!addrlen) {
- console_error("invalid console address '%s'.", address);
- return FALSE;
- }
-
- if ((sock = socket(addr.sa_family, SOCK_STREAM, 0)) < 0) {
- console_error("failed to create console socket (%d: %s).",
- errno, strerror(errno));
- goto fail;
- }
-
- on = 1;
- setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
-
- if (bind(sock, &addr, addrlen) != 0) {
- console_error("failed to bind to address '%s' (%d: %s).",
- address, errno, strerror(errno));
- goto fail;
- }
-
- if (listen(sock, 4) < 0) {
- console_error("failed to listen for connections (%d: %s).",
- errno, strerror(errno));
- goto fail;
- }
-
- return sock;
+ struct sockaddr addr;
+ socklen_t addrlen;
+};
- fail:
- if (sock >= 0)
- close(sock);
-
- return FALSE;
-}
static ssize_t write_req(mrp_console_t *mc, void *buf, size_t size)
}
-static void close_req(mrp_console_t *mc)
+static void tcp_close_req(mrp_console_t *mc)
{
console_t *c = (console_t *)mc->backend_data;
}
+static void udp_close_req(mrp_console_t *mc)
+{
+ console_t *c = (console_t *)mc->backend_data;
+ mrp_msg_t *msg;
+ int dummy = TRUE;
+
+ if ((msg = mrp_msg_create("bye", &dummy, sizeof(dummy), NULL)) != NULL)
+ mrp_transport_send(c->t, msg);
+
+ mrp_transport_disconnect(c->t);
+}
+
+
static void set_prompt_req(mrp_console_t *mc, const char *prompt)
{
console_t *c = (console_t *)mc->backend_data;
}
else
mrp_log_error("Received malformed console message.");
+}
+
+
+static void recvfrom_evt(mrp_transport_t *t, mrp_msg_t *msg,
+ void *addrptr, socklen_t addrlen, void *user_data)
+{
+ console_t *c = (console_t *)user_data;
+ struct sockaddr *addr = (struct sockaddr *)addrptr;
+ char *input;
+ size_t size;
+
+ MRP_UNUSED(t);
-#if 0 /* done by the transport layer... */
- mrp_msg_unref(msg); /* XXX TODO change to refcounting */
-#endif
+ mrp_debug("got new message...");
+
+ input = mrp_msg_find(msg, "input", &size);
+
+ if (input != NULL) {
+ struct sockaddr a = c->addr;
+ socklen_t l = c->addrlen;
+
+ c->addr = *addr;
+ c->addrlen = addrlen;
+
+ mrp_transport_connect(t, addr, addrlen);
+ MRP_CONSOLE_BUSY(c->mc, {
+ c->mc->evt.input(c->mc, input, size);
+ });
+
+ c->mc->check_destroy(c->mc);
+
+ mrp_transport_disconnect(t);
+
+ c->addr = a;
+ c->addrlen = l;
+
+ if (l)
+ mrp_transport_connect(t, &a, l);
+ }
+ else
+ mrp_log_error("Received malformed console message.");
}
}
-static void accept_cb(mrp_mainloop_t *ml, mrp_io_watch_t *iow, int fd,
- mrp_io_event_t events, void *user_data)
+void connection_evt(mrp_transport_t *lt, void *user_data)
{
static mrp_transport_evt_t evt = {
.recv = recv_evt,
static mrp_console_req_t req = {
.write = write_req,
- .close = close_req,
+ .close = tcp_close_req,
.free = free_req,
.set_prompt = set_prompt_req,
};
-
- data_t *data = (data_t *)user_data;
+
+
+ data_t *data = (data_t *)user_data;
+ int flags;
console_t *c;
- MRP_UNUSED(iow);
+ if ((c = mrp_allocz(sizeof(*c))) != NULL) {
+ flags = MRP_TRANSPORT_REUSEADDR | MRP_TRANSPORT_NONBLOCK;
+ c->t = mrp_transport_accept(lt, &evt, c, flags);
+
+ if (c->t != NULL) {
+ c->mc = mrp_create_console(data->ctx, &req, c);
- c = NULL;
+ if (c->mc != NULL)
+ return;
+ }
- if (events & MRP_IO_EVENT_IN) {
- if ((c = mrp_allocz(sizeof(*c))) != NULL) {
- c->t = mrp_transport_accept(ml, "tcp", &fd, &evt, c);
-
- if (c->t != NULL) {
- c->mc = mrp_create_console(data->ctx, &req, c);
-
- if (c->mc != NULL) {
- console_info("accepted new console connection.");
- return;
+ mrp_transport_destroy(c->t);
+ mrp_free(c);
+ }
+}
+
+
+enum {
+ ARG_ADDRESS /* console address, 'address:port' */
+};
+
+
+static int tcp_setup(data_t *data)
+{
+ static mrp_transport_evt_t evt = {
+ .closed = NULL,
+ .recv = NULL,
+ .recvfrom = NULL,
+ .connection = connection_evt,
+ };
+
+ mrp_transport_t *t;
+ struct sockaddr addr;
+ socklen_t addrlen;
+ int flags;
+
+ t = NULL;
+ addrlen = mrp_transport_resolve(NULL, data->address, &addr, sizeof(addr));
+
+ if (addrlen > 0) {
+ flags = MRP_TRANSPORT_REUSEADDR;
+ t = mrp_transport_create(data->ctx->ml, "tcp", &evt, data, flags);
+
+ if (t != NULL) {
+ if (mrp_transport_bind(t, &addr, addrlen)) {
+ if (mrp_transport_listen(t, 4)) {
+ data->t = t;
+ return TRUE;
}
else
- console_error("failed to create new console.");
+ console_error("Failed to listen on server transport.");
}
else
- console_error("failed to accept console connection.");
+ console_error("Failed to bind to address %s.", data->address);
}
else
- console_error("failed to allocate new console.");
+ console_error("Failed to create main console transport.");
+ }
+ else
+ console_error("Invalid console address '%s'.", data->address);
+
+ mrp_transport_destroy(t);
+
+ return FALSE;
+}
+
+
+static int udp_setup(data_t *data)
+{
+ static mrp_transport_evt_t evt = {
+ .recv = recv_evt,
+ .recvfrom = recvfrom_evt,
+ .closed = NULL,
+ };
+
+ static mrp_console_req_t req = {
+ .write = write_req,
+ .close = udp_close_req,
+ .free = free_req,
+ .set_prompt = set_prompt_req,
+ };
+
+ console_t *c;
+ mrp_transport_t *t;
+ struct sockaddr addr;
+ socklen_t addrlen;
+ int f;
+
+ t = NULL;
+ addrlen = mrp_transport_resolve(NULL, data->address, &addr, sizeof(addr));
+
+ if (addrlen > 0) {
+ if ((c = mrp_allocz(sizeof(*c))) != NULL) {
+ f = MRP_TRANSPORT_REUSEADDR;
+ t = mrp_transport_create(data->ctx->ml, "udp", &evt, c, f);
- if (c != NULL) {
- if (c->t != NULL)
- mrp_transport_destroy(c->t);
+ if (t != NULL) {
+ if (mrp_transport_bind(t, &addr, addrlen)) {
+ c->t = t;
+ c->mc = mrp_create_console(data->ctx, &req, c);
+
+ if (c->mc != NULL){
+ data->c = c;
+ c->mc->preserve = TRUE;
+ return TRUE;
+ }
+ }
+
+ mrp_transport_destroy(t);
+ }
mrp_free(c);
}
}
-}
-
+ else
+ console_error("Invalid console address '%s'.", data->address);
-enum {
- ARG_ADDRESS /* console address, 'address:port' */
-};
+ return FALSE;
+}
static int console_init(mrp_plugin_t *plugin)
{
data_t *data;
- mrp_mainloop_t *ml;
- mrp_io_event_t events;
+ int ok;
if ((data = mrp_allocz(sizeof(*data))) != NULL) {
mrp_list_init(&data->consoles);
data->ctx = plugin->ctx;
data->address = plugin->args[ARG_ADDRESS].str;
- data->sock = console_listen(data->address);
-
- if (data->sock < 0)
- goto fail;
-
- ml = data->ctx->ml;
- events = MRP_IO_EVENT_IN | MRP_IO_EVENT_HUP | MRP_IO_EVENT_ERR;
- data->iow = mrp_add_io_watch(ml, data->sock, events, accept_cb, data);
- if (data->iow == NULL) {
- console_error("failed to set up console I/O watch.");
- goto fail;
- }
-
- plugin->data = data;
- console_info("set up at address '%s'.", data->address);
+ if (!strncmp(data->address, "tcp:", 4))
+ ok = tcp_setup(data);
+ else
+ ok = udp_setup(data);
- return TRUE;
+ if (ok) {
+ plugin->data = data;
+ console_info("set up at address '%s'.", data->address);
+
+ return TRUE;
+ }
}
- fail:
- if (data != NULL) {
- if (data->sock >= 0)
- close(data->sock);
-
- mrp_free(data);
- }
-
+ mrp_free(data);
+
console_error("failed to set up console at address '%s'.",
plugin->args[ARG_ADDRESS].str);