rxflow remove recursion and simplify
authorAndy Green <andy.green@linaro.org>
Sat, 16 Mar 2013 03:24:23 +0000 (11:24 +0800)
committerAndy Green <andy.green@linaro.org>
Sat, 16 Mar 2013 03:24:23 +0000 (11:24 +0800)
Signed-off-by: Andy Green <andy.green@linaro.org>
lib/client.c
lib/handshake.c
lib/libwebsockets.c
lib/parsers.c
lib/private-libwebsockets.h
test-server/test-client.c
test-server/test-server.c

index cd79119..963c659 100644 (file)
@@ -626,6 +626,8 @@ check_accept:
 
        memset(&wsi->u, 0, sizeof(wsi->u));
 
+       wsi->u.ws.rxflow_change_to = LWS_RXFLOW_ALLOW;
+
        /*
         * create the frame buffer for this connection according to the
         * size mentioned in the protocol definition.  If 0 there, then
index bfcad65..4e9bd70 100644 (file)
@@ -239,6 +239,7 @@ libwebsocket_read(struct libwebsocket_context *context,
 
                /* union transition */
                memset(&wsi->u, 0, sizeof(wsi->u));
+               wsi->u.ws.rxflow_change_to = LWS_RXFLOW_ALLOW;
 
                /*
                 * create the frame buffer for this connection according to the
index bcde2ac..c96db24 100644 (file)
@@ -841,6 +841,7 @@ libwebsocket_service_fd(struct libwebsocket_context *context,
        struct timeval tv;
        int timed_out = 0;
        int our_fd = 0;
+       char draining_flow = 0;
 
 #ifndef LWS_NO_EXTENSIONS
        int more = 1;
@@ -984,14 +985,25 @@ libwebsocket_service_fd(struct libwebsocket_context *context,
                /* the guy requested a callback when it was OK to write */
 
                if ((pollfd->revents & POLLOUT) &&
-                                           wsi->state == WSI_STATE_ESTABLISHED)
-                       if (lws_handle_POLLOUT_event(context, wsi,
-                                                                 pollfd) < 0) {
+                       wsi->state == WSI_STATE_ESTABLISHED &&
+                          lws_handle_POLLOUT_event(context, wsi, pollfd) < 0) {
                                lwsl_info("libwebsocket_service_fd: closing\n");
                                goto close_and_handled;
                        }
 
 
+               if (wsi->u.ws.rxflow_buffer &&
+                             (wsi->u.ws.rxflow_change_to & LWS_RXFLOW_ALLOW)) {
+                       lwsl_info("draining rxflow\n");
+                       /* well, drain it */
+                       eff_buf.token = (char *)wsi->u.ws.rxflow_buffer +
+                                               wsi->u.ws.rxflow_pos;
+                       eff_buf.token_len = wsi->u.ws.rxflow_len -
+                                               wsi->u.ws.rxflow_pos;
+                       draining_flow = 1;
+                       goto drain;
+               }
+
                /* any incoming data ready? */
 
                if (!(pollfd->revents & POLLIN))
@@ -1041,6 +1053,7 @@ read_pending:
                 */
 
                eff_buf.token = (char *)context->service_buffer;
+drain:
 #ifndef LWS_NO_EXTENSIONS
                more = 1;
                while (more) {
@@ -1079,6 +1092,14 @@ read_pending:
                        eff_buf.token_len = 0;
                }
 #endif
+               if (draining_flow && wsi->u.ws.rxflow_buffer &&
+                                wsi->u.ws.rxflow_pos == wsi->u.ws.rxflow_len) {
+                       lwsl_info("flow buffer: drained\n");
+                       free(wsi->u.ws.rxflow_buffer);
+                       wsi->u.ws.rxflow_buffer = NULL;
+                       /* having drained the rxflow buffer, can rearm POLLIN */
+                       _libwebsocket_rx_flow_control(wsi);
+               }
 
 #ifdef LWS_OPENSSL_SUPPORT
                if (wsi->ssl && SSL_pending(wsi->ssl))
@@ -1099,10 +1120,9 @@ read_pending:
        goto handled;
 
 close_and_handled:
-       libwebsocket_close_and_free_session(
-               context, wsi,
-                           LWS_CLOSE_STATUS_NOSTATUS);
-       n = 0;
+       libwebsocket_close_and_free_session(context, wsi,
+                                               LWS_CLOSE_STATUS_NOSTATUS);
+       n = 1;
 
 handled:
        pollfd->revents = 0;
@@ -1249,6 +1269,7 @@ int
 libwebsocket_service(struct libwebsocket_context *context, int timeout_ms)
 {
        int n;
+       int m;
 
        /* stay dead once we are dead */
 
@@ -1266,11 +1287,17 @@ libwebsocket_service(struct libwebsocket_context *context, int timeout_ms)
 
        /* any socket with events to service? */
 
-       for (n = 0; n < context->fds_count; n++)
-               if (context->fds[n].revents)
-                       if (libwebsocket_service_fd(context,
-                                                       &context->fds[n]) < 0)
-                               return -1;
+       for (n = 0; n < context->fds_count; n++) {
+               if (!context->fds[n].revents)
+                       continue;
+               m = libwebsocket_service_fd(context, &context->fds[n]);
+               if (m < 0)
+                       return -1;
+               /* if something closed, retry this slot */
+               if (m)
+                       n--;
+       }
+
        return 0;
 }
 
@@ -1479,7 +1506,7 @@ lws_latency(struct libwebsocket_context *context, struct libwebsocket *wsi,
 
 #ifdef LWS_NO_SERVER
 int
-_libwebsocket_rx_flow_control(struct libwebsocket *wsi)
+_libwebsocket_rx_flow_control(struct libswebsocket *wsi)
 {
        return 0;
 }
@@ -1488,34 +1515,33 @@ int
 _libwebsocket_rx_flow_control(struct libwebsocket *wsi)
 {
        struct libwebsocket_context *context = wsi->protocol->owning_server;
-       int n;
 
-       if (!(wsi->u.ws.rxflow_change_to & 2))
+       /* there is no pending change */
+       if (!(wsi->u.ws.rxflow_change_to & LWS_RXFLOW_PENDING_CHANGE))
                return 0;
 
-       wsi->u.ws.rxflow_change_to &= ~2;
+       /* stuff is still buffered, not ready to really accept new input */
+       if (wsi->u.ws.rxflow_buffer) {
+               /* get ourselves called back to deal with stashed buffer */
+               libwebsocket_callback_on_writable(context, wsi);
+               return 0;
+       }
 
-       lwsl_info("rxflow: wsi %p change_to %d\n",
-                                       wsi, wsi->u.ws.rxflow_change_to);
+       /* pending is cleared, we can change rxflow state */
 
-       /* if we're letting it come again, did we interrupt anything? */
-       if ((wsi->u.ws.rxflow_change_to & 1) && wsi->u.ws.rxflow_buffer) {
-               n = libwebsocket_interpret_incoming_packet(wsi, NULL, 0);
-               if (n < 0) {
-                       lwsl_info("libwebsocket_rx_flow_control: close req\n");
-                       return -1;
-               }
-               if (n)
-                       /* oh he stuck again, do nothing */
-                       return 0;
-       }
+       wsi->u.ws.rxflow_change_to &= ~LWS_RXFLOW_PENDING_CHANGE;
+
+       lwsl_info("rxflow: wsi %p change_to %d\n", wsi,
+                             wsi->u.ws.rxflow_change_to & LWS_RXFLOW_ALLOW);
 
-       if (wsi->u.ws.rxflow_change_to & 1)
+       /* adjust the pollfd for this wsi */
+
+       if (wsi->u.ws.rxflow_change_to & LWS_RXFLOW_ALLOW)
                context->fds[wsi->position_in_fds_table].events |= POLLIN;
        else
                context->fds[wsi->position_in_fds_table].events &= ~POLLIN;
 
-       if (wsi->u.ws.rxflow_change_to & 1)
+       if (wsi->u.ws.rxflow_change_to & LWS_RXFLOW_ALLOW)
                /* external POLL support via protocol 0 */
                context->protocols[0].callback(context, wsi,
                        LWS_CALLBACK_SET_MODE_POLL_FD,
@@ -1544,7 +1570,11 @@ _libwebsocket_rx_flow_control(struct libwebsocket *wsi)
 int
 libwebsocket_rx_flow_control(struct libwebsocket *wsi, int enable)
 {
-       wsi->u.ws.rxflow_change_to = 2 | !!enable;
+       if (enable == (wsi->u.ws.rxflow_change_to & LWS_RXFLOW_ALLOW))
+               return 0;
+
+       lwsl_info("libwebsocket_rx_flow_control(0x%p, %d)\n", wsi, enable);
+       wsi->u.ws.rxflow_change_to = LWS_RXFLOW_PENDING_CHANGE | !!enable;
 
        return 0;
 }
index 7e6fd89..3ff3fbe 100644 (file)
@@ -1047,36 +1047,22 @@ illegal_ctl_length:
 int libwebsocket_interpret_incoming_packet(struct libwebsocket *wsi,
                                                 unsigned char *buf, size_t len)
 {
-       size_t n;
+       size_t n = 0;
        int m;
-       int clear_rxflow = !!wsi->u.ws.rxflow_buffer;
-       struct libwebsocket_context *context = wsi->protocol->owning_server;
 
 #if 0
        lwsl_parser("received %d byte packet\n", (int)len);
        lwsl_hexdump(buf, len);
 #endif
 
-       if (buf && wsi->u.ws.rxflow_buffer)
-               lwsl_err("!!!! pending rxflow data loss\n");
-
        /* let the rx protocol state machine have as much as it needs */
 
-       n = 0;
-       if (!buf) {
-               lwsl_info("dumping stored rxflow buffer len %d pos=%d\n",
-                                   wsi->u.ws.rxflow_len, wsi->u.ws.rxflow_pos);
-               buf = wsi->u.ws.rxflow_buffer;
-               n = wsi->u.ws.rxflow_pos;
-               len = wsi->u.ws.rxflow_len;
-               /* let's pretend he's already allowing input */
-               context->fds[wsi->position_in_fds_table].events |= POLLIN;
-       }
-
        while (n < len) {
-               if (!(context->fds[wsi->position_in_fds_table].events &
-                                                                     POLLIN)) {
-                       /* his RX is flowcontrolled */
+               /*
+                * we were accepting input but now we stopped doing so
+                */
+               if (!(wsi->u.ws.rxflow_change_to & LWS_RXFLOW_ALLOW)) {
+                       /* his RX is flowcontrolled, don't send remaining now */
                        if (!wsi->u.ws.rxflow_buffer) {
                                /* a new rxflow, buffer it and warn caller */
                                lwsl_info("new rxflow input buffer len %d\n",
@@ -1087,24 +1073,21 @@ int libwebsocket_interpret_incoming_packet(struct libwebsocket *wsi,
                                wsi->u.ws.rxflow_pos = 0;
                                memcpy(wsi->u.ws.rxflow_buffer,
                                                        buf + n, len - n);
-                       } else {
-                               lwsl_info("re-using rxflow input buffer\n");
+                       } else
                                /* rxflow while we were spilling prev rxflow */
-                               wsi->u.ws.rxflow_pos = n;
-                       }
+                               lwsl_info("stalling in existing rxflow buffer");
+
                        return 1;
                }
-               m = libwebsocket_rx_sm(wsi, buf[n]);
+
+               /* account for what we're using in rxflow buffer */
+               if (wsi->u.ws.rxflow_buffer)
+                       wsi->u.ws.rxflow_pos++;
+
+               /* process the byte */
+               m = libwebsocket_rx_sm(wsi, buf[n++]);
                if (m < 0)
                        return -1;
-               n++;
-       }
-
-       if (clear_rxflow) {
-               lwsl_info("flow: clearing it\n");
-               free(wsi->u.ws.rxflow_buffer);
-               wsi->u.ws.rxflow_buffer = NULL;
-               context->fds[wsi->position_in_fds_table].events &= ~POLLIN;
        }
 
        return 0;
index eecdd3d..251a702 100644 (file)
@@ -227,6 +227,11 @@ enum connection_mode {
        LWS_CONNMODE_SERVER_LISTENER,
 };
 
+enum {
+       LWS_RXFLOW_ALLOW = (1 << 0),
+       LWS_RXFLOW_PENDING_CHANGE = (1 << 1),
+};
+
 struct libwebsocket_protocols;
 struct libwebsocket;
 
index b602442..d25e31f 100644 (file)
@@ -122,7 +122,7 @@ callback_lws_mirror(struct libwebsocket_context *context,
        switch (reason) {
 
        case LWS_CALLBACK_CLOSED:
-               fprintf(stderr, "mirror: LWS_CALLBACK_CLOSED\n");
+               fprintf(stderr, "mirror: LWS_CALLBACK_CLOSED mirror_lifetime=%d\n", mirror_lifetime);
                wsi_mirror = NULL;
                break;
 
index ad2e9cd..cfe34ee 100644 (file)
@@ -431,7 +431,7 @@ callback_dumb_increment(struct libwebsocket_context *context,
 
 /* lws-mirror_protocol */
 
-#define MAX_MESSAGE_QUEUE 128
+#define MAX_MESSAGE_QUEUE 32
 
 struct per_session_data__lws_mirror {
        struct libwebsocket *wsi;
@@ -461,8 +461,7 @@ callback_lws_mirror(struct libwebsocket_context *context,
        switch (reason) {
 
        case LWS_CALLBACK_ESTABLISHED:
-               lwsl_info("callback_lws_mirror: "
-                                                "LWS_CALLBACK_ESTABLISHED\n");
+               lwsl_info("callback_lws_mirror: LWS_CALLBACK_ESTABLISHED\n");
                pss->ringbuffer_tail = ringbuffer_head;
                pss->wsi = wsi;
                break;
@@ -488,9 +487,9 @@ callback_lws_mirror(struct libwebsocket_context *context,
                                lwsl_err("ERROR %d writing to mirror socket\n", n);
                                return -1;
                        }
-                       if (n < ringbuffer[pss->ringbuffer_tail].len) {
-                               lwsl_err("mirror partial write %d vs %d\n", n, ringbuffer[pss->ringbuffer_tail].len);
-                       }
+                       if (n < ringbuffer[pss->ringbuffer_tail].len)
+                               lwsl_err("mirror partial write %d vs %d\n",
+                                      n, ringbuffer[pss->ringbuffer_tail].len);
 
                        if (pss->ringbuffer_tail == (MAX_MESSAGE_QUEUE - 1))
                                pss->ringbuffer_tail = 0;
@@ -507,8 +506,13 @@ callback_lws_mirror(struct libwebsocket_context *context,
 
                        if (lws_send_pipe_choked(wsi)) {
                                libwebsocket_callback_on_writable(context, wsi);
-                               return 0;
+                               break;
                        }
+                       /*
+                        * for tests with chrome on same machine as client and
+                        * server, this is needed to stop chrome choking
+                        */
+                       usleep(1);
                }
                break;
 
@@ -540,6 +544,7 @@ callback_lws_mirror(struct libwebsocket_context *context,
 
 choke:
                if (num_wsi_choked < sizeof wsi_choked / sizeof wsi_choked[0]) {
+                       lwsl_debug("LWS_CALLBACK_RECEIVE: throttling %p\n", wsi);
                        libwebsocket_rx_flow_control(wsi, 0);
                        wsi_choked[num_wsi_choked++] = wsi;
                }