lws-meta
authorAndy Green <andy@warmcat.com>
Tue, 18 Jul 2017 20:39:14 +0000 (04:39 +0800)
committerAndy Green <andy@warmcat.com>
Wed, 19 Jul 2017 00:59:42 +0000 (08:59 +0800)
16 files changed:
CMakeLists.txt
README.lws-meta.md [new file with mode: 0644]
lib/libwebsockets.c
lib/libwebsockets.h
lib/output.c
lib/parsers.c
lib/pollfd.c
lib/private-libwebsockets.h
lib/server.c
lib/service.c
lwsws/etc-lwsws-conf.d-localhost-EXAMPLE
plugins/protocol_lws_meta.c [new file with mode: 0644]
test-server/lws-common.js
test-server/test-server-v2.0.c
test-server/test-server.c
test-server/test.html

index 9ad0174..e784ae2 100644 (file)
@@ -1443,6 +1443,8 @@ if (NOT LWS_WITHOUT_TESTAPPS)
                endmacro()
                
 
+               create_plugin(protocol_lws_meta
+                             "plugins/protocol_lws_meta.c" "" "")
                create_plugin(protocol_dumb_increment
                              "plugins/protocol_dumb_increment.c" "" "")
                create_plugin(protocol_lws_mirror
diff --git a/README.lws-meta.md b/README.lws-meta.md
new file mode 100644 (file)
index 0000000..dbca4c0
--- /dev/null
@@ -0,0 +1,192 @@
+# lws-meta protocol
+
+lws-meta is a lightweight ws subprotocol that accepts other ws connections
+to the same server inside it and multiplexes their access to the connection.
+
+```
+  Client                                                    Server
+  conn1: \                                                / :conn1
+  conn2:  =   mux ------ lws-meta ws protocol ----- mux  =  :conn2
+  conn3: /                                                \ :conn3
+```
+
+You may have n client ws connections back to the server, but you now
+only have one tcp connection (and one SSL wrapper if using SSL) instead
+of n of those.
+
+If you currently make multiple ws connections back to the server, so you
+can have different protocols active in one webpage, this if for you.
+
+ - The subprotocol code for the connections inside a lws-meta connection
+   need zero changes from being a normal ws connection.  It is unaware
+   it is inside an lws-meta parent connection.
+
+ - The traffic on the lws-meta connection is indistinguishable from
+   standard ws traffic, so intermediaries won't object to it
+
+ - The multiplexing is done in the protocol, **not by an extension**.  So
+   it's compatible with all browsers.
+
+ - Javascript helper code is provided to very simply use lws-meta
+   protocol instead of direct connections.  The lws test server has
+   been converted to use this by default.
+
+# Converting your server
+
+1) include the provided lws-meta plugin (plugins/protocl_lws_meta.c) as an
+active protocol for your server.  You can do that using runtime plugins, or
+include the plugin sources into your server at build-time.  The lws test
+server uses the latter approach.
+
+That's all you need to do on the server side.
+
+# Converting your browser JS
+
+1) import lws-common.js
+
+2) Instantiate a parent lws-meta connection object
+
+```
+var lws_meta = new lws_meta_ws();
+```
+
+3) Connect the lws-meta object to your server
+
+```
+lws_meta.new_parent(get_appropriate_ws_url("?mirror=" + mirror_name));
+```
+
+4) Convert your actual ws connections to go via the lws_meta object
+
+```
+var my_ws = lws_meta.new_ws("", "dumb-increment-protocol");
+```
+
+The first arg is the URL path, the second arg is the ws protocol you want.
+
+That's it.  my_ws will get `onopen()`, `onmessage()` etc calls as before.
+
+# lws-meta wire protocol
+
+lws-meta works by adding some bytes at the start of a message indicating
+which channel the message applies to.
+
+Channel messages are atomic on the wire.  The reason is if we tried to
+intersperse other channel fragments between one channels message fragments,
+an intermediary would observe violations of the ws framing rule about
+having to start a message with TEXT or BINARY, and use only CONTINUATION
+for the subsequent fragments.  Eg
+
+```
+  [ ch1 TEXT NOFIN ] [ ch2 BINARY FIN ] [ ch1 CONTINUATION FIN ]
+```
+
+is illegal to an observer that doesn't understand lws-meta headers in the
+packet payloads.  So to avoid this situation, only complete messages may
+be sent from one subchannel in each direction at a time.
+
+Consequently, only the first fragment of each message is modified to
+have the extra two bytes identifying the subchannel it is aimed at, since
+the rest of the message from the same subchannel is defined to follow.
+
+If it makes latencies, modify the protocol sending large messages to
+send smaller messages, so the transmission of messages from other channels
+can be sent inbetween the smaller messages.
+
+## lws-meta commands
+
+1) CSTRING indicates a string terminated by 0x00 byte
+
+2) Channel IDs are sent with 0x20 added to them, to guarantee valid UTF-8
+
+### 0x41: RX: LWS_META_CMD_OPEN_SUBCHANNEL
+
+   - CSTRING: protocol name
+   - CSTRING: url
+   - CSTRING: cookie (7 bytes max)
+
+Client is requesting to open a new channel with the given protocol name,
+at the given url.  The cookie (eg, channel name) is only used in
+LWS_META_CMD_OPEN_RESULT, when the channel id is assigned, so it is
+applied to the right channel.
+
+### 0x42: TX: LWS_META_CMD_OPEN_RESULT
+
+   - CSTRING cookie
+   - BYTE channel id (0 indicates failed)
+   - CSTRING: selected protocol name
+
+The server is informing the client of the results of a previous
+open request.  The cookie the client sent to identify the request
+is returned along with a channel id to be used subsequently.  If
+the channel ID is 0 (after subtracting the transport offset of
+0x20) then the open request has failed.
+
+### 0x43: TX: LWS_META_CMD_CLOSE_NOTIFY
+
+   - BYTE channel id
+   - BYTE: payload length + 0x20
+   - BYTE: close code MSB
+   - BYTE: close code LSB
+   - PAYLOAD: payload (< 123 bytes)
+
+Server notifies the client that a child has closed, for whatever reason.
+
+### 0x44: RX: LWS_META_CMD_CLOSE_RQ
+   - BYTE: channel id
+   - BYTE: payload length + 0x20
+   - BYTE: close code MSB
+   - BYTE: close code LSB
+   - PAYLOAD: payload (< 123 bytes)
+
+The client requests to close a child connection
+
+### 0x45: TX: LWS_META_CMD_WRITE
+
+   - BYTE: channel id
+
+Normal write of payload n from lws-meta perspective is actually
+LWS_META_CMD_WRITE, channel id, then (n - 2) bytes of payload
+
+The command only appears at the start of a message, continuations do
+not have the command.
+
+## Protocol Notes
+
+ - Once the subchannel is up, overhead is only +2 bytes per message
+
+ - Close reasons are supported in both directions
+
+ - Ping and Pong are only supported at the lws-meta level, using normal ws ping and pong packets.
+
+ - Only the final close of the tcp lws-meta connection itself goes out as
+   a normal ws close frame.  Subchannels close is done in a normal TEXT
+   message using LWS_META_CMD_CLOSE_RQ and then the close packet payload.
+   This is so intermediaries do not mistake subchannel closures for the
+   tcp / ws link going down.
+
+   Messages that start with LWS_META_CMD_OPEN_SUBCHANNEL only contain those
+   commands but may contain any number of them for the whole duration of the
+   message.  The lws-meta js support collects child open requests made before
+   the parent lws-meta connection is open, and dumps them all in a single
+   message when it does open.
+
+   Messages that start with LWS_META_CMD_OPEN_RESULT or LWS_META_CMD_CLOSE_NOTIFY
+   only contain those two commands, but they may contain any number of them
+   for the whole duration of the message.
+
+
+# Current Implemention Limitations
+
+ - only server side is supported in lws.  The client side JS for
+   a browser is supported.
+
+ - max number of child connections per parent at the moment is 8
+
+ - child connection URL paramter when opening the connection is
+   ignored
+
+ - there is no ah attached when the child connections are
+   established inside the lws-meta parent.  So header access
+   functions will fail.
index 51d8618..5cd0c03 100755 (executable)
@@ -116,6 +116,13 @@ lws_set_timeout(struct lws *wsi, enum pending_timeout reason, int secs)
        struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi];
        time_t now;
 
+       if (secs == LWS_TO_KILL_SYNC) {
+               lws_remove_from_timeout_list(wsi);
+               lwsl_debug("synchronously killing %p\n", wsi);
+               lws_close_free_wsi(wsi, LWS_CLOSE_STATUS_NOSTATUS);
+               return;
+       }
+
        lws_pt_lock(pt);
 
        time(&now);
@@ -157,6 +164,12 @@ lws_remove_child_from_any_parent(struct lws *wsi)
                if (*pwsi == wsi) {
                        lwsl_info("%s: detach %p from parent %p\n",
                                        __func__, wsi, wsi->parent);
+
+                       if (wsi->parent->protocol)
+                               wsi->parent->protocol->callback(wsi,
+                                               LWS_CALLBACK_CHILD_CLOSING,
+                                              wsi->parent->user_space, wsi, 0);
+
                        *pwsi = wsi->sibling_list;
                        seen = 1;
                        break;
@@ -228,6 +241,8 @@ lws_close_free_wsi(struct lws *wsi, enum lws_close_status reason)
        struct lws_tokens eff_buf;
        int n, m, ret;
 
+       lwsl_debug("%s: %p\n", __func__, wsi);
+
        if (!wsi)
                return;
 
@@ -568,6 +583,8 @@ just_kill_connection:
        /* checking return redundant since we anyway close */
        if (wsi->desc.sockfd != LWS_SOCK_INVALID)
                remove_wsi_socket_from_fds(wsi);
+       else
+               lws_same_vh_protocol_remove(wsi);
 
 #if defined(LWS_WITH_ESP8266)
        espconn_disconnect(wsi->desc.sockfd);
@@ -670,17 +687,18 @@ async_close:
        wsi->socket_is_permanently_unusable = 1;
 
 #ifdef LWS_USE_LIBUV
-       if (LWS_LIBUV_ENABLED(context)) {
-               if (wsi->listener) {
-                       lwsl_debug("%s: stopping listner libuv poll\n", __func__);
-                       uv_poll_stop(&wsi->w_read.uv_watcher);
-               }
-               lwsl_debug("%s: lws_libuv_closehandle: wsi %p\n", __func__, wsi);
-               /* libuv has to do his own close handle processing asynchronously */
-               lws_libuv_closehandle(wsi);
+       if (!wsi->parent_carries_io)
+               if (LWS_LIBUV_ENABLED(context)) {
+                       if (wsi->listener) {
+                               lwsl_debug("%s: stopping listner libuv poll\n", __func__);
+                               uv_poll_stop(&wsi->w_read.uv_watcher);
+                       }
+                       lwsl_debug("%s: lws_libuv_closehandle: wsi %p\n", __func__, wsi);
+                       /* libuv has to do his own close handle processing asynchronously */
+                       lws_libuv_closehandle(wsi);
 
-               return;
-       }
+                       return;
+               }
 #endif
 
        lws_close_free_wsi_final(wsi);
@@ -856,6 +874,9 @@ lws_get_peer_simple(struct lws *wsi, char *name, int namelen)
        int af = AF_INET;
        void *p, *q;
 
+       if (wsi->parent_carries_io)
+               wsi = wsi->parent;
+
 #ifdef LWS_USE_IPV6
        if (LWS_IPV6_ENABLED(wsi->vhost)) {
                len = sizeof(sin6);
@@ -1407,6 +1428,12 @@ lws_is_final_fragment(struct lws *wsi)
        return wsi->u.ws.final && !wsi->u.ws.rx_packet_length && !wsi->u.ws.rx_draining_ext;
 }
 
+LWS_VISIBLE int
+lws_is_first_fragment(struct lws *wsi)
+{
+       return wsi->u.ws.first_fragment;
+}
+
 LWS_VISIBLE unsigned char
 lws_get_reserved_bits(struct lws *wsi)
 {
@@ -1651,6 +1678,48 @@ lws_get_child(const struct lws *wsi)
 }
 
 LWS_VISIBLE LWS_EXTERN void
+lws_set_parent_carries_io(struct lws *wsi)
+{
+       wsi->parent_carries_io = 1;
+}
+
+LWS_VISIBLE LWS_EXTERN void *
+lws_get_opaque_parent_data(const struct lws *wsi)
+{
+       return wsi->opaque_parent_data;
+}
+
+LWS_VISIBLE LWS_EXTERN void
+lws_set_opaque_parent_data(struct lws *wsi, void *data)
+{
+       wsi->opaque_parent_data = data;
+}
+
+LWS_VISIBLE LWS_EXTERN int
+lws_get_child_pending_on_writable(const struct lws *wsi)
+{
+       return wsi->parent_pending_cb_on_writable;
+}
+
+LWS_VISIBLE LWS_EXTERN void
+lws_clear_child_pending_on_writable(struct lws *wsi)
+{
+       wsi->parent_pending_cb_on_writable = 0;
+}
+
+LWS_VISIBLE LWS_EXTERN int
+lws_get_close_length(struct lws *wsi)
+{
+       return wsi->u.ws.close_in_ping_buffer_len;
+}
+
+LWS_VISIBLE LWS_EXTERN unsigned char *
+lws_get_close_payload(struct lws *wsi)
+{
+       return &wsi->u.ws.ping_payload_buf[LWS_PRE];
+}
+
+LWS_VISIBLE LWS_EXTERN void
 lws_close_reason(struct lws *wsi, enum lws_close_status status,
                 unsigned char *buf, size_t len)
 {
index fcd5d99..5e897b1 100644 (file)
@@ -823,6 +823,38 @@ struct lws_context;
 /* needed even with extensions disabled for create context */
 struct lws_extension;
 
+/*! \defgroup lwsmeta lws-meta
+ *
+ * ##lws-meta protocol
+ *
+ * The protocol wraps other muxed connections inside one tcp connection.
+ *
+ * Commands are assigned from 0x41 up (so they are valid unicode)
+ */
+///@{
+
+enum lws_meta_commands {
+       LWS_META_CMD_OPEN_SUBCHANNEL = 'A',
+       /**< Client requests to open new subchannel
+        */
+       LWS_META_CMD_OPEN_RESULT,
+       /**< Result of client request to open new subchannel */
+       LWS_META_CMD_CLOSE_NOTIFY,
+       /**< Notification of subchannel closure */
+       LWS_META_CMD_CLOSE_RQ,
+       /**< client requests to close a subchannel */
+       LWS_META_CMD_WRITE,
+       /**< connection writes something to specific channel index */
+
+       /****** add new things just above ---^ ******/
+};
+
+/* channel numbers are transported offset by 0x20 so they are valid unicode */
+
+#define LWS_META_TRANSPORT_OFFSET 0x20
+
+///@}
+
 /*! \defgroup usercb User Callback
  *
  * ##User protocol callback
@@ -1252,6 +1284,16 @@ enum lws_callback_reasons {
         * using the vhost.  @in is a pointer to a
         * struct lws_ssl_info containing information about the
         * event*/
+       LWS_CALLBACK_CHILD_WRITE_VIA_PARENT                     = 68,
+       /**< Child has been marked with parent_carries_io attribute, so
+        * lws_write directs the to this callback at the parent,
+        * @in is a struct lws_write_passthru containing the args
+        * the lws_write() was called with.
+        */
+       LWS_CALLBACK_CHILD_CLOSING                              = 69,
+       /**< Sent to parent to notify them a child is closing / being
+        * destroyed.  @in is the child wsi.
+        */
 
        /****** add new things just above ---^ ******/
 
@@ -2783,6 +2825,9 @@ lws_service_adjust_timeout(struct lws_context *context, int timeout_ms, int tsi)
 /* Backwards compatibility */
 #define lws_plat_service_tsi lws_service_tsi
 
+LWS_VISIBLE LWS_EXTERN int
+lws_handle_POLLOUT_event(struct lws *wsi, struct lws_pollfd *pollfd);
+
 ///@}
 
 /*! \defgroup http HTTP
@@ -3625,6 +3670,19 @@ enum pending_timeout {
        /****** add new things just above ---^ ******/
 };
 
+#define LWS_TO_KILL_ASYNC -1
+/**< If LWS_TO_KILL_ASYNC is given as the timeout sec in a lws_set_timeout()
+ * call, then the connection is marked to be killed at the next timeout
+ * check.  This is how you should force-close the wsi being serviced if
+ * you are doing it outside the callback (where you should close by nonzero
+ * return).
+ */
+#define LWS_TO_KILL_SYNC -2
+/**< If LWS_TO_KILL_SYNC is given as the timeout sec in a lws_set_timeout()
+ * call, then the connection is closed before returning (which may delete
+ * the wsi).  This should only be used where the wsi being closed is not the
+ * wsi currently being serviced.
+ */
 /**
  * lws_set_timeout() - marks the wsi as subject to a timeout
  *
@@ -3632,7 +3690,10 @@ enum pending_timeout {
  *
  * \param wsi: Websocket connection instance
  * \param reason:      timeout reason
- * \param secs:        how many seconds
+ * \param secs:        how many seconds.  You may set to LWS_TO_KILL_ASYNC to
+ *             force the connection to timeout at the next opportunity, or
+ *             LWS_TO_KILL_SYNC to close it synchronously if you know the
+ *             wsi is not the one currently being serviced.
  */
 LWS_VISIBLE LWS_EXTERN void
 lws_set_timeout(struct lws *wsi, enum pending_timeout reason, int secs);
@@ -3657,7 +3718,8 @@ lws_set_timeout(struct lws *wsi, enum pending_timeout reason, int secs);
 #endif
 #define _LWS_PAD(n) (((n) % _LWS_PAD_SIZE) ? \
                ((n) + (_LWS_PAD_SIZE - ((n) % _LWS_PAD_SIZE))) : (n))
-#define LWS_PRE _LWS_PAD(4 + 10)
+/* last 2 is for lws-meta */
+#define LWS_PRE _LWS_PAD(4 + 10 + 2)
 /* used prior to 1.7 and retained for backward compatibility */
 #define LWS_SEND_BUFFER_PRE_PADDING LWS_PRE
 #define LWS_SEND_BUFFER_POST_PADDING 0
@@ -3709,6 +3771,15 @@ enum lws_write_protocol {
         * decode the content if used */
 };
 
+/* used with LWS_CALLBACK_CHILD_WRITE_VIA_PARENT */
+
+struct lws_write_passthru {
+       struct lws *wsi;
+       unsigned char *buf;
+       size_t len;
+       enum lws_write_protocol wp;
+};
+
 
 /**
  * lws_write() - Apply protocol then write data to client
@@ -4053,7 +4124,11 @@ typedef enum {
        LWS_ADOPT_RAW_FILE_DESC = 0,    /* convenience constant */
        LWS_ADOPT_HTTP = 1,             /* flag: absent implies RAW */
        LWS_ADOPT_SOCKET = 2,           /* flag: absent implies file descr */
-       LWS_ADOPT_ALLOW_SSL = 4         /* flag: if set requires LWS_ADOPT_SOCKET */
+       LWS_ADOPT_ALLOW_SSL = 4,        /* flag: if set requires LWS_ADOPT_SOCKET */
+       LWS_ADOPT_WS_PARENTIO = 8,      /* flag: ws mode parent handles IO
+                                        *   if given must be only flag
+                                        *   wsi put directly into ws mode
+                                        */
 } lws_adoption_type;
 
 typedef union {
@@ -4411,6 +4486,32 @@ lws_get_parent(const struct lws *wsi);
 LWS_VISIBLE LWS_EXTERN struct lws * LWS_WARN_UNUSED_RESULT
 lws_get_child(const struct lws *wsi);
 
+/**
+ * lws_parent_carries_io() - mark wsi as needing to send messages via parent
+ *
+ * \param wsi: child lws connection
+ */
+
+LWS_VISIBLE LWS_EXTERN void
+lws_set_parent_carries_io(struct lws *wsi);
+
+LWS_VISIBLE LWS_EXTERN void *
+lws_get_opaque_parent_data(const struct lws *wsi);
+
+LWS_VISIBLE LWS_EXTERN void
+lws_set_opaque_parent_data(struct lws *wsi, void *data);
+
+LWS_VISIBLE LWS_EXTERN int
+lws_get_child_pending_on_writable(const struct lws *wsi);
+
+LWS_VISIBLE LWS_EXTERN void
+lws_clear_child_pending_on_writable(struct lws *wsi);
+
+LWS_VISIBLE LWS_EXTERN int
+lws_get_close_length(struct lws *wsi);
+
+LWS_VISIBLE LWS_EXTERN unsigned char *
+lws_get_close_payload(struct lws *wsi);
 
 /*
  * \deprecated DEPRECATED Note: this is not normally needed as a user api.
@@ -4448,12 +4549,21 @@ lws_send_pipe_choked(struct lws *wsi);
 
 /**
  * lws_is_final_fragment() - tests if last part of ws message
+ *
  * \param wsi: lws connection
  */
 LWS_VISIBLE LWS_EXTERN int
 lws_is_final_fragment(struct lws *wsi);
 
 /**
+ * lws_is_first_fragment() - tests if first part of ws message
+ *
+ * \param wsi: lws connection
+ */
+LWS_VISIBLE LWS_EXTERN int
+lws_is_first_fragment(struct lws *wsi);
+
+/**
  * lws_get_reserved_bits() - access reserved bits of ws frame
  * \param wsi: lws connection
  */
index be0c010..9f72443 100644 (file)
@@ -250,6 +250,23 @@ LWS_VISIBLE int lws_write(struct lws *wsi, unsigned char *buf, size_t len,
        int pre = 0, n;
        size_t orig_len = len;
 
+       if (wsi->parent_carries_io) {
+               struct lws_write_passthru pas;
+
+               pas.buf = buf;
+               pas.len = len;
+               pas.wp = wp;
+               pas.wsi = wsi;
+
+               if (wsi->parent->protocol->callback(wsi->parent,
+                               LWS_CALLBACK_CHILD_WRITE_VIA_PARENT,
+                               wsi->parent->user_space,
+                               (void *)&pas, 0))
+                       return 1;
+
+               return len;
+       }
+
        lws_stats_atomic_bump(wsi->context, pt, LWSSTATS_C_API_LWS_WRITE, 1);
 
        if ((int)len < 0) {
index e41bdf0..172de1d 100644 (file)
@@ -385,6 +385,9 @@ lws_hdr_fragment_length(struct lws *wsi, enum lws_token_indexes h, int frag_idx)
 {
        int n;
 
+       if (!wsi->u.hdr.ah)
+               return 0;
+
        n = wsi->u.hdr.ah->frag_index[h];
        if (!n)
                return 0;
@@ -402,6 +405,9 @@ LWS_VISIBLE int lws_hdr_total_length(struct lws *wsi, enum lws_token_indexes h)
        int n;
        int len = 0;
 
+       if (!wsi->u.hdr.ah)
+               return 0;
+
        n = wsi->u.hdr.ah->frag_index[h];
        if (!n)
                return 0;
@@ -417,7 +423,12 @@ LWS_VISIBLE int lws_hdr_copy_fragment(struct lws *wsi, char *dst, int len,
                                      enum lws_token_indexes h, int frag_idx)
 {
        int n = 0;
-       int f = wsi->u.hdr.ah->frag_index[h];
+       int f;
+
+       if (!wsi->u.hdr.ah)
+               return -1;
+
+       f = wsi->u.hdr.ah->frag_index[h];
 
        if (!f)
                return -1;
@@ -448,6 +459,9 @@ LWS_VISIBLE int lws_hdr_copy(struct lws *wsi, char *dst, int len,
        if (toklen >= len)
                return -1;
 
+       if (!wsi->u.hdr.ah)
+               return -1;
+
        n = wsi->u.hdr.ah->frag_index[h];
        if (!n)
                return 0;
@@ -1145,6 +1159,7 @@ handle_first:
                        wsi->u.ws.rsv_first_msg = (c & 0x70);
                        wsi->u.ws.frame_is_binary =
                             wsi->u.ws.opcode == LWSWSOPC_BINARY_FRAME;
+                       wsi->u.ws.first_fragment = 1;
                        break;
                case 3:
                case 4:
@@ -1492,6 +1507,7 @@ drain_extension:
                /* eff_buf may be pointing somewhere completely different now,
                 * it's the output
                 */
+               wsi->u.ws.first_fragment = 0;
                if (n < 0) {
                        /*
                         * we may rely on this to get RX, just drop connection
index 7c3a0ef..f3f2fbb 100644 (file)
@@ -236,6 +236,11 @@ remove_wsi_socket_from_fds(struct lws *wsi)
 #endif
        int m, ret = 0;
 
+       if (wsi->parent_carries_io) {
+               lws_same_vh_protocol_remove(wsi);
+               return 0;
+       }
+
 #if !defined(_WIN32) && !defined(LWS_WITH_ESP8266)
        if (wsi->desc.sockfd > context->max_fds) {
                lwsl_err("fd %d too high (%d)\n", wsi->desc.sockfd, context->max_fds);
@@ -348,6 +353,16 @@ lws_callback_on_writable(struct lws *wsi)
        if (wsi->socket_is_permanently_unusable)
                return 0;
 
+       if (wsi->parent_carries_io) {
+               int n = lws_callback_on_writable(wsi->parent);
+
+               if (n < 0)
+                       return n;
+
+               wsi->parent_pending_cb_on_writable = 1;
+               return 1;
+       }
+
        pt = &wsi->context->pt[(int)wsi->tsi];
        lws_stats_atomic_bump(wsi->context, pt, LWSSTATS_C_WRITEABLE_CB_REQ, 1);
 #if defined(LWS_WITH_STATS)
index 1c903d7..a527661 100644 (file)
@@ -1484,6 +1484,7 @@ struct _lws_websocket_related {
        unsigned int rx_draining_ext:1;
        unsigned int tx_draining_ext:1;
        unsigned int send_check_ping:1;
+       unsigned int first_fragment:1;
 };
 
 #ifdef LWS_WITH_CGI
@@ -1588,6 +1589,7 @@ struct lws {
        struct lws_access_log access_log;
 #endif
        void *user_space;
+       void *opaque_parent_data;
        /* rxflow handling */
        unsigned char *rxflow_buffer;
        /* truncated send handling */
@@ -1651,6 +1653,8 @@ struct lws {
        unsigned int told_user_closed:1;
        unsigned int waiting_to_send_close_frame:1;
        unsigned int ipv6:1;
+       unsigned int parent_carries_io:1;
+       unsigned int parent_pending_cb_on_writable:1;
 
 #if defined(LWS_WITH_ESP8266)
        unsigned int pending_send_completion:3;
index 78b2221..668f6f1 100644 (file)
@@ -1284,6 +1284,57 @@ transaction_result_n:
 #endif
 }
 
+static int
+lws_server_init_wsi_for_ws(struct lws *wsi)
+{
+       int n;
+
+       wsi->state = LWSS_ESTABLISHED;
+       lws_restart_ws_ping_pong_timer(wsi);
+
+       /*
+        * create the frame buffer for this connection according to the
+        * size mentioned in the protocol definition.  If 0 there, use
+        * a big default for compatibility
+        */
+
+       n = wsi->protocol->rx_buffer_size;
+       if (!n)
+               n = wsi->context->pt_serv_buf_size;
+       n += LWS_PRE;
+       wsi->u.ws.rx_ubuf = lws_malloc(n + 4 /* 0x0000ffff zlib */);
+       if (!wsi->u.ws.rx_ubuf) {
+               lwsl_err("Out of Mem allocating rx buffer %d\n", n);
+               return 1;
+       }
+       wsi->u.ws.rx_ubuf_alloc = n;
+       lwsl_debug("Allocating RX buffer %d\n", n);
+
+#if LWS_POSIX && !defined(LWS_WITH_ESP32)
+       if (!wsi->parent_carries_io)
+               if (setsockopt(wsi->desc.sockfd, SOL_SOCKET, SO_SNDBUF,
+                      (const char *)&n, sizeof n)) {
+                       lwsl_warn("Failed to set SNDBUF to %d", n);
+                       return 1;
+               }
+#endif
+
+       /* notify user code that we're ready to roll */
+
+       if (wsi->protocol->callback)
+               if (wsi->protocol->callback(wsi, LWS_CALLBACK_ESTABLISHED,
+                                           wsi->user_space,
+#ifdef LWS_OPENSSL_SUPPORT
+                                           wsi->ssl,
+#else
+                                           NULL,
+#endif
+                                           0))
+                       return 1;
+
+       return 0;
+}
+
 int
 lws_handshake_server(struct lws *wsi, unsigned char **buf, size_t len)
 {
@@ -1652,49 +1703,10 @@ upgrade_ws:
                wsi->u.hdr = hdr;
                lws_pt_unlock(pt);
 
-               lws_restart_ws_ping_pong_timer(wsi);
-
-               /*
-                * create the frame buffer for this connection according to the
-                * size mentioned in the protocol definition.  If 0 there, use
-                * a big default for compatibility
-                */
-
-               n = wsi->protocol->rx_buffer_size;
-               if (!n)
-                       n = context->pt_serv_buf_size;
-               n += LWS_PRE;
-               wsi->u.ws.rx_ubuf = lws_malloc(n + 4 /* 0x0000ffff zlib */);
-               if (!wsi->u.ws.rx_ubuf) {
-                       lwsl_err("Out of Mem allocating rx buffer %d\n", n);
-                       return 1;
-               }
-               wsi->u.ws.rx_ubuf_alloc = n;
-               lwsl_debug("Allocating RX buffer %d\n", n);
-#if LWS_POSIX && !defined(LWS_WITH_ESP32)
-               if (setsockopt(wsi->desc.sockfd, SOL_SOCKET, SO_SNDBUF,
-                              (const char *)&n, sizeof n)) {
-                       lwsl_warn("Failed to set SNDBUF to %d", n);
-                       return 1;
-               }
-#endif
-
+               lws_server_init_wsi_for_ws(wsi);
                lwsl_parser("accepted v%02d connection\n",
                            wsi->ietf_spec_revision);
 
-               /* notify user code that we're ready to roll */
-
-               if (wsi->protocol->callback)
-                       if (wsi->protocol->callback(wsi, LWS_CALLBACK_ESTABLISHED,
-                                                   wsi->user_space,
-#ifdef LWS_OPENSSL_SUPPORT
-                                                   wsi->ssl,
-#else
-                                                   NULL,
-#endif
-                                                   0))
-                               return 1;
-
                /* !!! drop ah unreservedly after ESTABLISHED */
                if (!wsi->more_rx_waiting) {
                        lws_header_table_force_to_detachable_state(wsi);
@@ -1782,6 +1794,8 @@ lws_create_new_server_wsi(struct lws_vhost *vhost)
        new_wsi->user_space = NULL;
        new_wsi->ietf_spec_revision = 0;
        new_wsi->desc.sockfd = LWS_SOCK_INVALID;
+       new_wsi->position_in_fds_table = -1;
+
        vhost->context->count_wsi_allocated++;
 
        /*
@@ -1889,7 +1903,7 @@ lws_adopt_descriptor_vhost(struct lws_vhost *vh, lws_adoption_type type,
        int n, ssl = 0;
 
        if (!new_wsi) {
-               if (type & LWS_ADOPT_SOCKET)
+               if (type & LWS_ADOPT_SOCKET && !(type & LWS_ADOPT_WS_PARENTIO))
                        compatible_close(fd.sockfd);
                return NULL;
        }
@@ -1900,6 +1914,9 @@ lws_adopt_descriptor_vhost(struct lws_vhost *vh, lws_adoption_type type,
                new_wsi->parent = parent;
                new_wsi->sibling_list = parent->child_list;
                parent->child_list = new_wsi;
+
+               if (type & LWS_ADOPT_WS_PARENTIO)
+                       new_wsi->parent_carries_io = 1;
        }
 
        new_wsi->desc = fd;
@@ -1916,6 +1933,15 @@ lws_adopt_descriptor_vhost(struct lws_vhost *vh, lws_adoption_type type,
                        lwsl_notice("OOM trying to get user_space\n");
                        goto bail;
                }
+               if (type & LWS_ADOPT_WS_PARENTIO) {
+                       new_wsi->desc.sockfd = LWS_SOCK_INVALID;
+                       lwsl_debug("binding to %s\n", new_wsi->protocol->name);
+                       lws_bind_protocol(new_wsi, new_wsi->protocol);
+                       lws_union_transition(new_wsi, LWSCM_WS_SERVING);
+                       lws_server_init_wsi_for_ws(new_wsi);
+
+                       return new_wsi;
+               }
        } else
                if (type & LWS_ADOPT_HTTP) /* he will transition later */
                        new_wsi->protocol =
index 65868e0..2f703f8 100644 (file)
@@ -329,6 +329,13 @@ lws_handle_POLLOUT_event(struct lws *wsi, struct lws_pollfd *pollfd)
 user_service:
        /* one shot */
 
+       if (wsi->parent_carries_io) {
+               wsi->handling_pollout = 0;
+               wsi->leave_pollout_active = 0;
+
+               return lws_calllback_as_writeable(wsi);
+       }
+
        if (pollfd) {
                int eff = wsi->leave_pollout_active;
 
index 42ac275..2aa85e5 100644 (file)
@@ -32,6 +32,9 @@
      # vhost-specific config options for the protocol
      #
      "ws-protocols": [{
+       "lws-meta": {
+         "status": "ok"
+       },
        "dumb-increment-protocol": {
          "status": "ok"
        },
diff --git a/plugins/protocol_lws_meta.c b/plugins/protocol_lws_meta.c
new file mode 100644 (file)
index 0000000..f20b933
--- /dev/null
@@ -0,0 +1,614 @@
+/*
+ * lws meta protocol handler
+ *
+ * Copyright (C) 2017 Andy Green <andy@warmcat.com>
+ *
+ *  This library is free software; you can redistribute it and/or
+ *  modify it under the terms of the GNU Lesser General Public
+ *  License as published by the Free Software Foundation:
+ *  version 2.1 of the License.
+ *
+ *  This library is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ *  Lesser General Public License for more details.
+ *
+ *  You should have received a copy of the GNU Lesser General Public
+ *  License along with this library; if not, write to the Free Software
+ *  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ *  MA  02110-1301  USA
+ *
+ */
+
+#if !defined (LWS_PLUGIN_STATIC)
+#define LWS_DLL
+#define LWS_INTERNAL
+#include "../lib/libwebsockets.h"
+#endif
+
+#include <string.h>
+#include <stdlib.h>
+
+#define MAX_SUBCHANNELS 8
+
+enum lws_meta_parser_state {
+       MP_IDLE, /* in body of message */
+
+       MP_CMD, /* await cmd */
+
+       MP_OPEN_SUBCHANNEL_PROTOCOL,
+       MP_OPEN_SUBCHANNEL_URL,
+       MP_OPEN_SUBCHANNEL_COOKIE,
+
+       MP_CLOSE_CHID,
+       MP_CLOSE_LEN,
+       MP_CLOSE_CODEM,
+       MP_CLOSE_CODEL,
+       MP_CLOSE_PAYLOAD,
+
+       MP_WRITE_CHID,
+};
+
+enum {
+       PENDING_TYPE_OPEN_RESULT = 0,
+       PENDING_TYPE_CHILD_CLOSE
+};
+
+/*
+ * while we haven't reported the result yet, we keep a linked-list of
+ * connection opens and their result.
+ */
+struct pending_conn {
+       struct pending_conn *next;
+       char protocol[123];
+       char cookie[8];
+       int ch;
+       int len;
+
+       unsigned char type;
+};
+
+/*
+ * the parent, lws-meta connection
+ */
+struct per_session_data__lws_meta {
+       struct lws *wsi[MAX_SUBCHANNELS + 1];
+       char told_closing[MAX_SUBCHANNELS + 1];
+       struct pending_conn *first;
+       struct pending_conn *pend;
+       char suburl[64];
+       unsigned char close[126];
+       int active_subchannel_tx, active_subchannel_rx;
+       enum lws_meta_parser_state state;
+       int pos;
+       int count_pending;
+       int round_robin;
+       int close_status_16;
+       int close_len;
+       int which_close;
+       int ch;
+};
+
+static int
+lws_find_free_channel(struct per_session_data__lws_meta *pss)
+{
+       int n;
+
+       for (n = 1; n <= MAX_SUBCHANNELS; n++)
+               if (pss->wsi[n] == NULL)
+                       return n;
+
+       return 0; /* none free */
+}
+
+static struct lws *
+lws_get_channel_wsi(struct per_session_data__lws_meta *pss, int ch)
+{
+       if (!ch)
+               return 0;
+       return pss->wsi[ch];
+}
+
+static int
+lws_get_channel_id(struct lws *wsi)
+{
+       return (lws_intptr_t)lws_get_opaque_parent_data(wsi);
+}
+
+static void
+lws_set_channel_id(struct lws *wsi, int id)
+{
+       lws_set_opaque_parent_data(wsi, (void *)(lws_intptr_t)id);
+}
+
+static struct pending_conn *
+new_pending(struct per_session_data__lws_meta *pss)
+{
+       struct pending_conn *pend;
+
+       if (pss->count_pending >= MAX_SUBCHANNELS * 2) {
+               lwsl_notice("too many pending open subchannel\n");
+
+               return NULL;
+       }
+
+       pss->count_pending++;
+
+       pend = malloc(sizeof(*pend));
+       if (!pend) {
+               lwsl_notice("OOM\n");
+
+               return NULL;
+       }
+
+       memset(pend, 0, sizeof(*pend));
+
+       return pend;
+}
+
+static int
+callback_lws_meta(struct lws *wsi, enum lws_callback_reasons reason,
+                   void *user, void *in, size_t len)
+{
+       struct per_session_data__lws_meta *pss =
+                       (struct per_session_data__lws_meta *)user;
+       struct lws_write_passthru *pas;
+       struct pending_conn *pend, *pend1;
+       struct lws *cwsi;
+       lws_sock_file_fd_type fd;
+       unsigned char *bin, buf[LWS_PRE + 512], *start = &buf[LWS_PRE],
+                       *end = &buf[sizeof(buf) - 1], *p = start;
+       int n, m;
+
+       switch (reason) {
+
+       case LWS_CALLBACK_ESTABLISHED:
+               lwsl_info("%s: LWS_CALLBACK_ESTABLISHED\n", __func__);
+               pss->state = MP_CMD;
+               pss->pos = 0;
+               break;
+
+       case LWS_CALLBACK_CLOSED:
+               break;
+
+       case LWS_CALLBACK_CHILD_CLOSING:
+               cwsi = (struct lws *)in;
+
+               /* remove it from our tracking */
+               pss->wsi[lws_get_channel_id(cwsi)] = NULL;
+
+               if (pss->told_closing[lws_get_channel_id(cwsi)]) {
+                       pss->told_closing[lws_get_channel_id(cwsi)] = 0;
+                       break;
+               }
+
+               pend = new_pending(pss);
+               if (!pend)
+                       return -1;
+
+               /* note which channel id */
+               pend->ch = lws_get_channel_id(cwsi);
+
+               if (lws_get_close_length(cwsi)) {
+                       pend->len = lws_get_close_length(cwsi);
+                       memcpy(pend->protocol, lws_get_close_payload(cwsi),
+                                       pend->len);
+               }
+
+               pend->type = PENDING_TYPE_CHILD_CLOSE;
+               pend->next = pss->first;
+               pss->first = pend;
+
+               /*
+                * nothing else will complete from this wsi, so abandon
+                * tracking in-process messages from this wsi.
+                */
+
+               if (pss->active_subchannel_tx == pend->ch)
+                       pss->active_subchannel_tx = 0;
+
+               if (pss->active_subchannel_rx == pend->ch)
+                       pss->active_subchannel_rx = 0;
+               break;
+
+       case LWS_CALLBACK_SERVER_WRITEABLE:
+
+               if (!pss->active_subchannel_tx) {
+
+                       /* not in the middle of a message...
+                        *
+                        * PRIORITY 1: pending open and close notifications
+                        */
+
+                       pend = pss->first;
+                       while (pend && p < end - 128) {
+                               switch (pend->type) {
+                               case PENDING_TYPE_OPEN_RESULT:
+                                       lwsl_debug("open result %s %s\n",
+                                               pend->cookie, pend->protocol);
+                                       *p++ = LWS_META_CMD_OPEN_RESULT;
+                                       memcpy(p, pend->cookie,
+                                              strlen(pend->cookie) + 1);
+                                       p += strlen(pend->cookie) + 1;
+                                       *p++ = LWS_META_TRANSPORT_OFFSET +
+                                                       pend->ch;
+                                       memcpy(p, pend->protocol,
+                                              strlen(pend->protocol) + 1);
+                                       p += strlen(pend->protocol) + 1;
+                                       break;
+                               case PENDING_TYPE_CHILD_CLOSE:
+                                       *p++ = LWS_META_CMD_CLOSE_NOTIFY;
+                                       *p++ = LWS_META_TRANSPORT_OFFSET +
+                                                       pend->ch;
+                                       for (n = 0; n < pend->len; n++)
+                                               *p++ = pend->protocol[n];
+                                       break;
+                               }
+
+                               pss->count_pending--;
+                               pend1 = pend;
+                               pend = pend->next;
+                               free(pend1);
+                               pss->first = pend;
+                       }
+
+                       if (p != start) {
+                               if (lws_write(wsi, start, p - start,
+                                             LWS_WRITE_BINARY) < 0)
+                                       return 1;
+                               if (pend) /* still more */
+                                       lws_callback_on_writable(wsi);
+                               break;
+                       }
+
+                       /* PRIORITY 2: pick a child for the writable callback */
+
+                       cwsi = NULL;
+                       for (n = 0; n < MAX_SUBCHANNELS; n++) {
+                               m = ((pss->round_robin + n) % MAX_SUBCHANNELS) + 1;
+                               if (pss->wsi[m] &&
+                                   lws_get_child_pending_on_writable(pss->wsi[m])) {
+                                       pss->round_robin = m;
+                                       cwsi = pss->wsi[m];
+                                       break;
+                               }
+                       }
+               } else
+                       /* one child is in middle of message, stay with it */
+                       cwsi = pss->wsi[pss->active_subchannel_tx];
+
+               if (!cwsi)
+                       break;
+
+               lws_clear_child_pending_on_writable(cwsi);
+               if (lws_handle_POLLOUT_event(cwsi, NULL))
+                       return -1;
+               break;
+
+       case LWS_CALLBACK_RECEIVE:
+               bin = (unsigned char *)in;
+
+               /*
+                * at the start of a message, we may have one or more
+                * lws_meta command blocks.
+                */
+               while (pss->state != MP_IDLE &&
+                      (unsigned int)(bin - (unsigned char *)in) < len) {
+
+                       switch (pss->state) {
+                       case MP_IDLE: /* in body of message */
+
+                               if (!lws_is_first_fragment(wsi))
+                                       break;
+
+                               pss->state = MP_CMD;
+
+                               /* fallthru */
+
+                       case MP_CMD: /* await cmd */
+
+                               pss->pos = 0;
+
+                               switch (*bin++) {
+                               case LWS_META_CMD_OPEN_SUBCHANNEL:
+
+                                       pss->pend = new_pending(pss);
+                                       if (!pss->pend)
+                                               return -1;
+
+                                       pss->state = MP_OPEN_SUBCHANNEL_PROTOCOL;
+
+                                       break;
+                               case LWS_META_CMD_CLOSE_NOTIFY:
+                               case LWS_META_CMD_CLOSE_RQ:
+                                       pss->which_close = bin[-1];
+                                       pss->state = MP_CLOSE_CHID;
+                                       break;
+                               case LWS_META_CMD_WRITE:
+                                       pss->state = MP_WRITE_CHID;
+                                       break;
+
+                               // open result is also illegal to receive
+                               default:
+                                       lwsl_notice("bad lws_meta cmd 0x%x\n",
+                                                   bin[-1]);
+
+                                       return -1;
+                               }
+
+                               break;
+
+                       case MP_OPEN_SUBCHANNEL_PROTOCOL:
+                               pss->pend->protocol[pss->pos++] = *bin++;
+                               if (pss->pos == sizeof(pss->pend->protocol) - 1) {
+                                       lwsl_notice("protocol name too long\n");
+                                       return -1;
+                               }
+
+                               if (bin[-1] != '\0')
+                                       break;
+
+                               pss->state = MP_OPEN_SUBCHANNEL_URL;
+                               pss->pos = 0;
+                               break;
+
+                       case MP_OPEN_SUBCHANNEL_URL:
+                               pss->suburl[pss->pos++] = *bin++;
+                               if (pss->pos == sizeof(pss->suburl) - 1) {
+                                       lwsl_notice("suburl too long\n");
+                                       return -1;
+                               }
+
+                               if (bin[-1] != '\0')
+                                       break;
+
+                               pss->state = MP_OPEN_SUBCHANNEL_COOKIE;
+                               pss->pos = 0;
+                               break;
+
+                       case MP_OPEN_SUBCHANNEL_COOKIE:
+                               pss->pend->cookie[pss->pos++] = *bin++;
+                               if (pss->pos == sizeof(pss->pend->cookie) - 1) {
+                                       lwsl_notice("cookie too long\n");
+                                       return -1;
+                               }
+
+                               if (bin[-1] != '\0')
+                                       break;
+
+                               lwsl_debug("%s: %s / %s / %s\n", __func__,
+                                           pss->pend->protocol,
+                                           pss->suburl,
+                                           pss->pend->cookie);
+
+                               pss->pend->ch = lws_find_free_channel(pss);
+                               if (pss->pend->ch) {
+
+                                       fd.sockfd = 0; // not going to be used
+
+                                       cwsi = lws_adopt_descriptor_vhost(
+                                                       lws_get_vhost(wsi),
+                                                       LWS_ADOPT_WS_PARENTIO,
+                                                       fd, pss->pend->protocol,
+                                                       wsi);
+
+                                       if (!cwsi) {
+                                               lwsl_notice("open failed\n");
+                                               pss->pend->ch = 0;
+                                       } else {
+                                               pss->wsi[pss->pend->ch] = cwsi;
+                                               lws_set_channel_id(cwsi,
+                                                               pss->pend->ch);
+                                               lwsl_debug("cwsi %p on parent %p open OK %s\n",
+                                                       cwsi, wsi, pss->pend->protocol);
+                                       }
+
+                               } else
+                                       lwsl_notice("no free subchannels\n");
+
+                               pss->pend->type = PENDING_TYPE_OPEN_RESULT;
+                               pss->pend->next = pss->first;
+                               pss->first = pss->pend;
+
+                               lws_callback_on_writable(wsi);
+
+                               pss->state = MP_CMD;
+                               pss->pos = 0;
+                               break;
+
+                       case MP_CLOSE_CHID:
+                               pss->ch = (*bin++) - LWS_META_TRANSPORT_OFFSET;
+                               pss->state = MP_CLOSE_LEN;
+                               pss->pos = 0;
+                               break;
+                       case MP_CLOSE_LEN:
+                               pss->close_len = (*bin++) -
+                                       LWS_META_TRANSPORT_OFFSET;
+                               lwsl_debug("close len %d\n", pss->close_len);
+                               pss->state = MP_CLOSE_CODEM;
+                               pss->pos = 0;
+                               break;
+                       case MP_CLOSE_CODEM:
+                               pss->close[pss->pos++] = *bin;
+                               pss->close_status_16 = (*bin++) * 256;
+                               pss->state = MP_CLOSE_CODEL;
+                               break;
+                       case MP_CLOSE_CODEL:
+                               pss->close[pss->pos++] = *bin;
+                               pss->close_status_16 |= *bin++;
+                               pss->state = MP_CLOSE_PAYLOAD;
+                               break;
+                       case MP_CLOSE_PAYLOAD:
+                               pss->close[pss->pos++] = *bin++;
+                               if (pss->pos == sizeof(pss->close) - 1) {
+                                       lwsl_notice("close payload too long\n");
+                                       return -1;
+                               }
+                               if (--pss->close_len)
+                                       break;
+
+                               pss->state = MP_CMD;
+
+                               cwsi = lws_get_channel_wsi(pss, pss->ch);
+                               if (!cwsi) {
+                                       lwsl_notice("close (%d) bad ch %d\n",
+                                               pss->which_close, pss->ch);
+                                       break;
+                               }
+
+                               if (pss->which_close == LWS_META_CMD_CLOSE_RQ) {
+                                       if (lws_get_protocol(cwsi)->callback(
+                                           cwsi,
+                                           LWS_CALLBACK_WS_PEER_INITIATED_CLOSE,
+                                           lws_wsi_user(cwsi), &pss->close,
+                                           pss->pos))
+                                               return -1;
+
+                                       /*
+                                        * we need to echo back the close payload
+                                        * when we send the close notification
+                                        */
+                                       lws_close_reason(cwsi,
+                                                        pss->close_status_16,
+                                                        &pss->close[2],
+                                                        pss->pos - 2);
+                               }
+
+                               /* so force him closed */
+
+                               lws_set_timeout(cwsi,
+                                       PENDING_TIMEOUT_KILLED_BY_PARENT,
+                                       LWS_TO_KILL_SYNC);
+                               break;
+
+                       case MP_WRITE_CHID:
+                               pss->active_subchannel_rx = (*bin++) -
+                                       LWS_META_TRANSPORT_OFFSET;
+                               pss->state = MP_IDLE;
+                               break;
+                       }
+               }
+
+               len -= bin - (unsigned char *)in;
+
+               if (!len)
+                       break;
+
+               cwsi = lws_get_channel_wsi(pss, pss->active_subchannel_rx);
+               if (!cwsi) {
+                       lwsl_notice("bad ch %d\n", pss->active_subchannel_rx);
+
+                       return -1;
+               }
+
+               lwsl_debug("%s: RX len %d\n", __func__, (int)len);
+
+               if (lws_get_protocol(cwsi)->callback(cwsi,
+                                       LWS_CALLBACK_RECEIVE,
+                                       lws_wsi_user(cwsi), bin, len))
+                       lws_set_timeout(cwsi,
+                               PENDING_TIMEOUT_KILLED_BY_PARENT,
+                               LWS_TO_KILL_SYNC);
+
+               if (lws_is_final_fragment(wsi)) {
+                       pss->active_subchannel_rx = 0;
+                       pss->state = MP_CMD;
+               }
+               break;
+
+       /*
+        * child wrote something via lws_write.... which passed it up to us to
+        * deal with, because we are the parent.  Prepend two bytes for
+        * lws-meta command and channel index, and send it out on parent
+        */
+       case LWS_CALLBACK_CHILD_WRITE_VIA_PARENT:
+               pas = in;
+               bin = ((unsigned char *)pas->buf);
+
+               if ((pas->wp & 7) == 4 /*LWS_WRITE_CLOSE */) {
+                       *p++ = LWS_META_CMD_CLOSE_NOTIFY;
+                       *p++ = LWS_META_TRANSPORT_OFFSET +
+                                       lws_get_channel_id(pas->wsi);
+                       *p++ = pas->len - 2 + LWS_META_TRANSPORT_OFFSET;
+                       *p++ = *bin++;
+                       *p++ = *bin++;
+                       for (n = 0; n < (int)pas->len - 2; n++)
+                               *p++ = bin[n];
+
+                       if (lws_write(wsi, start, p - start,
+                                     LWS_WRITE_BINARY) < 0)
+                               return 1;
+
+                       pss->told_closing[lws_get_channel_id(pas->wsi)] = 1;
+                       break;
+               }
+
+               if ((pas->wp & 7) == LWS_WRITE_TEXT ||
+                   (pas->wp & 7) == LWS_WRITE_BINARY) {
+
+                       if (pas->wp & LWS_WRITE_NO_FIN)
+                               pss->active_subchannel_tx =
+                                               lws_get_channel_id(pas->wsi);
+
+                       /* start of message, prepend the subchannel id */
+
+                       bin -= 2;
+                       bin[0] = LWS_META_CMD_WRITE;
+                       bin[1] = lws_get_channel_id(pas->wsi) +
+                                       LWS_META_TRANSPORT_OFFSET;
+                       if (lws_write(wsi, bin, pas->len + 2, pas->wp) < 0)
+                               return 1;
+               } else
+                       if (lws_write(wsi, bin, pas->len, pas->wp) < 0)
+                               return 1;
+
+               /* track EOM */
+
+               if (!(pas->wp & LWS_WRITE_NO_FIN))
+                       pss->active_subchannel_tx = 0;
+               break;
+
+       default:
+               break;
+       }
+
+       return 0;
+}
+
+#define LWS_PLUGIN_PROTOCOL_LWS_META { \
+               "lws-meta", \
+               callback_lws_meta, \
+               sizeof(struct per_session_data__lws_meta), \
+               1024, /* rx buf size must be >= permessage-deflate rx size */ \
+       }
+
+#if !defined (LWS_PLUGIN_STATIC)
+
+static const struct lws_protocols protocols[] = {
+       LWS_PLUGIN_PROTOCOL_LWS_META
+};
+
+LWS_EXTERN LWS_VISIBLE int
+init_protocol_lws_meta(struct lws_context *context,
+                            struct lws_plugin_capability *c)
+{
+       if (c->api_magic != LWS_PLUGIN_API_MAGIC) {
+               lwsl_err("Plugin API %d, library API %d", LWS_PLUGIN_API_MAGIC,
+                        c->api_magic);
+               return 1;
+       }
+
+       c->protocols = protocols;
+       c->count_protocols = ARRAY_SIZE(protocols);
+       c->extensions = NULL;
+       c->count_extensions = 0;
+
+       return 0;
+}
+
+LWS_EXTERN LWS_VISIBLE int
+destroy_protocol_lws_meta(struct lws_context *context)
+{
+       return 0;
+}
+#endif
index 917aedf..1e94eaa 100644 (file)
@@ -108,6 +108,285 @@ function removeEvent( obj, type, fn ) {
  * end of grayOut related stuff
  */
  
+/*
+ * lws-meta helpers
+ */
+
+var lws_meta_cmd = {
+       OPEN_SUBCHANNEL: 0x41,
+       /**< Client requests to open new subchannel
+        */
+       OPEN_RESULT: 0x42,
+       /**< Result of client request to open new subchannel */
+       CLOSE_NOT: 0x43,
+       CLOSE_RQ: 0x44,
+       /**< client requests to close a subchannel */
+       WRITE: 0x45,
+       /**< connection writes something to specific channel index */
+       RX: 0x46,
+};
+
+function new_ws(urlpath, protocol)
+{
+       if (typeof MozWebSocket != "undefined")
+               return new MozWebSocket(urlpath, protocol);
+
+       return new WebSocket(urlpath, protocol);
+}
+
+function lws_meta_ws() {
+       var real;
+       
+       var channel_id_to_child;
+       var pending_children;
+       var active_children;
+}
+
+function lws_meta_ws_child() {
+       var onopen;
+       var onmessage;
+       var onclose;
+       
+       var channel_id;
+       
+       var subprotocol;
+       var suburl;
+       var cookie;
+       
+       var extensions;
+       
+       var parent;
+}
+
+lws_meta_ws_child.prototype.send = function(data)
+{
+
+       if (typeof data == "string") {
+               data = String.fromCharCode(lws_meta_cmd.WRITE) +
+                       String.fromCharCode(this.channel_id) +
+                       data;
+               
+               return this.parent.real.send(data);
+       }
+       
+       {
+
+               var ab = new Uint8Array(data.length + 2);
+
+               ab[0] = lws_meta_cmd.WRITE;
+               ab[1] = this.channel_id;
+               ab.set(data, 2);
+       
+               return this.parent.real.send(ab);
+       }
+}
+
+lws_meta_ws_child.prototype.close = function(close_code, close_string)
+{
+       var pkt = new Uint8Array(129), m = 0, pkt1;
+       
+       pkt[m++] = lws_meta_cmd.CLOSE_RQ;
+       pkt[m++] = this.channel_id;
+       
+       pkt[m++] = close_string.length + 0x20;
+       
+       pkt[m++] = close_code / 256;
+       pkt[m++] = close_code % 256;
+       
+       for (i = 0; i < close_string.length; i++)
+               pkt[m++] = close_string.charCodeAt(i);
+       
+       pkt1 = new Uint8Array(m);
+       for (n = 0; n < m; n++)
+               pkt1[n] = pkt[n];
+               
+       this.parent.real.send(pkt1.buffer);
+}
+
+/* make a real ws connection using lws_meta*/
+lws_meta_ws.prototype.new_parent = function(urlpath)
+{
+       var n, i, m = 0, pkt1;
+       
+       this.ordinal = 1;
+       this.pending_children = [];
+       this.active_children = [];
+       this.real = new_ws(urlpath, "lws-meta");
+       
+       this.real.binaryType = 'arraybuffer';
+       this.real.myparent = this;
+
+       this.real.onopen = function() {
+               pkt = new Uint8Array(1024);
+                       var n, i, m = 0, pkt1;
+               console.log("real open - pending children " + this.myparent.pending_children.length);
+               for (n = 0; n < this.myparent.pending_children.length; n++) {
+               
+                       var p = this.myparent.pending_children[n];
+               
+                       pkt[m++] = lws_meta_cmd.OPEN_SUBCHANNEL;
+                       for (i = 0; i < p.subprotocol.length; i++)
+                               pkt[m++] = p.subprotocol.charCodeAt(i);
+                       pkt[m++] = 0;
+                       for (i = 0; i < p.suburl.length; i++)
+                               pkt[m++] = p.suburl.charCodeAt(i);
+                       pkt[m++] = 0;
+                       for (i = 0; i < p.cookie.length; i++)
+                               pkt[m++] = p.cookie.charCodeAt(i);
+                       pkt[m++] = 0;
+               }
+               
+               pkt1 = new Uint8Array(m);
+               for (n = 0; n < m; n++)
+                       pkt1[n] = pkt[n];
+               
+               console.log(this.myparent.pending_children[0].subprotocol);
+               console.log(pkt1);
+               
+               this.send(pkt1.buffer);
+       }
+
+
+       this.real.onmessage = function(msg) {
+       
+               if (typeof msg.data != "string") {
+                       var ba = new Uint8Array(msg.data), n = 0;
+                       
+                       while (n < ba.length) {
+
+                               switch (ba[n++]) {
+                               case lws_meta_cmd.OPEN_RESULT:
+                               {
+                                       var m = 0, cookie = "", protocol = "", ch = 0;
+                                       var ws = this.myparent;
+                                       /* cookie NUL
+                                        * channel index + 0x20
+                                        * protocol NUL
+                                        */
+                                        while (ba[n])
+                                               cookie = cookie + String.fromCharCode(ba[n++]);
+                                        n++;
+                                        ch = ba[n++];
+                                        
+                                        while (ba[n])
+                                               protocol = protocol + String.fromCharCode(ba[n++]);
+                                               
+                                       console.log("open result " + cookie + " " + protocol + " " + ch + " pending len " + ws.pending_children.length);
+                                       
+                                       for (m = 0; m < ws.pending_children.length; m++) {
+                                               if (ws.pending_children[m].cookie == cookie) {
+                                                       var newchild = ws.pending_children[m];
+                       
+                                                       /* found it */
+                                                       ws.pending_children[m].channel_id = ch;
+                                                       /* add to active children array */
+                                                       ws.active_children.push(ws.pending_children[m]);
+                                                       /* remove from pending children array */
+                                                       ws.pending_children.splice(m, 1);
+                                                       
+                                                       newchild.parent = ws;
+                                                       newchild.extensions = this.extensions;
+                                                       
+                                                       newchild.onopen();
+                                                       
+                                                       console.log("made active " + cookie);
+                                                       break;
+                                               }
+                                       }
+                                       break;
+                               }
+       
+                               case lws_meta_cmd.CLOSE_NOT:
+                               {
+                                       var code = 0, str = "", ch = 0, m, le;
+                                       var ba = new Uint8Array(msg.data);
+                                       /*
+                                        * BYTE: channel
+                                        * BYTE: MSB status code
+                                        * BYTE: LSB status code
+                                        * BYTES: rest of message is close status string
+                                        */
+                                        
+                                        ch = ba[n++];
+                                        le = ba[n++] - 0x20;
+                                        code = ba[n++] * 256;
+                                        code += ba[n++];
+                                        
+                                        while (le--)
+                                               str += String.fromCharCode(ba[n++]);
+                                               
+                                       console.log("channel id " + ch + " code " + code + " str " + str + " len " + str.length);
+                                               
+                                       for (m = 0; m < this.myparent.active_children.length; m++)
+                                               if (this.myparent.active_children[m].channel_id == ch) {
+                                                       var child = this.myparent.active_children[m];
+                                                       var ms = new CloseEvent("close", { code:code, reason:str } );
+                                                       
+                                                       /* reply with close ack */
+                                                       this.send(msg.data);
+                                                       
+                                                       if (child.onclose)
+                                                               child.onclose(ms);
+                                                       
+                                                       this.myparent.active_children.splice(m, 1);
+                                                       break;
+                                               }
+
+                               }
+                               } // switch
+                       }
+               } else {
+                       if (msg.data.charCodeAt(0) == lws_meta_cmd.WRITE ) {
+                               var ch = msg.data.charCodeAt(1), m, ms;
+                               var ws = this.myparent, ms;
+                                                               
+                               for (m = 0; m < ws.active_children.length; m++) {
+                                       if (ws.active_children[m].channel_id == ch) {
+                                               ms = new MessageEvent("WebSocket", { data: msg.data.substr(2, msg.data.length - 2) } );
+                                               if (ws.active_children[m].onmessage)
+                                                       ws.active_children[m].onmessage(ms);
+                                               break;
+                                       }
+                               }
+                       }
+               }
+       }
+       this.real.onclose = function() {
+               var ws = this.myparent, m;
+               for (m = 0; m < ws.active_children.length; m++) {
+                       var child = ws.active_children[m];
+                       var ms = new CloseEvent("close", { code:1000, reason:"parent closed" } );
+                       
+                       if (child.onclose)
+                               child.onclose(ms);
+               }
+       }
+
+}
+
+
+
+/* make a child connection using existing lws_meta real ws connection */
+lws_meta_ws.prototype.new_ws = function(suburl, protocol)
+{
+       var ch = new lws_meta_ws_child();
+       
+       ch.suburl = suburl;
+       ch.subprotocol = protocol;
+       ch.cookie = "C" + this.ordinal++;
+       
+       this.pending_children.push(ch);
+       
+       if (this.real.readyState == 1)
+               this.real.onopen();
+       
+       return ch;
+}
+
+
+/*
+ * end of lws-meta helpers
+ */
  
 function lws_san(s)
 {
index 9623df2..d26e2b2 100644 (file)
@@ -226,8 +226,15 @@ static const struct lws_protocol_vhost_options pvo_opt4 = {
  * linked-list.  We can also give the plugin per-vhost options here.
  */
 
-static const struct lws_protocol_vhost_options pvo_4 = {
+static const struct lws_protocol_vhost_options pvo_5 = {
+       NULL,
        NULL,
+       "lws-meta",
+       "" /* ignored, just matches the protocol name above */
+};
+
+static const struct lws_protocol_vhost_options pvo_4 = {
+       &pvo_5,
        &pvo_opt4, /* set us as the protocol who gets raw connections */
        "protocol-lws-raw-test",
        "" /* ignored, just matches the protocol name above */
index 0765cfb..dd02fd2 100644 (file)
@@ -68,6 +68,7 @@ char crl_path[1024] = "";
 #define LWS_PLUGIN_STATIC
 #include "../plugins/protocol_lws_mirror.c"
 #include "../plugins/protocol_lws_status.c"
+#include "../plugins/protocol_lws_meta.c"
 
 /* singlethreaded version --> no locks */
 
@@ -104,6 +105,8 @@ enum demo_protocols {
        PROTOCOL_LWS_MIRROR,
        PROTOCOL_LWS_STATUS,
 
+       PROTOCOL_LWS_META,
+
        /* always last */
        DEMO_PROTOCOL_COUNT
 };
@@ -130,6 +133,8 @@ static struct lws_protocols protocols[] = {
        },
        LWS_PLUGIN_PROTOCOL_MIRROR,
        LWS_PLUGIN_PROTOCOL_LWS_STATUS,
+
+       LWS_PLUGIN_PROTOCOL_LWS_META,
        { NULL, NULL, 0, 0 } /* terminator */
 };
 
index e7868ef..4642618 100644 (file)
@@ -533,21 +533,31 @@ if (params.mirror)
        mirror_name = params.mirror;
 
        console.log(mirror_name);
+       
 
-function new_ws(urlpath, protocol)
-{
-       if (typeof MozWebSocket != "undefined")
-               return new MozWebSocket(urlpath, protocol);
+/*
+ * if using lws-meta to carry the other ws connections, declare the
+ * parent connection object and start its connection to the server.
+ *
+ * These helpers are defined in lws-common.js
+ */
 
-       return new WebSocket(get_appropriate_ws_url(urlpath), protocol);
-}
+
+var lws_meta = new lws_meta_ws();
+lws_meta.new_parent(get_appropriate_ws_url("?mirror=" + mirror_name));
 
 
 document.getElementById("number").textContent = get_appropriate_ws_url(mirror_name);
 
 /* dumb increment protocol */
+
+       /*
+        * to connect via an lws-meta connection, start the connection using
+        * lws_meta.new_ws().  To connect by independent connection, start
+        * the connection using just new_ws()
+        */
        
-       var socket_di = new_ws("", "dumb-increment-protocol");
+       var socket_di = lws_meta.new_ws("", "dumb-increment-protocol");
 
        try {
                socket_di.onopen = function() {
@@ -571,8 +581,8 @@ document.getElementById("number").textContent = get_appropriate_ws_url(mirror_na
        
        var socket_status, jso, s;
 
-       socket_status = new_ws(get_appropriate_ws_url(""), "lws-status");
 
+       socket_status = lws_meta.new_ws(get_appropriate_ws_url(""), "lws-status");
 
        try {
                socket_status.onopen = function() {
@@ -585,6 +595,8 @@ document.getElementById("number").textContent = get_appropriate_ws_url(mirror_na
                socket_status.onmessage =function got_packet(msg) {
                        var s;
                        
+                       console.log(msg.data);
+                       
                        jso = JSON.parse(msg.data);
                        
                        document.getElementById("servinfo").innerHTML = 
@@ -637,7 +649,9 @@ var socket_ot;
 
 function ot_open() {
 
-       socket_ot = new_ws(get_appropriate_ws_url(""), "dumb-increment-protocol");
+       socket_ot = lws_meta.new_ws(get_appropriate_ws_url(""), "dumb-increment-protocol");
+
+       console.log("ot_open");
 
        try {
                socket_ot.onopen = function() {
@@ -646,6 +660,7 @@ function ot_open() {
                        document.getElementById("ot_open_btn").disabled = true;
                        document.getElementById("ot_close_btn").disabled = false;
                        document.getElementById("ot_req_close_btn").disabled = false;
+                       console.log("ot_open.onopen");
                } 
 
                socket_ot.onclose = function(e){
@@ -680,7 +695,7 @@ function ot_req_close() {
        var socket_lm;
        var color = "#000000";
 
-       socket_lm = new_ws(get_appropriate_ws_url("?mirror=" + mirror_name),
+       socket_lm = lws_meta.new_ws(get_appropriate_ws_url("?mirror=" + mirror_name),
                        "lws-mirror-protocol");
 
        try {