solve flowcontrol problems 29/2929/1
authorAndy Green <andy.green@linaro.org>
Thu, 17 Jan 2013 08:50:35 +0000 (16:50 +0800)
committerKevron Rees <kevron_m_rees@linux.intel.com>
Thu, 7 Mar 2013 21:01:26 +0000 (13:01 -0800)
Problems with rx flow control implementation were the underlying cause
of the connection stalling issue that was covered up with the udelay()
patch that was removed recently.

This get rx flow control working properly and corrects problems with
fifo management in the test server mirror protocol code too.

The rxfow control api has been changed to just set a flag, so it's very cheap
to call from user code.  After the callbacks that might use the rxflow control
api the flag is checked and any pending actions done.

rx flow control now stops any rx packet coming immediately, with compessed
connections "just what was left in the pipe" might be hundreds of KBytes.  To
implement that the current packet being decoded is copied into a malloc'd buffer
by the rx processing code now.

When rxflow is allows to come again, the buffer is drained and freed before any
new packet content is accepted.

Signed-off-by: Andy Green <andy.green@linaro.org>
lib/Makefile.am
lib/extension-deflate-frame.c
lib/libwebsockets.c
lib/parsers.c
lib/private-libwebsockets.h
test-server/test-server.c

index 78ac33f..441f819 100644 (file)
@@ -36,7 +36,7 @@ else
 dist_libwebsockets_la_SOURCES += md5.c sha-1.c
 endif
 
-libwebsockets_la_CFLAGS=-Wall -std=gnu99 -pedantic
+libwebsockets_la_CFLAGS=-Wall -std=gnu99 -pedantic -g
 libwebsockets_la_LDFLAGS=
 
 if MINGW
index 541131b..7a8facc 100644 (file)
@@ -163,7 +163,7 @@ bail:
                                 * screwed.. close the connection... we will get a
                                 * destroy callback to take care of closing nicely
                                 */
-                               fprintf(stderr, "zlib error inflate %d: %s",
+                               lwsl_err("zlib error inflate %d: %s\n",
                                                           n, conn->zs_in.msg);
                                return -1;
                        }
index add3bd7..99ce9e8 100644 (file)
@@ -371,6 +371,9 @@ just_kill_connection:
        if (wsi->c_address)
                free(wsi->c_address);
 
+       if (wsi->rxflow_buffer)
+               free(wsi->rxflow_buffer);
+
 /*     lwsl_info("closing fd=%d\n", wsi->sock); */
 
 #ifdef LWS_OPENSSL_SUPPORT
@@ -660,7 +663,8 @@ notify_action:
        else
                n = LWS_CALLBACK_SERVER_WRITEABLE;
 
-       wsi->protocol->callback(context, wsi, (enum libwebsocket_callback_reasons) n, wsi->user_space, NULL, 0);
+       user_callback_handle_rxflow(wsi->protocol->callback, context,
+               wsi, (enum libwebsocket_callback_reasons) n, wsi->user_space, NULL, 0);
 
        return 0;
 }
@@ -902,7 +906,7 @@ libwebsocket_service_fd(struct libwebsocket_context *context,
                                               LWS_CLOSE_STATUS_NOSTATUS);
                else
                        if (wsi->state == WSI_STATE_HTTP && wsi->protocol->callback)
-                               if (wsi->protocol->callback(context, wsi, LWS_CALLBACK_HTTP_FILE_COMPLETION, wsi->user_space,
+                               if (user_callback_handle_rxflow(wsi->protocol->callback, context, wsi, LWS_CALLBACK_HTTP_FILE_COMPLETION, wsi->user_space,
                                                                wsi->filepath, wsi->filepos))
                                        libwebsocket_close_and_free_session(context, wsi, LWS_CLOSE_STATUS_NOSTATUS);
                break;
@@ -1117,7 +1121,7 @@ bail_prox_listener:
 
                        /* broadcast it to this connection */
 
-                       new_wsi->protocol->callback(context, new_wsi,
+                       user_callback_handle_rxflow(new_wsi->protocol->callback, context, new_wsi,
                                LWS_CALLBACK_BROADCAST,
                                new_wsi->user_space,
                                buf + LWS_SEND_BUFFER_PRE_PADDING, len);
@@ -1531,34 +1535,38 @@ libwebsocket_get_socket_fd(struct libwebsocket *wsi)
        return wsi->sock;
 }
 
-/**
- * libwebsocket_rx_flow_control() - Enable and disable socket servicing for
- *                             receieved packets.
- *
- * If the output side of a server process becomes choked, this allows flow
- * control for the input side.
- *
- * @wsi:       Websocket connection instance to get callback for
- * @enable:    0 = disable read servicing for this connection, 1 = enable
- */
 
 int
-libwebsocket_rx_flow_control(struct libwebsocket *wsi, int enable)
+_libwebsocket_rx_flow_control(struct libwebsocket *wsi)
 {
        struct libwebsocket_context *context = wsi->protocol->owning_server;
        int n;
 
-       for (n = 0; n < context->fds_count; n++)
-               if (context->fds[n].fd == wsi->sock) {
-                       if (enable)
-                               context->fds[n].events |= POLLIN;
-                       else
-                               context->fds[n].events &= ~POLLIN;
+       if (!(wsi->rxflow_change_to & 2))
+               return 0;
 
-                       return 0;
+       wsi->rxflow_change_to &= ~2;
+
+       lwsl_info("rxflow: wsi %p change_to %d\n", wsi, wsi->rxflow_change_to);
+
+       /* if we're letting it come again, did we interrupt anything? */
+       if ((wsi->rxflow_change_to & 1) && wsi->rxflow_buffer) {
+               n = libwebsocket_interpret_incoming_packet(wsi, NULL, 0);
+               if (n < 0) {
+                       libwebsocket_close_and_free_session(context, wsi, LWS_CLOSE_STATUS_NOSTATUS);
+                       return -1;
                }
+               if (n)
+                       /* oh he stuck again, do nothing */
+                       return 0;
+       }
 
-       if (enable)
+       if (wsi->rxflow_change_to & 1)
+               context->fds[wsi->position_in_fds_table].events |= POLLIN;
+       else
+               context->fds[wsi->position_in_fds_table].events &= ~POLLIN;
+
+       if (wsi->rxflow_change_to & 1)
                /* external POLL support via protocol 0 */
                context->protocols[0].callback(context, wsi,
                        LWS_CALLBACK_SET_MODE_POLL_FD,
@@ -1569,13 +1577,30 @@ libwebsocket_rx_flow_control(struct libwebsocket *wsi, int enable)
                        LWS_CALLBACK_CLEAR_MODE_POLL_FD,
                        (void *)(long)wsi->sock, NULL, POLLIN);
 
-#if 0
-       lwsl_err("libwebsocket_rx_flow_control unable to find socket\n");
-#endif
        return 1;
 }
 
 /**
+ * libwebsocket_rx_flow_control() - Enable and disable socket servicing for
+ *                             receieved packets.
+ *
+ * If the output side of a server process becomes choked, this allows flow
+ * control for the input side.
+ *
+ * @wsi:       Websocket connection instance to get callback for
+ * @enable:    0 = disable read servicing for this connection, 1 = enable
+ */
+
+int
+libwebsocket_rx_flow_control(struct libwebsocket *wsi, int enable)
+{
+       wsi->rxflow_change_to = 2 | !!enable;
+
+       return 0;
+}
+
+
+/**
  * libwebsocket_canonical_hostname() - returns this host's hostname
  *
  * This is typically used by client code to fill in the host parameter
@@ -1630,6 +1655,23 @@ OpenSSL_verify_callback(int preverify_ok, X509_STORE_CTX *x509_ctx)
 }
 #endif
 
+int user_callback_handle_rxflow(callback_function callback_function,
+               struct libwebsocket_context * context,
+                       struct libwebsocket *wsi,
+                        enum libwebsocket_callback_reasons reason, void *user,
+                                                         void *in, size_t len)
+{
+       int n;
+
+       n = callback_function(context, wsi, reason, user, in, len);
+       if (n < 0)
+               return n;
+
+       _libwebsocket_rx_flow_control(wsi);
+
+       return 0;
+}
+
 
 /**
  * libwebsocket_create_context() - Create the websocket handler
@@ -2366,7 +2408,8 @@ libwebsockets_broadcast(const struct libwebsocket_protocols *protocol,
                        if (wsi->protocol != protocol)
                                continue;
 
-                       wsi->protocol->callback(context, wsi,
+                       user_callback_handle_rxflow(wsi->protocol->callback,
+                                context, wsi,
                                 LWS_CALLBACK_BROADCAST,
                                 wsi->user_space,
                                 buf, len);
index 6d87907..853b1ec 100644 (file)
@@ -664,6 +664,7 @@ handle_first:
                break;
 
        case LWS_RXPS_EAT_UNTIL_76_FF:
+
                if (c == 0xff) {
                        wsi->lws_rx_parse_state = LWS_RXPS_NEW;
                        goto issue;
@@ -675,7 +676,8 @@ handle_first:
                        break;
 issue:
                if (wsi->protocol->callback)
-                       wsi->protocol->callback(wsi->protocol->owning_server,
+                       user_callback_handle_rxflow(wsi->protocol->callback,
+                         wsi->protocol->owning_server,
                          wsi, LWS_CALLBACK_RECEIVE,
                          wsi->user_space,
                          &wsi->rx_user_buffer[LWS_SEND_BUFFER_PRE_PADDING],
@@ -865,7 +867,8 @@ spill:
                    eff_buf.token[eff_buf.token_len] = '\0';
 
                    if (wsi->protocol->callback)
-                           wsi->protocol->callback(wsi->protocol->owning_server,
+                           user_callback_handle_rxflow(wsi->protocol->callback,
+                                                   wsi->protocol->owning_server,
                                                    wsi, LWS_CALLBACK_RECEIVE,
                                                    wsi->user_space,
                                                    eff_buf.token,
@@ -895,18 +898,58 @@ int libwebsocket_interpret_incoming_packet(struct libwebsocket *wsi,
                                                 unsigned char *buf, size_t len)
 {
        size_t n;
+       int m;
+       int clear_rxflow = !!wsi->rxflow_buffer;
+       struct libwebsocket_context *context = wsi->protocol->owning_server;
 
 #ifdef DEBUG
        lwsl_parser("received %d byte packet\n", (int)len);
        lwsl_hexdump(buf, len);
 #endif
 
+       if (buf && wsi->rxflow_buffer)
+               lwsl_err("!!!! libwebsocket_interpret_incoming_packet: was pending rxflow, data loss\n");
+
        /* let the rx protocol state machine have as much as it needs */
 
        n = 0;
-       while (n < len)
-               if (libwebsocket_rx_sm(wsi, buf[n++]) < 0)
+       if (!buf) {
+               lwsl_info("dumping stored rxflow buffer len %d pos=%d\n", wsi->rxflow_len, wsi->rxflow_pos);
+               buf = wsi->rxflow_buffer;
+               n = wsi->rxflow_pos;
+               len = wsi->rxflow_len;
+               /* let's pretend he's already allowing input */
+               context->fds[wsi->position_in_fds_table].events |= POLLIN;
+       }
+
+       while (n < len) {
+               if (!(context->fds[wsi->position_in_fds_table].events & POLLIN)) {
+                       /* his RX is flowcontrolled */
+                       if (!wsi->rxflow_buffer) { /* a new rxflow in effect, buffer it and warn caller */
+                               lwsl_info("new rxflow input buffer len %d\n", len - n);
+                               wsi->rxflow_buffer = (unsigned char *)malloc(len - n);
+                               wsi->rxflow_len = len - n;
+                               wsi->rxflow_pos = 0;
+                               memcpy(wsi->rxflow_buffer, buf + n, len - n);
+                       } else {
+                               lwsl_info("re-using rxflow input buffer\n");
+                               /* rxflow while we were spilling previous rxflow buffer */
+                               wsi->rxflow_pos = n;
+                       }
+                       return 1;
+               }
+               m = libwebsocket_rx_sm(wsi, buf[n]);
+               if (m < 0)
                        return -1;
+               n++;
+       }
+
+       if (clear_rxflow) {
+               lwsl_info("flow: clearing it\n");
+               free(wsi->rxflow_buffer);
+               wsi->rxflow_buffer = NULL;
+               context->fds[wsi->position_in_fds_table].events &= ~POLLIN;
+       }
 
        return 0;
 }
index 0027006..ce5d2f4 100644 (file)
@@ -345,6 +345,10 @@ struct libwebsocket {
 
        int sock;
        int position_in_fds_table;
+       unsigned char *rxflow_buffer;
+       int rxflow_len;
+       int rxflow_pos;
+       int rxflow_change_to;
 
        enum lws_rx_parse_state lws_rx_parse_state;
        char extension_data_pending;
@@ -480,6 +484,15 @@ extern int
 lws_issue_raw_ext_access(struct libwebsocket *wsi,
                                                unsigned char *buf, size_t len);
 
+extern int
+_libwebsocket_rx_flow_control(struct libwebsocket *wsi);
+
+extern int
+user_callback_handle_rxflow(callback_function, struct libwebsocket_context * context,
+                       struct libwebsocket *wsi,
+                        enum libwebsocket_callback_reasons reason, void *user,
+                                                         void *in, size_t len);
+
 #ifndef LWS_OPENSSL_SUPPORT
 
 unsigned char *
index 857cae2..9487d80 100644 (file)
@@ -334,6 +334,8 @@ struct a_message {
 static struct a_message ringbuffer[MAX_MESSAGE_QUEUE];
 static int ringbuffer_head;
 
+static struct libwebsocket *wsi_choked[20];
+static int num_wsi_choked;
 
 static int
 callback_lws_mirror(struct libwebsocket_context *context,
@@ -365,7 +367,7 @@ callback_lws_mirror(struct libwebsocket_context *context,
                                                                LWS_WRITE_TEXT);
                        if (n < 0) {
                                fprintf(stderr, "ERROR %d writing to socket\n", n);
-                               exit(1);
+                               return 1;
                        }
 
                        if (pss->ringbuffer_tail == (MAX_MESSAGE_QUEUE - 1))
@@ -373,9 +375,14 @@ callback_lws_mirror(struct libwebsocket_context *context,
                        else
                                pss->ringbuffer_tail++;
 
-                       if (((ringbuffer_head - pss->ringbuffer_tail) %
-                                 MAX_MESSAGE_QUEUE) < (MAX_MESSAGE_QUEUE - 15))
-                               libwebsocket_rx_flow_control(wsi, 1);
+                       if (((ringbuffer_head - pss->ringbuffer_tail) &
+                                 (MAX_MESSAGE_QUEUE - 1)) < (MAX_MESSAGE_QUEUE - 15)) {
+                               for (n = 0; n < num_wsi_choked; n++)
+                                       libwebsocket_rx_flow_control(wsi_choked[n], 1);
+                               num_wsi_choked = 0;
+                       }
+
+//                     fprintf(stderr, "tx fifo %d\n", (ringbuffer_head - pss->ringbuffer_tail) & (MAX_MESSAGE_QUEUE - 1));
 
                        libwebsocket_callback_on_writable(context, wsi);
 
@@ -390,6 +397,12 @@ callback_lws_mirror(struct libwebsocket_context *context,
 
        case LWS_CALLBACK_RECEIVE:
 
+               if (((ringbuffer_head - pss->ringbuffer_tail) &
+                                 (MAX_MESSAGE_QUEUE - 1)) == (MAX_MESSAGE_QUEUE - 1)) {
+                       fprintf(stderr, "dropping!\n");
+                       goto choke;
+               }
+
                if (ringbuffer[ringbuffer_head].payload)
                        free(ringbuffer[ringbuffer_head].payload);
 
@@ -404,13 +417,22 @@ callback_lws_mirror(struct libwebsocket_context *context,
                else
                        ringbuffer_head++;
 
-               if (((ringbuffer_head - pss->ringbuffer_tail) %
-                                 MAX_MESSAGE_QUEUE) > (MAX_MESSAGE_QUEUE - 10))
+               if (((ringbuffer_head - pss->ringbuffer_tail) &
+                                 (MAX_MESSAGE_QUEUE - 1)) < (MAX_MESSAGE_QUEUE - 10))
+                       goto done;
+
+choke:
+               if (num_wsi_choked < sizeof wsi_choked / sizeof wsi_choked[0]) {
                        libwebsocket_rx_flow_control(wsi, 0);
+                       wsi_choked[num_wsi_choked++] = wsi;
+               }
 
+//             fprintf(stderr, "rx fifo %d\n", (ringbuffer_head - pss->ringbuffer_tail) & (MAX_MESSAGE_QUEUE - 1));
+done:
                libwebsocket_callback_on_writable_all_protocol(
                                               libwebsockets_get_protocol(wsi));
                break;
+
        /*
         * this just demonstrates how to use the protocol filter. If you won't
         * study and reject connections based on header content, you don't need