From: Krisztian Litkey Date: Tue, 15 Jan 2013 12:20:07 +0000 (+0200) Subject: common: low-level websocket abstraction + a websocket transport X-Git-Tag: accepted/2.0alpha/20130219.205908~82^2~34 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=f0c19f56010c38b8c48cd7db107effbdac698dc3;p=profile%2Fivi%2Fmurphy.git common: low-level websocket abstraction + a websocket transport Our low-level websocket library is built on top of libwebsockets. It basically provides the necessary bits of glue to pump websocket connections from a Murphy mainloop. Additionally it attempts to do most of the heavy lifting necessary to associate extra contextual data with websockets (IOW attaching/detaching in a safe manner user data with websocket contexts and connections) which I found surprisingly hairy to do with libwebsockets itself. Eventually the low-level library should map out all the useful features of the underlying library. We're far not there at the moment. We do not provide an interface for features that we do not need ourselves at the moment. These include things like connection filtering, pure HTTP connections, serving files to clients, extension- negotiation, etc. Of these at least SSL-support, pure HTTP clients and content-serving (for serving our own javascript libraries to clients) are next on the list to map out. The websocket transport basically integrates websockets to our transport abstraction. IOW, it makes it possible to use a websocket connection as the underlying IPC mechanism for a Murphy transport. As a side-effect of implementing websocket transports, an automagic defragmentation mechanism (fragbuf or mrp_fragbuf_t) has also been added to the common murphy library. --- diff --git a/configure.ac b/configure.ac index 905b8bb..1beed77 100644 --- a/configure.ac +++ b/configure.ac @@ -281,6 +281,31 @@ AC_SUBST(CONSOLE_ENABLED) AC_SUBST(READLINE_CFLAGS) AC_SUBST(READLINE_LIBS) +# Check if websockets support was enabled. +AC_ARG_ENABLE(websockets, + [--enable-websockets enable websockets support], + [enable_websockets=$enableval], [enable_websockets=auto]) + +if test "$enable_websockets" != "no"; then + PKG_CHECK_MODULES(WEBSOCKETS, libwebsockets, + [have_websockets=yes], [have_websockets=no]) + if test "$have_websockets" = "no" -a "$enable_websockets" = "yes"; then + AC_MSG_ERROR([libwebsockets development libraries not found.]) + fi + + enable_websockets="$have_websockets" +else + AC_MSG_NOTICE([libwebsockets support is disabled.]) +fi + +if test "$enable_websockets" = "yes"; then + AC_DEFINE([WEBSOCKETS_ENABLED], 1, [Enable websockets support ?]) +fi +AM_CONDITIONAL(WEBSOCKETS_ENABLED, [test "$enable_websockets" = "yes"]) +AC_SUBST(WEBSOCKETS_ENABLED) +AC_SUBST(WEBSOCKETS_CFLAGS) +AC_SUBST(WEBSOCKETS_LIBS) + # Set up murphy CFLAGS and LIBS. MURPHY_CFLAGS="" MURPHY_LIBS="" diff --git a/src/Makefile.am b/src/Makefile.am index 462382f..1589d30 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -89,6 +89,26 @@ libmurphy_common_la_DEPENDENCIES = linker-script.common libcommonincludedir = $(includedir)/murphy/common libcommoninclude_HEADERS = $(libmurphy_common_la_HEADERS) +if WEBSOCKETS_ENABLED +libmurphy_common_la_HEADERS += \ + common/fragbuf.h \ + common/websocklib.h \ + common/websocket.c + +libmurphy_common_la_REGULAR_SOURCES += \ + common/fragbuf.c \ + common/websocklib.c \ + common/websocket.c \ + common/wsck-transport.c + +libmurphy_common_la_CFLAGS += \ + $(WEBSOCKETS_CFLAGS) + +libmurphy_common_la_LIBADD += \ + $(WEBSOCKETS_LIBS) +endif + + # linker script generation linker-script.common: $(libmurphy_common_la_HEADERS) $(QUIET_GEN)$(top_builddir)/build-aux/gen-linker-script -q -o $@ $^ diff --git a/src/common/fragbuf.c b/src/common/fragbuf.c new file mode 100644 index 0000000..df06c76 --- /dev/null +++ b/src/common/fragbuf.c @@ -0,0 +1,218 @@ +#include + +#include +#include +#include +#include + +struct mrp_fragbuf_s { + void *data; /* actual data buffer */ + int size; /* size of the buffer */ + int used; /* amount of data in the bufer */ + int framed : 1; /* whether data is framed */ +}; + + +static void *fragbuf_ensure(mrp_fragbuf_t *buf, size_t size) +{ + int more; + + if (buf->size - buf->used < (int)size) { + more = size - (buf->size - buf->used); + + if (mrp_reallocz(buf->data, buf->size, buf->size + more) == NULL) + return NULL; + else + buf->size += more; + } + + return buf->data + buf->used; +} + + +static size_t fragbuf_missing(mrp_fragbuf_t *buf) +{ + void *ptr; + int offs; + uint32_t size; + + if (!buf->framed || !buf->used) + return -1; + + /* find the last frame */ + ptr = buf->data; + offs = 0; + while (offs < buf->used) { + size = be32toh(*(uint32_t *)ptr); + offs += sizeof(size) + size; + } + + /* get the amount of data missing */ + return offs - buf->used; +} + + +int fragbuf_init(mrp_fragbuf_t *buf, int framed, int pre_alloc) +{ + buf->data = NULL; + buf->size = 0; + buf->used = 0; + buf->framed = framed; + + if (pre_alloc <= 0 || fragbuf_ensure(buf, pre_alloc)) + return TRUE; + else + return FALSE; +} + + +mrp_fragbuf_t *mrp_fragbuf_create(int framed, size_t pre_alloc) +{ + mrp_fragbuf_t *buf; + + buf = mrp_allocz(sizeof(*buf)); + + if (buf != NULL) { + if (fragbuf_init(buf, framed, pre_alloc)) + return buf; + + mrp_free(buf); + } + + return NULL; +} + + +void mrp_fragbuf_reset(mrp_fragbuf_t *buf) +{ + if (buf != NULL) { + mrp_free(buf->data); + buf->data = NULL; + buf->size = 0; + buf->used = 0; + } +} + +void mrp_fragbuf_destroy(mrp_fragbuf_t *buf) +{ + if (buf != NULL) { + mrp_free(buf->data); + mrp_free(buf); + } +} + + +void *mrp_fragbuf_alloc(mrp_fragbuf_t *buf, size_t size) +{ + void *ptr; + + ptr = fragbuf_ensure(buf, size); + + if (ptr != NULL) + buf->used += size; + + return ptr; +} + + +int mrp_fragbuf_push(mrp_fragbuf_t *buf, void *data, size_t size) +{ + void *ptr; + + ptr = fragbuf_ensure(buf, size); + + if (ptr != NULL) { + memcpy(ptr, data, size); + buf->used += size; + + return TRUE; + } + else + return FALSE; +} + + +int mrp_fragbuf_pull(mrp_fragbuf_t *buf, void **datap, size_t *sizep) +{ + void *data; + uint32_t size; + + if (buf == NULL || buf->used <= 0) + return FALSE; + + /* start of iteration */ + if (*datap == NULL) { + if (!buf->framed) { + *datap = buf->data; + *sizep = buf->used; + + return TRUE; + } + else { + if (buf->used < (int)sizeof(size)) + return FALSE; + + size = be32toh(*(uint32_t *)buf->data); + + if (buf->used >= (int)(sizeof(size) + size)) { + *datap = buf->data + sizeof(size); + *sizep = size; + + return TRUE; + } + else + return FALSE; + } + } + /* continue iteration */ + else { + if (!buf->framed) { + data = *datap + *sizep; + + if (buf->data <= data && data < buf->data + buf->used) { + memmove(buf->data, data, buf->used - (data - buf->data)); + buf->used -= (data - buf->data); + + *datap = buf->data; + *sizep = buf->used; + + return TRUE; + } + else { + if (data == buf->data + buf->used) + buf->used = 0; + + return FALSE; + } + } + else { + if (*datap != buf->data + sizeof(size)) + return FALSE; + + size = be32toh(*(uint32_t *)buf->data); + + if ((int)(size + sizeof(size)) <= buf->used) { + memmove(buf->data, buf->data + size + sizeof(size), + buf->used - (size + sizeof(size))); + buf->used -= size + sizeof(size); + } + else + return FALSE; + + if (buf->used <= (int)sizeof(size)) + return FALSE; + + size = be32toh(*(uint32_t *)buf->data); + data = buf->data + sizeof(size); + + if (buf->used >= (int)(size + sizeof(size))) { + *datap = data; + *sizep = size; + + return TRUE; + } + + return FALSE; + } + } +} diff --git a/src/common/fragbuf.h b/src/common/fragbuf.h new file mode 100644 index 0000000..8c62070 --- /dev/null +++ b/src/common/fragbuf.h @@ -0,0 +1,57 @@ +#ifndef __MURPHY_FRAGBUF_H__ +#define __MURPHY_FRAGBUF_H__ + +#include + +MRP_CDECL_BEGIN + +/* + * Fragment collector buffers. + * + * As the name implies, a fragment collector buffer can be used + * to collect message fragments and reassemble messages that were + * transmitted in arbitrary pieces. + * + * Messages are expected to be transmitted in frames where each + * frame simply consist of a 32-bit message size followed by + * the actual message data. On the sending side you can simply + * send each message prefixed with its size. On the receiving side + * you keep feeding the received chunks of data to a fragment + * collector buffer (using mrp_fragbuf_push). After each chunk you + * can iterate through the fully reassembled messages (by calling + * mrp_fragbuf_pull until it returns FALSE). Messages are removed + * automatically from the collector buffer as you iterate through + * them. + * + * You can also create a collector buffer in frameless mode. Such a + * buffer will always return immediately all available data as you + * iterate through it. + */ + +/** Buffer for collecting fragments of (framed or unframed) message data. */ +typedef struct mrp_fragbuf_s mrp_fragbuf_t; + +/** Initialize the given fragment collector buffer. */ +mrp_fragbuf_t *mrp_fragbuf_create(int framed, size_t pre_alloc); + +/** Initialize the given data collector buffer. */ +int mrp_fragbuf_init(mrp_fragbuf_t *buf, int framed, size_t pre_alloc); + +/** Reset the given data collector buffer. */ +void mrp_fragbuf_reset(mrp_fragbuf_t *buf); + +/** Destroy the given data collector buffer, freeing all associated memory. */ +void mrp_fragbuf_destroy(mrp_fragbuf_t *buf); + +/** Allocate a buffer of the given size from the buffer. */ +void *mrp_fragbuf_alloc(mrp_fragbuf_t *buf, size_t size); + +/** Append the given data to the buffer. */ +int mrp_fragbuf_push(mrp_fragbuf_t *buf, void *data, size_t size); + +/** Iterate through the given buffer, pulling and freeing assembled messages. */ +int mrp_fragbuf_pull(mrp_fragbuf_t *buf, void **data, size_t *size); + +MRP_CDECL_END + +#endif /* __MURPHY_FRAGBUF_H__ */ diff --git a/src/common/tests/Makefile.am b/src/common/tests/Makefile.am index 1aa0036..b2539bd 100644 --- a/src/common/tests/Makefile.am +++ b/src/common/tests/Makefile.am @@ -5,6 +5,8 @@ if DBUS_ENABLED noinst_PROGRAMS += mainloop-test dbus-test endif +noinst_PROGRAMS += fragbuf-test + # memory management test mm_test_SOURCES = mm-test.c mm_test_CFLAGS = $(AM_CFLAGS) @@ -66,3 +68,13 @@ dbus_test_SOURCES = dbus-test.c dbus_test_CFLAGS = $(AM_CFLAGS) $(DBUS_CFLAGS) dbus_test_LDADD = ../../libmurphy-dbus.la ../../libmurphy-common.la endif + +## databuf test +#databuf_test_SOURCES = databuf-test.c +#databuf_test_CFLAGS = $(AM_CFLAGS) +#databuf_test_LDADD = ../../libmurphy-common.la + +# fragbuf test +fragbuf_test_SOURCES = fragbuf-test.c +fragbuf_test_CFLAGS = $(AM_CFLAGS) +fragbuf_test_LDADD = ../../libmurphy-common.la \ No newline at end of file diff --git a/src/common/tests/fragbuf-test.c b/src/common/tests/fragbuf-test.c new file mode 100644 index 0000000..64ed2bb --- /dev/null +++ b/src/common/tests/fragbuf-test.c @@ -0,0 +1,355 @@ +#include +#include + +#include +#include +#include + + +#define fatal(fmt, args...) do { \ + mrp_log_error(fmt, ## args); \ + exit(1); \ + } while (0) + + +typedef struct { + int log_mask; + const char *log_target; + int framed; +} context_t; + +context_t ctx; + +void check_message(void *data, size_t size, char **messages, + int *chk, int *offs) +{ + char *p, *d; + int l; + + if (ctx.framed) { + if (!strncmp(messages[*chk], data, size) && !messages[*chk][size]) + mrp_debug("message check: OK"); + else + fatal("message check: failed"); + + *chk += 1; + } + else { + d = data; + while (size > 0) { + p = messages[*chk] + *offs; + l = strlen(p); + + if (l > (int)size) + l = (int)size; + + if (strncmp(p, d, l)) + fatal("message check: failed"); + + *offs += l; + size -= l; + d += l; + + if (messages[*chk][*offs] == '\0') { + *chk += 1; + *offs = 0; + } + } + mrp_debug("message check: OK"); + } +} + + +void dump_buffer(mrp_fragbuf_t *buf, char **messages, int *chk, int *offs) +{ + void *data; + size_t size; + int cnt; + + data = NULL; + size = 0; + cnt = 0; + + while (mrp_fragbuf_pull(buf, &data, &size)) { + mrp_log_info("got message: (%zd bytes) [%*.*s]", size, + (int)size, (int)size, (char *)data); + + check_message(data, size, messages, chk, offs); + + cnt++; + } + + if (!cnt) + mrp_debug("no full messages in buffer"); + else + mrp_debug("pulled %d messages from buffer...", cnt); +} + + +int test(mrp_fragbuf_t *buf, size_t *chunks, int dump_interval) +{ + char *messages[] = { + "Ticking away the moments", + "That make up a dull day", + "Fritter and waste the hours", + "In an off-hand way", + "Kicking around on a piece of ground", + "In your home town", + "Waiting for someone or something", + "To show you the way", + "Tired of lying in the sunshine", + "Staying home to watch the rain", + "You are young and life is long", + "And there is time to kill today", + "And then the one day you find", + "Ten years have got behind you", + "No one told you when to run", + "You missed the starting gun", + "And you run and you run", + "To catch up with the sun", + "But it's sinking", + "Racing around", + "To come up behind you again", + "The sun is the same", + "In a relative way", + "But you're older", + "Shorter of breath", + "And one day closer to death", + "Every year is getting shorter", + "Never seem to find the time", + "Plans that either come to naught", + "Or half a page of scribbled lines", + "Hanging on in quiet desperation", + "Is the English way", + "The time is gone", + "The song is over", + "Thought I'd something more to say", + "Home", + "Home again", + "I like to be here", + "When I can", + "When I come home", + "Cold and tired", + "It's good to warm my bones", + "Beside the fire", + "Far away", + "Across the field", + "Tolling on the iron bell", + "Calls the faithful to their knees", + "To hear the softly spoken magic spell...", + "test #1", + "test #2", + "this is a test #3", + "message #4", + "message #5", + "test message #6", + "a test #7", + "the quick brown (#8)", + "fox (#9)", + "jumps over the (#10)", + "lazy dog (#11)", + "this is another test message (#12)", + "and here is one more for you (#13)", + "foo (#14)", + "bar (#15)", + "foobar (#16)", + "barfoo (#17)", + "xyzzykukkuluuruu (#18)" + }; + + char *msg, *p; + uint32_t size, nbo_size; + size_t n, total; + int dump, chk, offs, i, j; + + dump = chk = offs = 0; + + for (i = 0; i < (int)MRP_ARRAY_SIZE(messages); i++) { + msg = messages[i]; + size = strlen(msg); + + total = 0; + p = msg; + + if (ctx.framed) { + nbo_size = htobe32(size); + if (!mrp_fragbuf_push(buf, &nbo_size, sizeof(nbo_size))) + fatal("failed to push message size to buffer"); + } + + for (j = 0; *p != '\0'; j++) { + if (!chunks[j]) + j = 0; + n = chunks[j]; + if (n > strlen(p)) + n = strlen(p); + + mrp_debug("pushing %zd bytes (%*.*s)...", n, (int)n, (int)n, p); + + if (!mrp_fragbuf_push(buf, p, n)) + fatal("failed to push %*.*s to buffer", (int)n, (int)n, p); + + p += n; + total += n; + + dump++; + + if (!dump_interval || + (dump_interval > 0 && !(dump % dump_interval))) + dump_buffer(buf, messages, &chk, &offs); + } + + if (dump_interval < -1) { + if (i && !(i % -dump_interval)) + dump_buffer(buf, messages, &chk, &offs); + } + } + + dump_buffer(buf, messages, &chk, &offs); + + return TRUE; +} + + +static void print_usage(const char *argv0, int exit_code, const char *fmt, ...) +{ + va_list ap; + + if (fmt && *fmt) { + va_start(ap, fmt); + vprintf(fmt, ap); + printf("\n"); + va_end(ap); + } + + printf("usage: %s [options]\n\n" + "The possible options are:\n" + " -t, --log-target=TARGET log target to use\n" + " TARGET is one of stderr,stdout,syslog, or a logfile path\n" + " -l, --log-level=LEVELS logging level to use\n" + " LEVELS is a comma separated list of info, error and warning\n" + " -v, --verbose increase logging verbosity\n" + " -d, --debug enable debug messages\n" + " -n, --non-framed set buffer to non-framed mode\n" + " -h, --help show help on usage\n", + argv0); + + if (exit_code < 0) + return; + else + exit(exit_code); +} + + +static void config_set_defaults(void) +{ + mrp_clear(&ctx); + ctx.log_mask = MRP_LOG_UPTO(MRP_LOG_INFO); + ctx.log_target = MRP_LOG_TO_STDOUT; + ctx.framed = TRUE; +} + + +void parse_cmdline(int argc, char **argv) +{ +# define OPTIONS "l:t:vd:nh" + struct option options[] = { + { "log-level" , required_argument, NULL, 'l' }, + { "log-target", required_argument, NULL, 't' }, + { "verbose" , optional_argument, NULL, 'v' }, + { "debug" , required_argument, NULL, 'd' }, + { "non-framed", no_argument , NULL, 'n' }, + { "help" , no_argument , NULL, 'h' }, + { NULL, 0, NULL, 0 } + }; + + int opt; + + config_set_defaults(); + + while ((opt = getopt_long(argc, argv, OPTIONS, options, NULL)) != -1) { + switch (opt) { + case 'v': + ctx.log_mask <<= 1; + ctx.log_mask |= 1; + break; + + case 'l': + ctx.log_mask = mrp_log_parse_levels(optarg); + if (ctx.log_mask < 0) + print_usage(argv[0], EINVAL, "invalid log level '%s'", optarg); + break; + + case 't': + ctx.log_target = mrp_log_parse_target(optarg); + if (!ctx.log_target) + print_usage(argv[0], EINVAL, "invalid log target '%s'", optarg); + break; + + case 'd': + ctx.log_mask |= MRP_LOG_MASK_DEBUG; + mrp_debug_set_config(optarg); + mrp_debug_enable(TRUE); + break; + + case'n': + ctx.framed = FALSE; + break; + + case 'h': + print_usage(argv[0], -1, ""); + exit(0); + break; + + case '?': + if (opterr) + print_usage(argv[0], EINVAL, ""); + break; + + default: + print_usage(argv[0], EINVAL, "invalid option '%c'", opt); + } + } +} + +int main(int argc, char *argv[]) +{ + mrp_fragbuf_t *buf; + size_t chunkstbl[][8] = { + { 3, 1, 2, 3, 5, 0, 0, 0 }, + { 1, 2, 3, 4, 3, 2, 1, 0 }, + { 1, 5, 3, 4, 2, 1, 1, 0 }, + { 4, 3, 2, 1, 2, 3, 4, 0 }, + }; + size_t *chunks; + size_t single[] = { 1, 0 }; + int intervals[] = { 1, 2, 3, 4, 5, 0, -1 }; + int i, j, interval; + + parse_cmdline(argc, argv); + + mrp_log_set_mask(ctx.log_mask); + mrp_log_set_target(ctx.log_target); + + buf = mrp_fragbuf_create(ctx.framed, 0); + + if (buf == NULL) + fatal("failed to create data collecting buffer"); + + for (i = 0; i < (int)MRP_ARRAY_SIZE(intervals); i++) { + interval = intervals[i]; + for (j = 0; j < (int)MRP_ARRAY_SIZE(chunkstbl); j++) { + chunks = &chunkstbl[j][0]; + mrp_log_info("testing with interval %d, chunks #%d", interval, j); + test(buf, chunks, interval); + test(buf, single, interval); + mrp_log_info("testing with interval %d, chunks #%d", -i -2, j); + test(buf, chunks, -i - 2); + test(buf, single, -i - 2); + } + } + + mrp_fragbuf_destroy(buf); + + return 0; +} diff --git a/src/common/tests/transport-test.c b/src/common/tests/transport-test.c index 4148029..2ba976e 100644 --- a/src/common/tests/transport-test.c +++ b/src/common/tests/transport-test.c @@ -120,6 +120,14 @@ MRP_DATA_DESCRIPTOR(buggy_descr, TAG_CUSTOM, custom_t, mrp_data_descr_t *data_descr; +typedef enum { + MODE_DEFAULT = 0, + MODE_MESSAGE = 1, + MODE_CUSTOM = 2, + MODE_RAW = 3, +} msg_mode_t; + + typedef struct { mrp_mainloop_t *ml; mrp_transport_t *lt, *t; @@ -131,7 +139,7 @@ typedef struct { int sock; mrp_io_watch_t *iow; mrp_timer_t *timer; - int custom; + int mode; int buggy; int connect; int stream; @@ -149,6 +157,9 @@ void recv_custom(mrp_transport_t *t, void *data, uint16_t tag, void *user_data); void recvfrom_custom(mrp_transport_t *t, void *data, uint16_t tag, mrp_sockaddr_t *addr, socklen_t addrlen, void *user_data); +void recvraw(mrp_transport_t *t, void *data, size_t size, void *user_data); +void recvrawfrom(mrp_transport_t *t, void *data, size_t size, + mrp_sockaddr_t *addr, socklen_t addrlen, void *user_data); void dump_msg(mrp_msg_t *msg, FILE *fp) @@ -282,6 +293,49 @@ void recv_custom(mrp_transport_t *t, void *data, uint16_t tag, void *user_data) } +void dump_raw(void *data, size_t size, FILE *fp) +{ + int len = (int)size; + + fprintf(fp, "[%*.*s]\n", len, len, (char *)data); +} + + +void recvfrom_raw(mrp_transport_t *t, void *data, size_t size, + mrp_sockaddr_t *addr, socklen_t addrlen, void *user_data) +{ + context_t *c = (context_t *)user_data; + char rpl[256]; + size_t rpl_size; + int status; + + rpl_size = snprintf(rpl, sizeof(rpl), "reply to message [%*.*s]", + (int)size, (int)size, (char *)data); + + mrp_log_info("received raw message"); + dump_raw(data, size, stdout); + + if (strncmp((char *)data, "reply to ", 9) != 0) { + if (c->connect) + status = mrp_transport_sendraw(t, rpl, rpl_size); + else + status = mrp_transport_sendrawto(t, rpl, rpl_size, addr, addrlen); + + if (status) + mrp_log_info("reply successfully sent"); + else + mrp_log_error("failed to send reply"); + } +} + + +void recv_raw(mrp_transport_t *t, void *data, size_t size, void *user_data) +{ + recvfrom_raw(t, data, size, NULL, 0, user_data); +} + + + void closed_evt(mrp_transport_t *t, int error, void *user_data) { context_t *c = (context_t *)user_data; @@ -345,31 +399,35 @@ void server_init(context_t *c) type_init(c); - if (!c->stream) { - if (c->custom) { - evt.recvdata = recv_custom; - evt.recvdatafrom = recvfrom_custom; - } - else { - evt.recvmsg = recv_msg; - evt.recvmsgfrom = recvfrom_msg; - } + switch (c->mode) { + case MODE_CUSTOM: + evt.recvdata = recv_custom; + evt.recvdatafrom = recvfrom_custom; + break; + case MODE_RAW: + evt.recvraw = recv_raw; + evt.recvrawfrom = recvfrom_raw; + break; + case MODE_MESSAGE: + default: + evt.recvmsg = recv_msg; + evt.recvmsgfrom = recvfrom_msg; } - else { + + if (c->stream) { evt.connection = connection_evt; evt.closed = closed_evt; - if (c->custom) { - evt.recvdata = recv_custom; - evt.recvdatafrom = recvfrom_custom; - } - else { - evt.recvmsg = recv_msg; - evt.recvmsgfrom = recvfrom_msg; - } } - flags = MRP_TRANSPORT_REUSEADDR | - (c->custom ? MRP_TRANSPORT_MODE_CUSTOM : 0); + flags = MRP_TRANSPORT_REUSEADDR; + + switch (c->mode) { + case MODE_CUSTOM: flags |= MRP_TRANSPORT_MODE_CUSTOM; break; + case MODE_RAW: flags |= MRP_TRANSPORT_MODE_RAW; break; + default: + case MODE_MESSAGE: flags |= MRP_TRANSPORT_MODE_MSG; + } + c->lt = mrp_transport_create(c->ml, c->atype, &evt, c, flags); if (c->lt == NULL) { @@ -479,6 +537,29 @@ void send_custom(context_t *c) } +void send_raw(context_t *c) +{ + uint32_t seq = c->seqno++; + char msg[256]; + size_t size; + int status; + + size = snprintf(msg, sizeof(msg), "this is message #%u", seq); + + if (c->connect) + status = mrp_transport_sendraw(c->t, msg, size); + else + status = mrp_transport_sendrawto(c->t, msg, size, &c->addr, c->alen); + + if (!status) { + mrp_log_error("Failed to send raw message #%d.", seq); + exit(1); + } + else + mrp_log_info("Message #%u succesfully sent.", seq); +} + + void send_cb(mrp_mainloop_t *ml, mrp_timer_t *t, void *user_data) { @@ -487,10 +568,12 @@ void send_cb(mrp_mainloop_t *ml, mrp_timer_t *t, void *user_data) MRP_UNUSED(ml); MRP_UNUSED(t); - if (c->custom) - send_custom(c); - else - send_msg(c); + switch (c->mode) { + case MODE_CUSTOM: send_custom(c); break; + case MODE_RAW: send_raw(c); break; + default: + case MODE_MESSAGE: send_msg(c); + } } @@ -507,17 +590,25 @@ void client_init(context_t *c) type_init(c); - if (c->custom) { + switch (c->mode) { + case MODE_CUSTOM: evt.recvdata = recv_custom; evt.recvdatafrom = recvfrom_custom; - } - else { - evt.recvmsg = recv_msg; - evt.recvmsgfrom = recvfrom_msg; + flags = MRP_TRANSPORT_MODE_CUSTOM; + break; + case MODE_RAW: + evt.recvraw = recv_raw; + evt.recvrawfrom = recvfrom_raw; + flags = MRP_TRANSPORT_MODE_RAW; + break; + default: + case MODE_MESSAGE: + evt.recvmsg = recv_msg; + evt.recvmsgfrom = recvfrom_msg; + flags = MRP_TRANSPORT_MODE_MSG; } - flags = c->custom ? MRP_TRANSPORT_MODE_CUSTOM : 0; - c->t = mrp_transport_create(c->ml, c->atype, &evt, c, flags); + c->t = mrp_transport_create(c->ml, c->atype, &evt, c, flags); if (c->t == NULL) { mrp_log_error("Failed to create new transport."); @@ -576,6 +667,7 @@ static void print_usage(const char *argv0, int exit_code, const char *fmt, ...) " -a, --address address to use\n" " -c, --custom use custom messages\n" " -m, --message use generic messages (default)\n" + " -r, --raw use raw messages\n" " -b, --buggy use buggy data descriptors\n" " -t, --log-target=TARGET log target to use\n" " TARGET is one of stderr,stdout,syslog, or a logfile path\n" @@ -598,7 +690,6 @@ static void config_set_defaults(context_t *ctx) mrp_clear(ctx); ctx->addrstr = "tcp4:127.0.0.1:3000"; ctx->server = FALSE; - ctx->custom = FALSE; ctx->log_mask = MRP_LOG_UPTO(MRP_LOG_DEBUG); ctx->log_target = MRP_LOG_TO_STDERR; } @@ -606,25 +697,26 @@ static void config_set_defaults(context_t *ctx) int parse_cmdline(context_t *ctx, int argc, char **argv) { -# define OPTIONS "scmbCa:l:t:vdh" +# define OPTIONS "scmrbCa:l:t:v:d:h" struct option options[] = { { "server" , no_argument , NULL, 's' }, { "address" , required_argument, NULL, 'a' }, { "custom" , no_argument , NULL, 'c' }, - { "connect" , no_argument , NULL, 'C' }, { "message" , no_argument , NULL, 'm' }, + { "raw" , no_argument , NULL, 'r' }, + { "connect" , no_argument , NULL, 'C' }, + { "buggy" , no_argument , NULL, 'b' }, { "log-level" , required_argument, NULL, 'l' }, { "log-target", required_argument, NULL, 't' }, { "verbose" , optional_argument, NULL, 'v' }, - { "debug" , no_argument , NULL, 'd' }, + { "debug" , required_argument, NULL, 'd' }, { "help" , no_argument , NULL, 'h' }, { NULL, 0, NULL, 0 } }; - int opt, debug; + int opt; - debug = FALSE; config_set_defaults(ctx); while ((opt = getopt_long(argc, argv, OPTIONS, options, NULL)) != -1) { @@ -634,11 +726,30 @@ int parse_cmdline(context_t *ctx, int argc, char **argv) break; case 'c': - ctx->custom = TRUE; + if (ctx->mode == MODE_DEFAULT) + ctx->mode = MODE_CUSTOM; + else { + mrp_log_error("Multiple modes requested."); + exit(1); + } break; case 'm': - ctx->custom = FALSE; + if (ctx->mode == MODE_DEFAULT) + ctx->mode = MODE_MESSAGE; + else { + mrp_log_error("Multiple modes requested."); + exit(1); + } + break; + + case 'r': + if (ctx->mode == MODE_DEFAULT) + ctx->mode = MODE_RAW; + else { + mrp_log_error("Multiple modes requested."); + exit(1); + } break; case 'b': @@ -671,7 +782,9 @@ int parse_cmdline(context_t *ctx, int argc, char **argv) break; case 'd': - debug = TRUE; + ctx->log_mask |= MRP_LOG_MASK_DEBUG; + mrp_debug_set_config(optarg); + mrp_debug_enable(TRUE); break; case 'h': @@ -684,9 +797,6 @@ int parse_cmdline(context_t *ctx, int argc, char **argv) } } - if (debug) - ctx->log_mask |= MRP_LOG_MASK_DEBUG; - return TRUE; } @@ -706,12 +816,15 @@ int main(int argc, char *argv[]) else mrp_log_info("Running as client, using address '%s'...", c.addrstr); - if (c.custom) - mrp_log_info("Using custom messages..."); - else - mrp_log_info("Using generic messages..."); + switch (c.mode) { + case MODE_CUSTOM: mrp_log_info("Using custom messages..."); break; + case MODE_RAW: mrp_log_info("Using raw messages..."); break; + default: + case MODE_MESSAGE: mrp_log_info("Using generic messages..."); + } - if (!strncmp(c.addrstr, "tcp", 3) || !strncmp(c.addrstr, "unxs", 4)) { + if (!strncmp(c.addrstr, "tcp", 3) || !strncmp(c.addrstr, "unxs", 4) || + !strncmp(c.addrstr, "wsck", 4)) { c.stream = TRUE; c.connect = TRUE; } diff --git a/src/common/websocket.c b/src/common/websocket.c new file mode 100644 index 0000000..ed64b68 --- /dev/null +++ b/src/common/websocket.c @@ -0,0 +1,63 @@ +#include +#include + + +void mrp_websock_set_loglevel(mrp_websock_loglevel_t mask) +{ + wsl_set_loglevel(mask); +} + + +mrp_websock_context_t *mrp_websock_create_context(mrp_mainloop_t *ml, + struct sockaddr *sa, + mrp_websock_proto_t *proto, + int nproto, + void *user_data) +{ + return wsl_create_context(ml, sa, proto, nproto, user_data); +} + + +mrp_websock_context_t *mrp_websock_ref_context(mrp_websock_context_t *ctx) +{ + return wsl_ref_context(ctx); +} + + +int mrp_websock_unref_context(mrp_websock_context_t *ctx) +{ + return wsl_unref_context(ctx); +} + + +mrp_websock_t *mrp_websock_connect(mrp_websock_context_t *ctx, + struct sockaddr *sa, const char *protocol, + void *user_data) +{ + return wsl_connect(ctx, sa, protocol, user_data); +} + + +mrp_websock_t *mrp_websock_accept_pending(mrp_websock_context_t *ctx, + void *user_data) +{ + return wsl_accept_pending(ctx, user_data); +} + + +void mrp_websock_reject_pending(mrp_websock_context_t *ctx) +{ + wsl_reject_pending(ctx); +} + + +void *mrp_websock_close(mrp_websock_t *sck) +{ + return wsl_close(sck); +} + + +int mrp_websock_send(mrp_websock_t *sck, void *payload, size_t size) +{ + return wsl_send(sck, payload, size); +} diff --git a/src/common/websocket.h b/src/common/websocket.h new file mode 100644 index 0000000..c247163 --- /dev/null +++ b/src/common/websocket.h @@ -0,0 +1,81 @@ +#ifndef __MURPHY_WEBSOCKET_H__ +#define __MURPHY_WEBSOCKET_H__ + +#include +#include + +MRP_CDECL_BEGIN + +/* + * websocket types (mapped) + */ + +typedef wsl_ctx_t mrp_websock_context_t; +typedef wsl_sck_t mrp_websock_t; +typedef wsl_callbacks_t mrp_websock_evt_t; +typedef wsl_proto_t mrp_websock_proto_t; + +/* + * websocket log levels (mapped) + */ + +typedef enum { +#define MAP(mrp, wsl) MRP_WEBSOCK_LOG_##mrp = WSL_LOG_##wsl + MAP(NONE , NONE), + MAP(ERROR , ERROR), + MAP(WARNING, WARNING), + MAP(INFO , INFO), + MAP(DEBUG , DEBUG), + MAP(ALL , ALL), + MAP(PARSER , PARSER), + MAP(EXT , EXT), + MAP(CLIENT , CLIENT), + MAP(EXTRA , EXTRA), + MAP(VERBOSE, VERBOSE) +#undef MAP +} mrp_websock_loglevel_t; + + + +/* + * websocket function prototypes + */ + +/** Set websocket logging level. */ +void mrp_websock_set_loglevel(mrp_websock_loglevel_t mask); + +/** Create a websocket context. */ +mrp_websock_context_t *mrp_websock_create_context(mrp_mainloop_t *ml, + struct sockaddr *sa, + mrp_websock_proto_t *proto, + int nproto, + void *user_data); + +/** Add a reference to a websocket context. */ +mrp_websock_context_t *mrp_websock_ref_context(mrp_websock_context_t *ctx); + +/** Remove a context reference. */ +int mrp_websock_unref_context(mrp_websock_context_t *ctx); + +/** Create and connect a websocket to a given address. */ +mrp_websock_t *mrp_websock_connect(mrp_websock_context_t *ctx, + struct sockaddr *sa, const char *protocol, + void *user_data); + +/** Accept a pending connection of a context. */ +mrp_websock_t *mrp_websock_accept_pending(mrp_websock_context_t *ctx, + void *user_data); + +/** Reject a pending connection of a context. */ +void mrp_websock_reject_pending(mrp_websock_context_t *ctx); + +/** Close a websocket. Return the user_data of it's associated context. */ +void *mrp_websock_close(mrp_websock_t *sck); + +/** Send data over a connected websocket. */ +int mrp_websock_send(mrp_websock_t *sck, void *payload, size_t size); + +MRP_CDECL_END + + +#endif /* __MURPHY_WEBSOCKET_H__ */ diff --git a/src/common/websocklib.c b/src/common/websocklib.c new file mode 100644 index 0000000..86593a7 --- /dev/null +++ b/src/common/websocklib.c @@ -0,0 +1,1140 @@ +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "websocklib.h" + +#define LWS_EVENT_OK 0 /* event handler result: ok */ +#define LWS_EVENT_DENY 1 /* event handler result: deny */ +#define LWS_EVENT_ERROR 1 /* event handler result: error */ + +/* libwebsocket status used to close sockets upon error */ +#define LWS_INTERNAL_ERROR LWS_CLOSE_STATUS_UNEXPECTED_CONDITION + +/* SSL modes */ +#define LWS_NO_SSL 0 /* no SSL at all */ +#define LWS_SSL 1 /* SSL, deny self-signed certs */ +#define LWS_SSL_SELFSIGNED 2 /* SSL, allow self-signed certs */ + +/* + * define shorter aliasen for libwebsocket types + */ + +typedef struct libwebsocket lws_t; +typedef struct libwebsocket_context lws_ctx_t; +typedef struct libwebsocket_extension lws_ext_t; +typedef struct libwebsocket_protocols lws_proto_t; +typedef enum libwebsocket_callback_reasons lws_event_t; + + +/* + * a libwebsocket fd we (e)poll + * + * Unfortunately the mechanism offered by libwebsockets for external + * mainloop integration uses event mask diffs when asking the mainloop + * to modify what an fd is polled for. This forces us to do double + * bookkeeping: we need to to keep track of the current event mask for + * all descriptors just to figure out the new mask when libwebsockets + * hands us a diff. + */ + +typedef struct { + int fd; /* libwebsocket file descriptor */ + uint32_t events; /* monitored (epoll) events */ +} pollfd_t; + + +/* + * a websocket context + */ + +struct wsl_ctx_s { + lws_ctx_t *ctx; /* libwebsocket context */ + wsl_proto_t *protos; /* protocols */ + int nproto; /* number of protocols */ + lws_proto_t *lws_protos; /* libwebsocket protocols */ + mrp_refcnt_t refcnt; /* reference count */ + int epollfd; /* epoll descriptor */ + mrp_io_watch_t *w; /* I/O watch for epollfd */ + mrp_mainloop_t *ml; /* pumping mainloop */ + pollfd_t *fds; /* polled descriptors */ + int nfd; /* number descriptors */ + void *user_data; /* opaque user data */ + lws_t *pending; /* pending connection */ + void *pending_user; /* user_data of pending */ + wsl_proto_t *pending_proto; /* protocol of pending */ +}; + +/* + * a websocket instance + */ + +struct wsl_sck_s { + wsl_ctx_t *ctx; /* associated context */ + lws_t *sck; /* libwebsocket instance */ + wsl_proto_t *proto; /* protocol data */ + mrp_fragbuf_t *buf; /* fragment collection buffer */ + void *user_data; /* opaque user data */ + wsl_sck_t **sckptr; /* back pointer from sck to us */ + int closing : 1; /* close in progress */ + int busy; /* upper-layer callback(s) active */ +}; + + +/* + * mark a socket busy while executing a piece of code + */ + +#define SOCKET_BUSY_REGION(sck, ...) do { \ + (sck)->busy++; \ + __VA_ARGS__; \ + (sck)->busy--; \ + } while (0) + + + +static int http_event(lws_ctx_t *ws_ctx, lws_t *ws, lws_event_t event, + void *user, void *in, size_t len); +static int wsl_event(lws_ctx_t *ws_ctx, lws_t *ws, lws_event_t event, + void *user, void *in, size_t len); +static void destroy_context(wsl_ctx_t *ctx); + + + +static inline uint32_t map_poll_to_event(int in) +{ + uint32_t mask = 0; + + if (in & POLLIN) mask |= MRP_IO_EVENT_IN; + if (in & POLLOUT) mask |= MRP_IO_EVENT_OUT; + if (in & POLLHUP) mask |= MRP_IO_EVENT_HUP; + if (in & POLLERR) mask |= MRP_IO_EVENT_ERR; + + return mask; + +} + + +static inline short map_event_to_poll(uint32_t in) +{ + short mask = 0; + + if (in & MRP_IO_EVENT_IN) mask |= POLLIN; + if (in & MRP_IO_EVENT_OUT) mask |= POLLOUT; + if (in & MRP_IO_EVENT_HUP) mask |= POLLHUP; + if (in & MRP_IO_EVENT_ERR) mask |= POLLERR; + + return mask; +} + + +static int add_fd(wsl_ctx_t *wsc, int fd, int events) +{ + struct epoll_event e; + + if (wsc != NULL) { + e.data.u64 = 0; + e.data.fd = fd; + e.events = map_poll_to_event(events); + + if (epoll_ctl(wsc->epollfd, EPOLL_CTL_ADD, fd, &e) == 0) { + if (mrp_reallocz(wsc->fds, wsc->nfd, wsc->nfd + 1) != NULL) { + wsc->fds[wsc->nfd].fd = fd; + wsc->fds[wsc->nfd].events = e.events; + wsc->nfd++; + + return TRUE; + } + else + epoll_ctl(wsc->epollfd, EPOLL_CTL_DEL, fd, &e); + } + } + + return FALSE; +} + + +static int del_fd(wsl_ctx_t *wsc, int fd) +{ + struct epoll_event e; + int i; + + if (wsc != NULL) { + e.data.u64 = 0; + e.data.fd = fd; + e.events = 0; + epoll_ctl(wsc->epollfd, EPOLL_CTL_DEL, fd, &e); + + for (i = 0; i < wsc->nfd; i++) { + if (wsc->fds[i].fd == fd) { + if (i < wsc->nfd - 1) + memmove(wsc->fds + i, wsc->fds + i + 1, + (wsc->nfd - i - 1) * sizeof(*wsc->fds)); + + mrp_reallocz(wsc->fds, wsc->nfd, wsc->nfd - 1); + wsc->nfd--; + + return TRUE; + } + } + } + + return FALSE; +} + + +static pollfd_t *find_fd(wsl_ctx_t *wsc, int fd) +{ + int i; + + if (wsc != NULL) { + for (i = 0; i < wsc->nfd; i++) + if (wsc->fds[i].fd == fd) + return wsc->fds + i; + } + + return NULL; +} + + +static int mod_fd(wsl_ctx_t *wsc, int fd, int events, int clear) +{ + struct epoll_event e; + pollfd_t *wfd; + + if (wsc != NULL) { + wfd = find_fd(wsc, fd); + + if (wfd != NULL) { + e.data.u64 = 0; + e.data.fd = fd; + + if (clear) + e.events = wfd->events & ~map_poll_to_event(events); + else + e.events = wfd->events | map_poll_to_event(events); + + if (epoll_ctl(wsc->epollfd, EPOLL_CTL_MOD, fd, &e) == 0) + return TRUE; + } + } + + return FALSE; +} + + +static void purge_fds(wsl_ctx_t *wsc) +{ + if (wsc != NULL) { + mrp_free(wsc->fds); + wsc->fds = NULL; + wsc->nfd = 0; + } +} + + +static void epoll_event(mrp_mainloop_t *ml, mrp_io_watch_t *w, int fd, + mrp_io_event_t mask, void *user_data) +{ + wsl_ctx_t *wsc = (wsl_ctx_t *)user_data; + pollfd_t *wfd; + struct epoll_event *events, *e; + int nevent, n, i; + struct pollfd pollfd; + + MRP_UNUSED(ml); + MRP_UNUSED(w); + MRP_UNUSED(fd); + + if (wsc->nfd <= 0 || !(mask & MRP_IO_EVENT_IN)) + return; + + nevent = wsc->nfd; + events = alloca(nevent * sizeof(*events)); + + while ((n = epoll_wait(wsc->epollfd, events, nevent, 0)) > 0) { + mrp_debug("got %d epoll events for websocket context %p", n, wsc); + + for (i = 0, e = events; i < n; i++, e++) { + wfd = find_fd(wsc, e->data.fd); + + if (wfd != NULL) { + pollfd.fd = wfd->fd; + pollfd.events = map_event_to_poll(wfd->events); + pollfd.revents = map_event_to_poll(e->events); + + mrp_debug("delivering events 0x%x to websocket fd %d", + pollfd.revents, pollfd.fd); + + libwebsocket_service_fd(wsc->ctx, &pollfd); + } + } + } +} + + +/* + * context handling + */ + +wsl_ctx_t *wsl_create_context(mrp_mainloop_t *ml, struct sockaddr *addr, + wsl_proto_t *protos, int nproto, void *user_data) +{ + lws_ext_t *builtin = libwebsocket_internal_extensions; + wsl_ctx_t *ctx; + wsl_proto_t *up; + lws_proto_t *lws_protos, *lp; + int lws_nproto, has_http; + mrp_io_event_t events; + const char *dev; + int port, i; + + + if (addr == NULL) { + dev = NULL; + port = 0; + } + else { + switch (addr->sa_family) { + case AF_INET: + dev = NULL; + port = (int)ntohs(((struct sockaddr_in *)addr)->sin_port); + break; + + case AF_INET6: + dev = NULL; + port = (int)ntohs(((struct sockaddr_in6 *)addr)->sin6_port); + break; + + default: + errno = EINVAL; + return NULL; + } + } + + ctx = mrp_allocz(sizeof(*ctx)); + + if (ctx == NULL) + goto fail; + + mrp_refcnt_init(&ctx->refcnt); + + ctx->protos = protos; + ctx->nproto = nproto; + + has_http = !strncmp(protos[0].name, "http", 4); + lws_nproto = (has_http ? nproto : nproto + 1) + 1; + lws_protos = mrp_allocz_array(lws_proto_t, lws_nproto); + + if (lws_protos == NULL) + goto fail; + + lws_protos[0].name = "http"; + lws_protos[0].callback = http_event; + if (!has_http) + lws_protos[0].per_session_data_size = sizeof(void *); + else + lws_protos[0].per_session_data_size = 0; + + lp = lws_protos + 1; + up = protos + (has_http ? 1 : 0); + + for (i = 0; i < nproto; i++) { + lp->name = up->name; + lp->callback = wsl_event; + lp->per_session_data_size = sizeof(void *); + + lp++; + up++; + } + + ctx->lws_protos = lws_protos; + + ctx->epollfd = epoll_create1(EPOLL_CLOEXEC); + + if (ctx->epollfd < 0) + goto fail; + + events = MRP_IO_EVENT_IN; + ctx->ml = ml; + ctx->w = mrp_add_io_watch(ml, ctx->epollfd, events, epoll_event, ctx); + + if (ctx->w == NULL) + goto fail; + + ctx->ctx = libwebsocket_create_context(port, dev, lws_protos, builtin, + NULL, NULL, NULL, -1, -1, 0, + ctx); + + if (ctx->ctx != NULL) { + ctx->user_data = user_data; + + return ctx; + } + + fail: + if (ctx != NULL) { + if (ctx->epollfd >= 0) { + mrp_del_io_watch(ctx->w); + close(ctx->epollfd); + } + + mrp_free(ctx); + } + + return NULL; +} + + +wsl_ctx_t *wsl_ref_context(wsl_ctx_t *ctx) +{ + return mrp_ref_obj(ctx, refcnt); +} + + +int wsl_unref_context(wsl_ctx_t *ctx) +{ + if (mrp_unref_obj(ctx, refcnt)) { + destroy_context(ctx); + + return TRUE; + } + else + return FALSE; +} + + +static void destroy_context(wsl_ctx_t *ctx) +{ + if (ctx != NULL) { + mrp_del_io_watch(ctx->w); + ctx->w = NULL; + + close(ctx->epollfd); + ctx->epollfd = -1; + + purge_fds(ctx); + + if (ctx->ctx != NULL) + libwebsocket_context_destroy(ctx->ctx); + + mrp_free(ctx->lws_protos); + mrp_free(ctx); + } +} + + +static wsl_proto_t *find_context_protocol(wsl_ctx_t *ctx, const char *protocol) +{ + wsl_proto_t *up; + int i; + + if (protocol != NULL) { + for (i = 0, up = ctx->protos; i < ctx->nproto; i++, up++) + if (!strcmp(up->name, protocol)) + return up; + } + + return NULL; +} + + +wsl_sck_t *wsl_connect(wsl_ctx_t *ctx, struct sockaddr *sa, + const char *protocol, void *user_data) +{ + wsl_sck_t *sck, **ptr; + wsl_proto_t *up; + int port; + void *aptr; + char abuf[256]; + const char *astr; + + switch (sa->sa_family) { + case AF_INET: + aptr = &((struct sockaddr_in *)sa)->sin_addr; + port = ntohs(((struct sockaddr_in *)sa)->sin_port); + break; + case AF_INET6: + aptr = &((struct sockaddr_in6 *)sa)->sin6_addr; + port = ntohs(((struct sockaddr_in6 *)sa)->sin6_port); + break; + default: + errno = EINVAL; + return NULL; + } + + astr = inet_ntop(sa->sa_family, aptr, abuf, sizeof(abuf)); + + if (astr == NULL) + return NULL; + + up = find_context_protocol(ctx, protocol); + + if (up == NULL) { + errno = ENOPROTOOPT; + return NULL; + } + + sck = mrp_allocz(sizeof(*sck)); + ptr = mrp_allocz(sizeof(*ptr)); + + if (sck != NULL && ptr != NULL) { + /* + * Now we need to create and connect a new libwebsocket instance + * within the given context. We also need to set up a one-to-one + * mapping between the underlying libwebsocket and our wsl_sck_t + * so that we can handle both top-down (sending) and bottom-up + * (receiving) event propagation in the stack. + * + * We use the user data associated with the libwebsocket instance + * to store a back pointer to us. Whenever the socket instance + * is deleted locally (as opposed to our peer closing the session) + * we need to prevent the propagation of any potentially pending + * events to our deleted wsl_sck_t (which might have been freed). + * This we do by clearing the back pointer from the instance to us. + * + * However, since libwebsockets does not provide an API for this, + * as a trick we use an indirect back pointer and store a pointer + * to the actual back pointer also in wsl_sck_t here. This way we + * can always clear the back pointer when we need to. + * + * Also note, that memory management for the associated user data + * is asymmetric in many sense. For client connections, we allocate + * the data buffer and pass it on to libwebsockets. For incoming + * connections the user data buffer is allocated by libwebsockets + * and we only get a chance to fill it in the event handler for + * connection establishment. However, for both incoming and outgoing + * connections libwebsockets will free the buffer on behalf of us. + * + * The exact same notes apply to wsl_accept_pending below... + */ + + sck->ctx = wsl_ref_context(ctx); + sck->proto = up; + sck->buf = mrp_fragbuf_create(TRUE, 0); + + if (sck->buf != NULL) { + sck->user_data = user_data; + *ptr = sck; + sck->sckptr = ptr; + + sck->sck = libwebsocket_client_connect_extended(ctx->ctx, + astr, port, + LWS_NO_SSL, + "/", astr, astr, + protocol, -1, + ptr); + + if (sck->sck != NULL) + return sck; + + mrp_fragbuf_destroy(sck->buf); + } + + wsl_unref_context(ctx); + mrp_free(ptr); + mrp_free(sck); + } + + return NULL; +} + + +wsl_sck_t *wsl_accept_pending(wsl_ctx_t *ctx, void *user_data) +{ + wsl_sck_t *sck, **ptr; + + if (ctx->pending == NULL) + return NULL; + + mrp_debug("accepting pending websocket connection %p/%p", ctx->pending, + ctx->pending_user); + + sck = mrp_allocz(sizeof(*sck)); + + if (sck != NULL) { + /* + * Notes: + * The same notes apply here for context creation as for + * wsl_connect above... + */ + sck->ctx = wsl_ref_context(ctx); + sck->buf = mrp_fragbuf_create(TRUE, 0); + + if (sck->buf != NULL) { + sck->proto = ctx->pending_proto; + sck->user_data = user_data; + sck->sck = ctx->pending; + ptr = (wsl_sck_t **)ctx->pending_user; + *ptr = sck; + sck->sckptr = ptr; + + /* let the event handler know we accepted the client */ + ctx->pending = NULL; + ctx->pending_user = NULL; + ctx->pending_proto = NULL; + + return sck; + } + + wsl_unref_context(ctx); + mrp_free(sck); + } + + return NULL; +} + + +void wsl_reject_pending(wsl_ctx_t *ctx) +{ + mrp_debug("reject pending websocket (%s) connection %p/%p", + ctx->pending_proto->name, ctx->pending, ctx->pending_user); + + /* + * Nothing to do here really... just don't clear ctx->pending so the + * event handler will know to reject once it regains control. + */ +} + + +void *wsl_close(wsl_sck_t *sck) +{ + wsl_ctx_t *ctx; + void *user_data; + int status; + + user_data = NULL; + + if (sck != NULL) { + if (sck->sck != NULL && sck->busy <= 0) { + mrp_debug("closing websocket %p/%p", sck, sck->sck); + + status = LWS_CLOSE_STATUS_NORMAL; + ctx = sck->ctx; + + sck->closing = TRUE; + libwebsocket_close_and_free_session(ctx->ctx, sck->sck, status); + sck->sck = NULL; + *sck->sckptr = NULL; + + if (ctx != NULL) { + user_data = ctx->user_data; + wsl_unref_context(ctx); + sck->ctx = NULL; + } + + mrp_fragbuf_destroy(sck->buf); + sck->buf = NULL; + + mrp_debug("freeing websocket %p", sck); + mrp_free(sck); + } + else { + mrp_debug("marking websocket %p/%p for closing", sck, sck->sck); + sck->closing = TRUE; + } + } + + return user_data; +} + + +static int check_closed(wsl_sck_t *sck) +{ + if (sck->closing && sck->busy <= 0) { + wsl_close(sck); + return TRUE; + } + else + return FALSE; +} + + +int wsl_send(wsl_sck_t *sck, void *payload, size_t size) +{ + unsigned char *buf; + size_t pre, post; + uint32_t *len; + + if (sck != NULL && sck->sck != NULL) { + pre = LWS_SEND_BUFFER_PRE_PADDING; + post = LWS_SEND_BUFFER_POST_PADDING; + buf = alloca(pre + sizeof(*len) + size + post); + + len = (uint32_t *)(buf + pre); + *len = htobe32(size); + memcpy(buf + pre + sizeof(*len), payload, size); + + if (libwebsocket_write(sck->sck, buf + pre, sizeof(*len) + size, + LWS_WRITE_BINARY) >= 0) + return TRUE; + } + + return FALSE; +} + + +static int http_event(lws_ctx_t *ws_ctx, lws_t *ws, lws_event_t event, + void *user, void *in, size_t len) +{ + wsl_ctx_t *ctx = libwebsocket_context_user(ws_ctx); + const char *ext, *uri; + int fd, mask; + + switch (event) { + case LWS_CALLBACK_ESTABLISHED: + mrp_debug("client-handshake completed on websocket %p/%p", ws, user); + return LWS_EVENT_OK; + + case LWS_CALLBACK_CLOSED: + mrp_debug("websocket %p/%p closed", ws, user); + return LWS_EVENT_OK; + + case LWS_CALLBACK_CLIENT_ESTABLISHED: + mrp_debug("server-handshake completed on websocket %p/%p", ws, user); + return LWS_EVENT_OK; + + case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: + mrp_debug("client connection failed"); + return LWS_EVENT_OK; + + case LWS_CALLBACK_RECEIVE: + mrp_debug("received HTTP data from client"); + return LWS_EVENT_OK; + + case LWS_CALLBACK_CLIENT_RECEIVE: + mrp_debug("recived HTTP data from server"); + return LWS_EVENT_OK; + + case LWS_CALLBACK_BROADCAST: + mrp_debug("denying broadcast"); + return LWS_EVENT_DENY; + + case LWS_CALLBACK_CLIENT_RECEIVE_PONG: + mrp_debug("client received pong"); + return LWS_EVENT_OK; + + /* + * mainloop integration + */ + case LWS_CALLBACK_ADD_POLL_FD: + fd = (ptrdiff_t)user; + mask = (int)len; + mrp_debug("start polling fd %d for events 0x%x", fd, mask); + if (add_fd(ctx, fd, mask)) + return LWS_EVENT_OK; + else + return LWS_EVENT_ERROR; + + case LWS_CALLBACK_DEL_POLL_FD: + fd = (ptrdiff_t)user; + mrp_debug("stop polling fd %d", fd); + if (del_fd(ctx, fd)) + return LWS_EVENT_OK; + else + return LWS_EVENT_ERROR; + + case LWS_CALLBACK_SET_MODE_POLL_FD: + fd = (ptrdiff_t)user; + mask = (int)len; + mrp_debug("enable poll events 0x%x for fd %d", mask, fd); + if (mod_fd(ctx, fd, mask, FALSE)) + return LWS_EVENT_OK; + else + return LWS_EVENT_ERROR; + + case LWS_CALLBACK_CLEAR_MODE_POLL_FD: + fd = (ptrdiff_t)user; + mask = (int)len; + mrp_debug("disable poll events 0x%x for fd %d", mask, fd); + if (mod_fd(ctx, fd, mask, TRUE)) + return LWS_EVENT_OK; + else + return LWS_EVENT_ERROR; + + case LWS_CALLBACK_SERVER_WRITEABLE: + mrp_debug("socket server side writeable again"); + return LWS_EVENT_OK; + + case LWS_CALLBACK_CLIENT_WRITEABLE: + mrp_debug("socket client side writeable again"); + return LWS_EVENT_OK; + + /* + * clients wanting to stay pure HTTP clients + */ + case LWS_CALLBACK_HTTP: + uri = (const char *)in; + /* we don't serve HTTP requests */ + mrp_debug("denying HTTP request for '%s'", uri); + return LWS_EVENT_DENY; + + case LWS_CALLBACK_HTTP_FILE_COMPLETION: + uri = (const char *)in; + mrp_debug("serving '%s' over HTTP completed", uri); + return LWS_EVENT_OK; + + /* + * events always routed to protocols[0] + */ + + case LWS_CALLBACK_FILTER_NETWORK_CONNECTION: + fd = (ptrdiff_t)user; + /* we don't filter based on the socket/address */ + return LWS_EVENT_OK; + + case LWS_CALLBACK_FILTER_PROTOCOL_CONNECTION: + /* we don't filter based on headers */ + return LWS_EVENT_OK; + + case LWS_CALLBACK_OPENSSL_LOAD_EXTRA_CLIENT_VERIFY_CERTS: + case LWS_CALLBACK_OPENSSL_LOAD_EXTRA_SERVER_VERIFY_CERTS: + case LWS_CALLBACK_OPENSSL_PERFORM_CLIENT_CERT_VERIFICATION: + /* we don't support or do anything for SSL at the moment */ + return LWS_EVENT_OK; + + case LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER: + /* no extra headers we'd like to add */ + return LWS_EVENT_OK; + + case LWS_CALLBACK_CONFIRM_EXTENSION_OKAY: + ext = (const char *)in; + /* deny all extensions on the server side */ + mrp_debug("denying server extension '%s'", ext); + return LWS_EVENT_DENY; + + case LWS_CALLBACK_CLIENT_CONFIRM_EXTENSION_SUPPORTED: + ext = (const char *)in; + /* deny all extensions on the client side */ + mrp_debug("denying client extension '%s'", ext); + return LWS_EVENT_DENY; + + } + + return LWS_EVENT_DENY; +} + + +static int wsl_event(lws_ctx_t *ws_ctx, lws_t *ws, lws_event_t event, + void *user, void *in, size_t len) +{ + wsl_ctx_t *ctx = libwebsocket_context_user(ws_ctx); + wsl_sck_t *sck; + wsl_proto_t *up; + void *data; + size_t size; + const char *ext; + lws_proto_t *proto; + int status; + + MRP_UNUSED(ext); + MRP_UNUSED(ws_ctx); + + switch (event) { + case LWS_CALLBACK_ESTABLISHED: + mrp_debug("client-handshake completed on websocket %p/%p", ws, user); + + /* + * Connection acceptance is a bit tricky. Once libwebsockets + * has completed its handshaking phase with the client it lets + * us know about a new established connection. This is what we + * want to map to an incoming connection attempt. Since we don't + * want to know about the internals of the upper layer, neither + * want the upper layer to know about our internals, the only + * way to pass information about the connection around in the + * context at this point. To keep things simple we only prepare + * and handle once outstanding connection attemp at a time. This + * is equivalent to listening on a stream-socket with backlog 1. + * Since we run single-threaded it shouldn't ever be possible to + * have more than one pending connection if the upper layer does + * things right but we do check for this just in case... + */ + + if (ctx->pending != NULL) { + mrp_log_error("Multiple pending connections, rejecting."); + return LWS_EVENT_DENY; + } + + /* + * Store the pending websocket instance and it's associated + * user data in the context then call the connection notifier + * callback. If the upper layer wants to accept the connection + * it calls wsl_accept_pending. That in turn digs these out + * from the context to set up and hook together things properly. + * If everything goes fine wsl_accept_pending clears pending and + * pending_user from the context. If not or if the upper layer + * decides not to accept the connection, pending and pending_user + * stay intact in which case we'll reject the client below. + */ + + proto = (lws_proto_t *)libwebsockets_get_protocol(ws); + up = find_context_protocol(ctx, proto->name); + + if (up == NULL) { + mrp_debug("unknown protocol '%s' requested, rejecting", + proto ? proto->name : ""); + return LWS_EVENT_DENY; + } + else + mrp_debug("found descriptor %p for protocol '%s'", up, up->name); + + ctx->pending = ws; + ctx->pending_user = user; + ctx->pending_proto = up; + + wsl_ref_context(ctx); + up->cbs.connection(ctx, "XXX TODO peer address", up->name, + ctx->user_data, up->proto_data); + + /* XXX TODO + * check if sockets gets properly closed and freed if + * cb->connection calls close on the 'listening' websocket in + * the transport layer... + */ + + if (ctx->pending == NULL) /* connection accepted */ + status = LWS_EVENT_OK; + else /* connection rejected */ + status = LWS_EVENT_DENY; + wsl_unref_context(ctx); + + return status; + + case LWS_CALLBACK_CLOSED: + proto = (lws_proto_t *)libwebsockets_get_protocol(ws); + up = find_context_protocol(ctx, proto->name); + mrp_debug("websocket %p/%p (%s) closed", ws, user, + up ? up->name : ""); + + sck = *(wsl_sck_t **)user; + up = sck ? sck->proto : NULL; + + if (up != NULL) { + SOCKET_BUSY_REGION(sck, { + up->cbs.closed(sck, 0, sck->user_data, up->proto_data); + up->cbs.check(sck, sck->user_data, up->proto_data); + }); + + check_closed(sck); + } + return LWS_EVENT_OK; + + case LWS_CALLBACK_CLIENT_ESTABLISHED: + mrp_debug("server-handshake completed on websocket %p/%p", ws, user); + return LWS_EVENT_OK; + + case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: + mrp_debug("client connection failed"); + return LWS_EVENT_OK; + + case LWS_CALLBACK_RECEIVE: + case LWS_CALLBACK_CLIENT_RECEIVE: + mrp_debug("%d bytes received on websocket %p/%p", len, ws, user); + + sck = *(wsl_sck_t **)user; + up = sck ? sck->proto : NULL; + + if (up != NULL) { + if (mrp_fragbuf_push(sck->buf, in, len)) { + data = NULL; + size = 0; + + while (mrp_fragbuf_pull(sck->buf, &data, &size)) { + mrp_debug("websocket %p/%p has a message of %zd bytes", + ws, user, size); + + SOCKET_BUSY_REGION(sck, { + up->cbs.recv(sck, data, size, sck->user_data, + up->proto_data); + up->cbs.check(sck, sck->user_data, up->proto_data); + }); + + if (check_closed(sck)) + break; + } + } + else { + mrp_log_error("failed to push data to fragment buffer"); + + SOCKET_BUSY_REGION(sck, { + sck->closing = TRUE; /* make sure sck gets closed */ + up->cbs.closed(sck, ENOBUFS, sck->user_data, + up->proto_data); + libwebsocket_close_and_free_session(ctx->ctx, sck->sck, + LWS_INTERNAL_ERROR); + up->cbs.check(sck, sck->user_data, up->proto_data); + }); + + check_closed(sck); + } + } + return LWS_EVENT_OK; + + case LWS_CALLBACK_BROADCAST: + mrp_debug("denying broadcast"); + return LWS_EVENT_DENY; + + case LWS_CALLBACK_SERVER_WRITEABLE: + mrp_debug("socket server side writeable again"); + return LWS_EVENT_OK; + + case LWS_CALLBACK_CLIENT_WRITEABLE: + mrp_debug("socket client side writeable again"); + return LWS_EVENT_OK; + + default: + break; + } + + return LWS_EVENT_OK; +} + + + +/* + * logging + */ + +static void libwebsockets(const char *line) +{ + const char *ts, *ll; + const char *b, *e, *lvl; + int l, ls; + + /* + * Notes: + * libwebsockets logging infrastructure has independently maskable + * log classes and supports overriding its default logger. The log + * classes are the regular error, warning, info, and debug classes + * plus the libwebsockets-specific parser, header, extension, and + * client classes. The logging infra filters the messages based on + * their class, then formats the message and passes it on to the + * (default builtin, or externally set) logger function. This gets + * a fully formatted log message that consists of a timestamp, a + * log class prefix and the message itself which typically contains + * at least one terminating newline. + * + * Because of the semantic content of the messages coming from + * libwebsockets we'd like to preserve the class of errors and + * warnings but convert the rest to debug messages. Additionally, + * we'd like to keep the message format as consistent with the + * murphy infra as possible with a reasonable effort. This means + * stripping the timestamp and log class, as these are provided + * by the murphy infra (if configured so). However, for the + * libwebsockets-specific parser-, header-, extension-, and client- + * classes we want to keep the extra information carried by the + * log class as part of the message. + * + * Becuase the libwebsockets log messages are terminated by '\n', + * we also prepare here to properly bridge multiline messages to + * the murphy infra (although I'm not sure the library ever issues + * such messages). + * + * So to sum it up the necessary steps to bridge messages here are: + * 1) strip timestamp, + * 2) dig out and strip log class + * 3) map log class to murphy infra, ie. + * keep errors and warnings, squash the rest to debug + * 4) break multiline messages to lines + * 5) pass each line on to the murphy infra, + * for parser-, header-, extension-, and client-messages + * prefix each line with the class + * + */ + + ts = strchr(line, '['); + ll = strchr(ts, ']'); + + /* strip timestamp, dig out log level, find beginning of the message */ + if (ll != NULL && ll[1] == ' ') { + ll += 2; + b = strchr(ll, ':'); + + if (b != NULL && b[1] == ' ') { + b += 2; + + while (*b == ' ') + b++; + + /* map log level: debug, info, err, warn, or other */ + switch (*ll) { + case 'D': lvl = "d"; break; + case 'I': lvl = "i"; break; + case 'W': lvl = "w"; break; + case 'E': + if (ll[1] == 'R') + lvl = "e"; + else { + other: + lvl = ll; + e = strchr(lvl, ':'); + + if (e != NULL) + ls = e - lvl; + else { + lvl = "???:"; + ls = 4; + } + } + break; + + default: + goto other; + } + } + else + goto unknown; + } + else { + unknown: + /* if we get confused with the format, default to logging it all */ + lvl = NULL; + b = line; + } + + /* break the message to lines and pass it on to the murphy infra */ + e = strchr(b, '\n'); + while (e || b) { + if (e) + l = e - b; + else + l = strlen(b); + + if (!l) + break; + + if (lvl != NULL) { + switch (*lvl) { + case 'd': mrp_debug("%*.*s", l, l, b); break; + case 'i': mrp_debug("%*.*s", l, l, b); break; + case 'w': mrp_log_warning("libwebsockets: %*.*s", l, l, b); break; + case 'e': mrp_log_error("libwebsockets: %*.*s", l, l, b); break; + default: mrp_debug("[%*.*s] %*.*s", ls, ls, lvl, l, l, b); + } + } + else + mrp_debug("%*.*s", l, l, b); + + if (e != NULL) { + b = e + 1; + e = strchr(b, '\n'); + } + else + b = NULL; + } +} + + +void wsl_set_loglevel(wsl_loglevel_t mask) +{ + lws_set_log_level(mask, libwebsockets); +} diff --git a/src/common/websocklib.h b/src/common/websocklib.h new file mode 100644 index 0000000..c8d4652 --- /dev/null +++ b/src/common/websocklib.h @@ -0,0 +1,124 @@ +#ifndef __MURPHY_WEBSOCKLIB_H__ +#define __MURPHY_WEBSOCKLIB_H__ + +#include + +#include + +#include +#include + +MRP_CDECL_BEGIN + +/* + * websocket context + * + * A websocket context is basically a libwebsocket_context plus the + * additional glue data and code necessary to integrate the context + * into our mainloop. For our transport abstraction, we create one + * context per transport instance. However, accepted transports do + * share their context with the listening transport (ie. the server- + * side libwebsocket) they were accepted on. + * + * XXX TODO We probably need to change this so that we create one + * context per address/port (or in libwebsockets case device/port). + * + */ + +typedef struct wsl_ctx_s wsl_ctx_t; + + +/* + * websocket + * + * A websocket is a libwebsocket instance together with its + * associated websocket context. + */ +typedef struct wsl_sck_s wsl_sck_t; + + +/* + * websocket event callbacks to the upper transport layer + * + * These callbacks are used to deliver events from the underlying + * websocket transport layer to the upper murphy transport layer. + */ +typedef struct { + /** Connection attempt on a websocket. */ + void (*connection)(wsl_ctx_t *ctx, char *addr, const char *protocol, + void *user_data, void *proto_data); + /** Websocket connection closed by peer. */ + void (*closed)(wsl_sck_t *sck, int error, void *user_data, + void *proto_data); + /** Data received on websocket. */ + void (*recv)(wsl_sck_t *sck, void *data, size_t size, void *user_data, + void *proto_data); + /** Check if transport should be destroyed. */ + int (*check)(wsl_sck_t *sck, void *user_data, void *proto_data); +} wsl_callbacks_t; + + +/* + * websocket protocol + * + * A websocket protocol is a protocol name together with protocol-specific + * upper-layer callbacks. + */ +typedef struct { + const char *name; /* protocol name */ + wsl_callbacks_t cbs; /* event/request callbacks */ + void *proto_data; /* protocol-specific user data */ +} wsl_proto_t; + + +/* + * logging levels + */ + +typedef enum { + WSL_LOG_NONE = 0x0, + WSL_LOG_ERROR = LLL_ERR, + WSL_LOG_WARNING = LLL_WARN, + WSL_LOG_INFO = LLL_INFO, + WSL_LOG_DEBUG = LLL_DEBUG, + WSL_LOG_ALL = LLL_ERR | LLL_WARN | LLL_INFO | LLL_DEBUG, + WSL_LOG_PARSER = LLL_PARSER, + WSL_LOG_HEADER = LLL_HEADER, + WSL_LOG_EXT = LLL_EXT, + WSL_LOG_CLIENT = LLL_CLIENT, + WSL_LOG_EXTRA = LLL_PARSER | LLL_HEADER | LLL_EXT | LLL_CLIENT, + WSL_LOG_VERBOSE = WSL_LOG_ALL | WSL_LOG_EXTRA +} wsl_loglevel_t; + +/** Set libwebsock logging level _and_ redirect to murphy logging infra. */ +void wsl_set_loglevel(wsl_loglevel_t mask); + +/** Create a websocket context. */ +wsl_ctx_t *wsl_create_context(mrp_mainloop_t *ml, struct sockaddr *sa, + wsl_proto_t *protos, int nproto, void *user_data); + +/** Add a reference to a context. */ +wsl_ctx_t *wsl_ref_context(wsl_ctx_t *ctx); + +/** Remove a context reference, destroying it once the last is gone. */ +int wsl_unref_context(wsl_ctx_t *ctx); + +/** Create a new websocket connection using a given protocol. */ +wsl_sck_t *wsl_connect(wsl_ctx_t *ctx, struct sockaddr *sa, + const char *protocol, void *user_data); + +/** Accept a pending connection. */ +wsl_sck_t *wsl_accept_pending(wsl_ctx_t *ctx, void *user_data); + +/** Reject a pending connection. */ +void wsl_reject_pending(wsl_ctx_t *ctx); + +/** Close a websocket connection. Return user_data of the associated context. */ +void *wsl_close(wsl_sck_t *sck); + +/** Send data over a wbesocket. */ +int wsl_send(wsl_sck_t *sck, void *payload, size_t size); + +MRP_CDECL_END + +#endif /* __MURPHY_WEBSOCKLIB_H__ */ diff --git a/src/common/wsck-transport.c b/src/common/wsck-transport.c new file mode 100644 index 0000000..21fb706 --- /dev/null +++ b/src/common/wsck-transport.c @@ -0,0 +1,630 @@ +/* + * Copyright (c) 2012, Intel Corporation + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Intel Corporation nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include +#include +#include +#include +#include +#include + +#include "websocklib.h" +#include "wsck-transport.h" + +#define WSCKP "wsck" /* websocket transport prefix */ +#define WSCKL 4 /* websocket transport prefix length */ + + +/* + * a websocket transport instance + */ + +typedef struct { + MRP_TRANSPORT_PUBLIC_FIELDS; /* common transport fields */ + wsl_ctx_t *ctx; /* websocket context */ + wsl_sck_t *sck; /* websocket instance */ +} wsck_t; + + +static int resolve_address(const char *str, mrp_wsckaddr_t *wa, socklen_t alen); + +static void connection_cb(wsl_ctx_t *ctx, char *addr, const char *protocol, + void *user_data, void *proto_data); +static void closed_cb(wsl_sck_t *sck, int error, void *user_data, + void *proto_data); +static void recv_cb(wsl_sck_t *sck, void *data, size_t size, void *user_data, + void *proto_data); +static int check_cb(wsl_sck_t *sck, void *user_data, void *proto_data); + + +static socklen_t wsck_resolve(const char *str, mrp_sockaddr_t *addr, + socklen_t size, const char **typep) +{ + mrp_wsckaddr_t *wa = (mrp_wsckaddr_t *)addr; + socklen_t len; + + len = resolve_address(str, wa, size); + + if (len <= 0) + return 0; + else { + if (typep != NULL) + *typep = WSCKP; + + return len; + } +} + + +static int wsck_open(mrp_transport_t *mt) +{ + MRP_UNUSED(mt); + + wsl_set_loglevel(WSL_LOG_ALL); + + return TRUE; +} + + +static int wsck_createfrom(mrp_transport_t *mt, void *conn) +{ + MRP_UNUSED(mt); + MRP_UNUSED(conn); + + return FALSE; +} + + +static void wsck_close(mrp_transport_t *mt) +{ + wsck_t *t = (wsck_t *)mt; + wsl_ctx_t *ctx = t->ctx; + wsl_sck_t *sck = t->sck; + void *user_data; + + t->sck = NULL; + t->ctx = NULL; + + user_data = wsl_close(sck); + + if (user_data == t) /* was our associated context */ + wsl_unref_context(ctx); +} + + +static int wsck_bind(mrp_transport_t *mt, mrp_sockaddr_t *addr, + socklen_t addrlen) +{ + static wsl_proto_t proto = { + "murphy", + { .connection = connection_cb, + .closed = closed_cb, + .recv = recv_cb, + .check = check_cb, }, + NULL + }; + + wsck_t *t = (wsck_t *)mt; + mrp_wsckaddr_t *wa; + struct sockaddr *sa; + + if (addr->any.sa_family != MRP_AF_WSCK || addrlen != sizeof(*wa)) + return FALSE; + + if (t->ctx != NULL) + return FALSE; + + /* + * Unfortunately instead of binding to a address/port pair, the + * underlying libwebsockets library API allows one to bind to a + * device/port pair, with NULL being a wildcard device. + * + * XXX TODO: + * For the time being, we ignore the given address and always bind + * to all interfaces. Later we can try to be a bit cleverer and eg. + * add glue code that digs out the device name based on the address + * (whenever this is unique). + */ + + wa = (mrp_wsckaddr_t *)addr; + + switch (wa->wsck_addr.family) { + case AF_INET: sa = (struct sockaddr *)&wa->wsck_addr.v4; break; + case AF_INET6: sa = (struct sockaddr *)&wa->wsck_addr.v6; break; + default: + errno = EAFNOSUPPORT; + return FALSE; + } + + t->ctx = wsl_create_context(t->ml, sa, &proto, 1, t); + + if (t->ctx != NULL) + return TRUE; + else + return FALSE; +} + + +static int wsck_listen(mrp_transport_t *mt, int backlog) +{ + MRP_UNUSED(mt); + MRP_UNUSED(backlog); + + mt->listened = TRUE; + + return TRUE; +} + + +static int wsck_accept(mrp_transport_t *mt, mrp_transport_t *mlt) +{ + wsck_t *lt = (wsck_t *)mlt; + wsck_t *t = (wsck_t *)mt; + + t->sck = wsl_accept_pending(lt->ctx, t); + + if (t->sck != NULL) { + mrp_debug("accepted websocket connection %p", mlt); + + return TRUE; + } + else { + mrp_debug("failed to accept websocket connection on %p", mlt); + + return FALSE; + } +} + + +static int wsck_connect(mrp_transport_t *mt, mrp_sockaddr_t *addr, + socklen_t addrlen) +{ + static wsl_proto_t proto = { + "murphy", + { .connection = connection_cb, + .closed = closed_cb, + .recv = recv_cb, + .check = check_cb, }, + NULL + }; + + wsck_t *t = (wsck_t *)mt; + mrp_wsckaddr_t *wa; + struct sockaddr *sa; + + if (addr->any.sa_family != MRP_AF_WSCK || addrlen != sizeof(*wa)) + return FALSE; + + if (t->ctx != NULL) + return FALSE; + + wa = (mrp_wsckaddr_t *)addr; + + switch (wa->wsck_addr.family) { + case AF_INET: sa = (struct sockaddr *)&wa->wsck_addr.v4; break; + case AF_INET6: sa = (struct sockaddr *)&wa->wsck_addr.v6; break; + default: + errno = EAFNOSUPPORT; + return FALSE; + } + + t->ctx = wsl_create_context(t->ml, NULL, &proto, 1, t); + + if (t->ctx == NULL) + return FALSE; + + t->sck = wsl_connect(t->ctx, sa, "murphy", t); + + if (t->sck != NULL) { + t->connected = TRUE; + + return TRUE; + } + else { + wsl_unref_context(t->ctx); + t->ctx = NULL; + } + + return FALSE; +} + + +static int wsck_disconnect(mrp_transport_t *mt) +{ + wsck_t *t = (wsck_t *)mt; + wsl_ctx_t *ctx = t->ctx; + wsl_sck_t *sck = t->sck; + void *user_data; + + t->sck = NULL; + t->ctx = NULL; + + user_data = wsl_close(sck); + + if (user_data == t) /* was our associated context */ + wsl_unref_context(ctx); + + return TRUE; +} + + +static int wsck_send(mrp_transport_t *mt, mrp_msg_t *msg) +{ + wsck_t *t = (wsck_t *)mt; + void *buf; + ssize_t size; + int success; + + size = mrp_msg_default_encode(msg, &buf); + + if (wsl_send(t->sck, buf, size)) + success = TRUE; + else + success = FALSE; + + mrp_free(buf); + + return success; +} + + +static int wsck_sendraw(mrp_transport_t *mt, void *data, size_t size) +{ + wsck_t *t = (wsck_t *)mt; + + return wsl_send(t->sck, data, size); +} + + +static int wsck_senddata(mrp_transport_t *mt, void *data, uint16_t tag) +{ + wsck_t *t = (wsck_t *)mt; + mrp_data_descr_t *type; + void *buf; + size_t size, reserve; + uint16_t *tagp; + int status; + + type = mrp_msg_find_type(tag); + + if (type != NULL) { + reserve = sizeof(*tagp); + size = mrp_data_encode(&buf, data, type, reserve); + + if (size > 0) { + tagp = buf; + *tagp = htobe16(tag); + + status = wsl_send(t->sck, buf, size); + + mrp_free(buf); + return status; + } + } + + return FALSE; +} + + +static inline int looks_ipv4(const char *p) +{ +#define DEC_DIGIT(c) ('0' <= (c) && (c) <= '9') + if (DEC_DIGIT(p[0])) { + if (p[1] == '.') + return TRUE; + + if (DEC_DIGIT(p[1])) { + if (p[2] == '.') + return TRUE; + + if (DEC_DIGIT(p[2])) { + if (p[3] == '.') + return TRUE; + } + } + } + + return FALSE; +#undef DEC_DIGIT +} + + +static int resolve_address(const char *str, mrp_wsckaddr_t *wa, socklen_t alen) +{ + struct addrinfo *ai, hints; + const char *node, *port, *proto; + char nbuf[256], pbuf[32]; + int family, status; + size_t len; + + if (strncmp(str, WSCKP":", WSCKL + 1) != 0) + return 0; + else + str += WSCKL + 1; + + node = (char *)str; + + if (node[0] == '[') { + node++; + family = AF_INET6; + port = strchr(node, ']'); + } + else if (looks_ipv4(node)) { + family = AF_INET; + port = strchr(node, ':'); + } + else { + family = AF_UNSPEC; + port = strrchr(node, ':'); + } + + if (port == NULL || (*port != ':' && *port != ']')) { + errno = EINVAL; + return -1; + } + + len = port - node; + + if (len > sizeof(nbuf) - 1) { + errno = EOVERFLOW; + return -1; + } + + strncpy(nbuf, node, len); + nbuf[len] = '\0'; + + if (*port == ']') + port++; + + if (*port != ':') { + errno = EINVAL; + return -1; + } + + port++; + proto = strchr(port, '/'); + + if (proto != NULL) { + len = proto - port; + + if (len > sizeof(pbuf) - 1) { + errno = EOVERFLOW; + return -1; + } + + strncpy(pbuf, port, len); + pbuf[len] = '\0'; + + proto++; + if (strlen(proto) > sizeof(wa->wsck_proto) - 1) { + errno = EOVERFLOW; + return -1; + } + } + else { + proto = MRP_WSCK_DEFPROTO; + len = strlen(port); + + if (len > sizeof(pbuf) - 1) { + errno = EOVERFLOW; + return -1; + } + + strcpy(pbuf, port); + } + + mrp_clear(&hints); + hints.ai_family = family; + + status = getaddrinfo(nbuf, pbuf, &hints, &ai); + + switch (status) { + case 0: + if (ai->ai_addrlen <= alen) { + wa->wsck_family = MRP_AF_WSCK; + memcpy(&wa->wsck_addr, ai->ai_addr, ai->ai_addrlen); + strcpy(wa->wsck_proto, proto); + + len = sizeof(*wa); + } + else { + errno = EOVERFLOW; + len = -1; + } + + freeaddrinfo(ai); + return len; + +#define MAP_ERROR(ai_err, err) \ + case EAI_##ai_err: \ + errno = err; \ + return -1 + + MAP_ERROR(AGAIN , EAGAIN); + MAP_ERROR(BADFLAGS , EADDRNOTAVAIL); + MAP_ERROR(FAIL , EHOSTUNREACH); + MAP_ERROR(FAMILY , EPFNOSUPPORT); + MAP_ERROR(MEMORY , ENOMEM); + MAP_ERROR(NONAME , EHOSTUNREACH); + MAP_ERROR(SERVICE , EAFNOSUPPORT); + MAP_ERROR(SOCKTYPE , EHOSTUNREACH); + MAP_ERROR(SYSTEM , EHOSTUNREACH); +#ifdef EAI_ADDRFAMILY + MAP_ERROR(ADDRFAMILY, EHOSTUNREACH); +#endif +#ifdef EAI_NODATA + MAP_ERROR(NODATA , EHOSTUNREACH); +#endif + + default: + errno = EHOSTUNREACH; + } + + return -1; +} + + +static int print_address(char *buf, size_t size, mrp_wsckaddr_t *wa) +{ + struct sockaddr *saddr; + socklen_t salen; + char nbuf[256], pbuf[32], *b, *e; + int status; + + if (wa->wsck_family != MRP_AF_WSCK) { + invalid: + errno = EINVAL; + return -1; + } + + switch (wa->wsck_addr.family) { + case AF_INET: + saddr = (struct sockaddr *)&wa->wsck_addr.v4; + salen = sizeof(wa->wsck_addr.v4); + b = ""; + e = ""; + break; + case AF_INET6: + saddr = (struct sockaddr *)&wa->wsck_addr.v6; + salen = sizeof(wa->wsck_addr.v6); + b = "["; + e = "]"; + break; + default: + goto invalid; + } + + status = getnameinfo(saddr, salen, nbuf, sizeof(nbuf), pbuf, sizeof(pbuf), + NI_NUMERICHOST | NI_NUMERICSERV); + + if (status == 0) + return snprintf(buf, size, "wsck:%s%s%s:%s/%s", + b, nbuf, e, pbuf, wa->wsck_proto); + else { + printf("error: %d: %s\n", status, gai_strerror(status)); + + errno = EINVAL; + return -1; + } +} + + +static void connection_cb(wsl_ctx_t *ctx, char *addr, const char *protocol, + void *user_data, void *proto_data) +{ + wsck_t *t = (wsck_t *)user_data; + + MRP_UNUSED(addr); + MRP_UNUSED(proto_data); + + mrp_debug("incoming connection (%s) for context %p", protocol, ctx); + + if (t->listened) { + MRP_TRANSPORT_BUSY(t, { + t->evt.connection((mrp_transport_t *)t, t->user_data); + }); + } + else + mrp_log_error("connection attempt on non-listened transport %p", t); +} + + +static void closed_cb(wsl_sck_t *sck, int error, void *user_data, + void *proto_data) +{ + wsck_t *t = (wsck_t *)user_data; + + MRP_UNUSED(proto_data); + + mrp_debug("websocket %p closed", sck); + + if (t->evt.closed != NULL) + MRP_TRANSPORT_BUSY(t, { + t->evt.closed((mrp_transport_t *)t, error, t->user_data); + }); +} + + +static void recv_cb(wsl_sck_t *sck, void *data, size_t size, void *user_data, + void *proto_data) +{ + wsck_t *t = (wsck_t *)user_data; + + MRP_UNUSED(data); + MRP_UNUSED(user_data); + MRP_UNUSED(proto_data); + + mrp_debug("%d bytes on websocket %p", size, sck); + + MRP_TRANSPORT_BUSY(t, { + t->recv_data((mrp_transport_t *)t, data, size, NULL, 0); + }); +} + + +static int check_cb(wsl_sck_t *sck, void *user_data, void *proto_data) +{ + wsck_t *t = (wsck_t *)user_data; + + MRP_UNUSED(proto_data); + + mrp_debug("checking if transport %p (%p) has been destroyed", t, sck); + + if (t != NULL) { + if (t->check_destroy((mrp_transport_t *)t)) { + mrp_debug("transport has been destroyed"); + return TRUE; + } + else + mrp_debug("transport has not been destroyed"); + } + + return FALSE; +} + + +MRP_REGISTER_TRANSPORT(wsck, WSCKP, wsck_t, wsck_resolve, + wsck_open, wsck_createfrom, wsck_close, + wsck_bind, wsck_listen, wsck_accept, + wsck_connect, wsck_disconnect, + wsck_send, NULL, + wsck_sendraw, NULL, + wsck_senddata, NULL); diff --git a/src/common/wsck-transport.h b/src/common/wsck-transport.h new file mode 100644 index 0000000..b5890b7 --- /dev/null +++ b/src/common/wsck-transport.h @@ -0,0 +1,38 @@ +#ifndef __MURPHY_WEBSOCKET_TRANSPORT_H__ +#define __MURPHY_WEBSOCKET_TRANSPORT_H__ + +#include +#include + +MRP_CDECL_BEGIN + +#define MRP_AF_WSCK 0xDC /* stolen address family */ + +#define MRP_WSCKADDR_BASE \ + __SOCKADDR_COMMON(wsck_); /* wsck_family: MRP_AF_WSCK */ \ + union { /* websocket address */ \ + sa_family_t family; \ + struct sockaddr_in v4; \ + struct sockaddr_in6 v6; \ + } wsck_addr \ + +typedef struct { + MRP_WSCKADDR_BASE; +} _mrp_wsckaddr_base_t; + +#define MRP_WSCK_DEFPROTO "murphy" +#define MRP_WSCK_PROTOLEN (MRP_SOCKADDR_SIZE - sizeof(_mrp_wsckaddr_base_t)) + + +/* + * websocket transport address + */ + +typedef struct { + MRP_WSCKADDR_BASE; /* websocket address */ + char wsck_proto[MRP_WSCK_PROTOLEN]; /* websocket protocol */ +} mrp_wsckaddr_t; + +MRP_CDECL_END + +#endif /* __MURPHY_WEBSOCKET_TRANSPORT_H__ */