ext: pmd: improve dealing with partial input usage with drain
authorAndy Green <andy@warmcat.com>
Mon, 20 Mar 2017 11:07:19 +0000 (19:07 +0800)
committerAndy Green <andy@warmcat.com>
Mon, 20 Mar 2017 11:07:19 +0000 (19:07 +0800)
https://github.com/warmcat/libwebsockets/issues/841

lib/extension-permessage-deflate.c
lib/libuv.c
lib/parsers.c
lib/server.c
lib/service.c
plugins/protocol_lws_status.c
test-server/test-server-status.c
test-server/test.html

index b052ca7..fe71666 100644 (file)
@@ -202,7 +202,7 @@ lws_extension_callback_pm_deflate(struct lws_context *context,
                 * rx buffer by the caller, so this assumption is safe while
                 * we block new rx while draining the existing rx
                 */
-               if (eff_buf->token && eff_buf->token_len) {
+               if (!priv->rx.avail_in && eff_buf->token && eff_buf->token_len) {
                        priv->rx.next_in = (unsigned char *)eff_buf->token;
                        priv->rx.avail_in = eff_buf->token_len;
                }
index 24f24ce..8e377fd 100644 (file)
@@ -40,7 +40,7 @@ lws_uv_idle(uv_idle_t *handle
        struct lws_context_per_thread *pt = lws_container_of(handle,
                                        struct lws_context_per_thread, uv_idle);
 
-       lwsl_debug("%s\n", __func__);
+//     lwsl_debug("%s\n", __func__);
 
        /*
         * is there anybody with pending stuff that needs service forcing?
@@ -51,7 +51,7 @@ lws_uv_idle(uv_idle_t *handle
                /* still somebody left who wants forced service? */
                if (!lws_service_adjust_timeout(pt->context, 1, pt->tid))
                        /* yes... come back again later */
-                       lwsl_debug("%s: done again\n", __func__);
+//                     lwsl_debug("%s: done again\n", __func__);
                return;
        }
 
index 4eb0459..a38c3af 100644 (file)
@@ -954,6 +954,24 @@ LWS_VISIBLE int lws_frame_is_binary(struct lws *wsi)
 {
        return wsi->u.ws.frame_is_binary;
 }
+static void
+lws_remove_wsi_from_draining_ext_list(struct lws *wsi)
+{
+       struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi];
+       struct lws **w = &pt->rx_draining_ext_list;
+
+       wsi->u.ws.rx_draining_ext = 0;
+       /* remove us from context draining ext list */
+       while (*w) {
+               if (*w == wsi) {
+                       *w = wsi->u.ws.rx_draining_ext_list;
+                       break;
+               }
+               w = &((*w)->u.ws.rx_draining_ext_list);
+       }
+       wsi->u.ws.rx_draining_ext_list = NULL;
+}
+
 
 int
 lws_rx_sm(struct lws *wsi, unsigned char c)
@@ -963,26 +981,17 @@ lws_rx_sm(struct lws *wsi, unsigned char c)
        int ret = 0, n, rx_draining_ext = 0;
        struct lws_tokens eff_buf;
 
+       eff_buf.token = NULL;
+       eff_buf.token_len = 0;
        if (wsi->socket_is_permanently_unusable)
                return -1;
 
        switch (wsi->lws_rx_parse_state) {
        case LWS_RXPS_NEW:
                if (wsi->u.ws.rx_draining_ext) {
-                       struct lws **w = &pt->rx_draining_ext_list;
-
                        eff_buf.token = NULL;
                        eff_buf.token_len = 0;
-                       wsi->u.ws.rx_draining_ext = 0;
-                       /* remove us from context draining ext list */
-                       while (*w) {
-                               if (*w == wsi) {
-                                       *w = wsi->u.ws.rx_draining_ext_list;
-                                       break;
-                               }
-                               w = &((*w)->u.ws.rx_draining_ext_list);
-                       }
-                       wsi->u.ws.rx_draining_ext_list = NULL;
+                       lws_remove_wsi_from_draining_ext_list(wsi);
                        rx_draining_ext = 1;
                        lwsl_err("%s: doing draining flow\n", __func__);
 
@@ -1238,6 +1247,9 @@ handle_first:
        case LWS_RXPS_PAYLOAD_UNTIL_LENGTH_EXHAUSTED:
                assert(wsi->u.ws.rx_ubuf);
 
+               if (wsi->u.ws.rx_draining_ext)
+                       goto drain_extension;
+
                if (wsi->u.ws.rx_ubuf_head + LWS_PRE >=
                    wsi->u.ws.rx_ubuf_alloc) {
                        lwsl_err("Attempted overflow \n");
@@ -1413,6 +1425,9 @@ drain_extension:
                        goto already_done;
 
                n = lws_ext_cb_active(wsi, LWS_EXT_CB_PAYLOAD_RX, &eff_buf, 0);
+               /* eff_buf may be pointing somewhere completely different now,
+                * it's the output
+                */
                if (n < 0) {
                        /*
                         * we may rely on this to get RX, just drop connection
@@ -1426,9 +1441,12 @@ drain_extension:
 
                if (n && eff_buf.token_len) {
                        /* extension had more... main loop will come back */
+                       // lwsl_notice("ext has stuff to drain\n");
                        wsi->u.ws.rx_draining_ext = 1;
                        wsi->u.ws.rx_draining_ext_list = pt->rx_draining_ext_list;
                        pt->rx_draining_ext_list = wsi;
+               } else {
+                       lws_remove_wsi_from_draining_ext_list(wsi);
                }
 
                if (eff_buf.token_len > 0 ||
index e5ba891..7c0dd17 100644 (file)
@@ -2242,7 +2242,7 @@ try_pollout:
                        if (accept_fd < 0) {
                                if (LWS_ERRNO == LWS_EAGAIN ||
                                    LWS_ERRNO == LWS_EWOULDBLOCK) {
-                                       lwsl_err("accept asks to try again\n");
+//                                     lwsl_err("accept asks to try again\n");
                                        break;
                                }
                                lwsl_err("ERROR on accept: %s\n", strerror(LWS_ERRNO));
@@ -2521,6 +2521,7 @@ lws_interpret_incoming_packet(struct lws *wsi, unsigned char **buf, size_t len)
                }
 
                if (wsi->u.ws.rx_draining_ext) {
+                       // lwsl_notice("draining with 0\n");
                        m = lws_rx_sm(wsi, 0);
                        if (m < 0)
                                return -1;
@@ -2532,7 +2533,8 @@ lws_interpret_incoming_packet(struct lws *wsi, unsigned char **buf, size_t len)
                        wsi->rxflow_pos++;
 
                /* consume payload bytes efficiently */
-               if (wsi->lws_rx_parse_state ==
+               if (
+                   wsi->lws_rx_parse_state ==
                    LWS_RXPS_PAYLOAD_UNTIL_LENGTH_EXHAUSTED) {
                        m = lws_payload_until_length_exhausted(wsi, buf, &len);
                        if (wsi->rxflow_buffer)
index 050a8ce..82e379a 100644 (file)
@@ -688,6 +688,17 @@ completed:
 }
 #endif
 
+static int
+lws_is_ws_with_ext(struct lws *wsi)
+{
+#if defined(LWS_NO_EXTENSIONS)
+       return 0;
+#else
+       return wsi->state == LWSS_ESTABLISHED &&
+              !!wsi->count_act_ext;
+#endif
+}
+
 LWS_VISIBLE int
 lws_service_fd_tsi(struct lws_context *context, struct lws_pollfd *pollfd, int tsi)
 {
@@ -839,7 +850,7 @@ lws_service_fd_tsi(struct lws_context *context, struct lws_pollfd *pollfd, int t
 
 #endif
 
-       lwsl_debug("fd=%d, revents=%d, mode=%d, state=%d\n", pollfd->fd, pollfd->revents, (int)wsi->mode, (int)wsi->state);
+//       lwsl_debug("fd=%d, revents=%d, mode=%d, state=%d\n", pollfd->fd, pollfd->revents, (int)wsi->mode, (int)wsi->state);
        if (pollfd->revents & LWS_POLLHUP)
                goto close_and_handled;
 
@@ -1030,9 +1041,21 @@ read:
                                        wsi->u.hdr.ah->rxpos;
                } else {
                        if (wsi->mode != LWSCM_HTTP_CLIENT_ACCEPTED) {
+                               /*
+                                * extension may not consume everything (eg, pmd may be constrained
+                                * as to what it can output...) has to go in per-wsi rx buf area.
+                                * Otherwise in large temp serv_buf area.
+                                */
+                               eff_buf.token = (char *)pt->serv_buf;
+                               if (lws_is_ws_with_ext(wsi)) {
+                                       eff_buf.token_len = wsi->u.ws.rx_ubuf_alloc;
+                               } else {
+                                       eff_buf.token_len = context->pt_serv_buf_size;
+                               }
+
                                eff_buf.token_len = lws_ssl_capable_read(wsi,
-                                       pt->serv_buf, pending ? pending :
-                                       context->pt_serv_buf_size);
+                                       (unsigned char *)eff_buf.token, pending ? pending :
+                                       eff_buf.token_len);
                                switch (eff_buf.token_len) {
                                case 0:
                                        lwsl_info("%s: zero length read\n", __func__);
@@ -1045,8 +1068,7 @@ read:
                                        lwsl_info("Closing when error\n");
                                        goto close_and_handled;
                                }
-
-                               eff_buf.token = (char *)pt->serv_buf;
+                               // lwsl_notice("Actual RX %d\n", eff_buf.token_len);
                        }
                }
 
@@ -1112,6 +1134,8 @@ drain:
                                 * around again it will pick up from where it
                                 * left off.
                                 */
+                               // lwsl_notice("doing lws_read from pt->serv_buf %p %p for len %d\n", pt->serv_buf, eff_buf.token, (int)eff_buf.token_len);
+
                                n = lws_read(wsi, (unsigned char *)eff_buf.token,
                                             eff_buf.token_len);
                                if (n < 0) {
@@ -1138,7 +1162,11 @@ drain:
 
                pending = lws_ssl_pending(wsi);
                if (pending) {
-                       pending = pending > context->pt_serv_buf_size ?
+                       if (lws_is_ws_with_ext(wsi))
+                               pending = pending > wsi->u.ws.rx_ubuf_alloc ?
+                                       wsi->u.ws.rx_ubuf_alloc : pending;
+                       else
+                               pending = pending > context->pt_serv_buf_size ?
                                        context->pt_serv_buf_size : pending;
                        goto read;
                }
index 49fa6b9..5b32139 100644 (file)
@@ -200,6 +200,11 @@ walk_final:
                        lws_callback_on_writable(wsi);
                break;
 
+       case LWS_CALLBACK_RECEIVE:
+               lwsl_notice("pmd test: RX len %d\n", (int)len);
+               puts(in);
+               break;
+
        case LWS_CALLBACK_CLOSED:
                pss1 = vhd->live_pss_list;
                pss2 = NULL;
index 50fe5bf..29d77b3 100644 (file)
@@ -131,6 +131,11 @@ callback_lws_status(struct lws *wsi, enum lws_callback_reasons reason,
                update_status(wsi, pss);
                break;
 
+       case LWS_CALLBACK_RECEIVE:
+               lwsl_notice("pmd test: RX len %d\n", (int)len);
+               puts(in);
+               break;
+
        case LWS_CALLBACK_SERVER_WRITEABLE:
                m = lws_write(wsi, (unsigned char *)cache + LWS_PRE, cache_len,
                              LWS_WRITE_TEXT);
index 80696ef..fa02e32 100644 (file)
@@ -245,7 +245,8 @@ initiate the close, which it does with code 1001 and reason "Seeya".
         <div id=s_status>Websocket connection not initialized</div>
        </td>
                <td colspan=1>
-<span class="title">Server Info</span>
+<span class="title">Server Info</span>      <input type=button id=pmd value="Test pmd">
+
        </td>
        </tr><tr>
 <td class="explain" colspan=2>
@@ -347,6 +348,7 @@ document.getElementById('color').onclick = update_color;
 document.getElementById('ot_open_btn').onclick = ot_open;
 document.getElementById('ot_close_btn').onclick = ot_close;
 document.getElementById('ot_req_close_btn').onclick = ot_req_close;
+document.getElementById('pmd').onclick = on_pmd;
 
 /*
  * We display untrusted stuff in html context... reject anything
@@ -614,6 +616,10 @@ function junk() {
        socket_di.send(word);
 }
 
+function on_pmd() {
+   socket_status.send("{ \"RequestType\":\"DDoS\", \"blob\":\"data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAFAAAABQCAYAAACOEfKtAAAJbElEQVR4Xu2af4xUVxXHv+e9O0OhIEibgrRlF8rOG6CWKiumrUlTo6byj6mmNFta2DdQsRJJrP5j+0cnjSVp1BoSU1yzO28LEeL6h7GJNhpJNa2CBQ1gd9k3QPlRBU2LLV0W2p173zFvZvbtzOybmbd7J9DEd/+befece97nfe+95/4gxEWLAGlZx8aIAWqKIAYYA9QkoGkeKzAGqElA0zxWYAxQk4CmeazAGKAmAU3zWIExQE0CmuaxAmOAmgQ0zWMFxgA1CWiaxwqMAWoS0DRvrQJ3ujeb17Hrx6SuzLwV32x7VzO+1pj3HJprJue8BcBQCSONRzr+2RrHaO2OtHDc1wDcUwqOj0s7nWpVoDp+hOMeA5Au+zggbesuHX+Vti1VoOh3j4BxRxEf8xmVSbe3KlAdP2bOPUmEpaXviiGZsVbq+IsBxgCnpp9YgVPjNan2RwagmXMfUpnUAEAc9k6Rx8AHB0zzy3euU5nUXk02RXMzl+9Ssw8PYN06FeYvOkAmM5dfpzLWL6LGFXkSMR13jIAEmKXMpBM6AIXjeih+BVLqTEcSWfJ/T71ks4bZ9vAYgU1/epC2ZegAFLnhAogEM0tV5x1r/UcGKBw3UJ1sTyVwH8laZ5EU2DO4WCTFmXFbOTbnemxZdHnq9AAMvDVTjF4ObKVS7di8IvA97jOSArNsiLZ8oGBpW5HYRKrkB1IFcGwkiS2dhWkB7B1qE6Z5uiUAew7NEsk5o4EveEtgLw98TwngwIApRlcFoogBVnzdSArUAWg6+T4QS3U69Xi98eiqK7A4vnX1+YsldWbPJmSzk8fJa6FAP67FXTv9sVLZqU1k9rvHibGsmKQzBlTGeih0gqgcA69CFxaO+zKA+4uxEF6W3dbaSXFdA4Bmzt1DhK5SLHycTMd9n4A5pT9oSNqp0GXO1VagcFx/cphZhnZF2tasjwJAkRs+CqJPFvEBIzHA8a8ScQyMAdbbTGgJwAY7FS3LA3edXCyUbJoHitzwKIhK3Zb5ssykr5/Uha9BHigc9whQ3nHyu7DIDe8A0TY/OI/xtJexngnP5ocLRCRAkLLbas1KhOGps6lE2MxvOO6TBvBsMS7Pe8rbtHz7pLhavRLpdwtgCDCkzIS/o9Gff8pg/n55EtlRSqT7h+8B0xhs62AYvHJlf524QZ3duzs0pfCT7aj7gQNsmqPHH1V2qr9+ewBePHYXIICNHfsb1TNzw7Y6e343svdNWh35dpHyQL9iKUV5VGVSu+qt94tx7DrZCa8wA93pP0deiTR80fLDyACjOGthncgAp9FmqwG+CsbnynHkpW1Z04ip5Sai3x0CY3kpU+P9sjt9d6saaSlA9JybJWaMXARDyRFjLrZ1fNiqQLX8DAwmxai4CCAhzblzsWFhsH7W8lvM8eOiRSAGqIUvVqAmvhhgDFCbgKaDqY+B2ayBBV2fQJJnwzQlSL6HjcsvhMaRHUwG/2dXjjWMtXdwPjBjPryCARKXIcQ7sJd8EOn9fFuTbkCBCDOMUZw69596SXWVv4HB2bicXAClBARdwqm95+stEurFER1gz6GESM45DGBFmLNJW+C5E7cKUmfH68pkoh3rl046r8CL7s3CQ+hdFQY2KNvaXS94o8/\" }");
+}
+
 var socket_ot;
 
 function ot_open() {