multithread stability
authorAndy Green <andy.green@linaro.org>
Tue, 26 Jan 2016 12:56:56 +0000 (20:56 +0800)
committerAndy Green <andy.green@linaro.org>
Tue, 26 Jan 2016 12:56:56 +0000 (20:56 +0800)
Signed-off-by: Andy Green <andy.green@linaro.org>
21 files changed:
CMakeLists.txt
lib/context.c
lib/handshake.c
lib/libev.c
lib/libwebsockets.c
lib/libwebsockets.h
lib/lws-plat-unix.c
lib/lws-plat-win.c
lib/output.c
lib/parsers.c
lib/pollfd.c
lib/private-libwebsockets.h
lib/server-handshake.c
lib/server.c
lib/service.c
lib/ssl.c
test-server/test-server-http.c
test-server/test-server-libev.c
test-server/test-server-mirror.c
test-server/test-server-pthreads.c
test-server/test-server.c

index 4460941..f191117 100644 (file)
@@ -85,10 +85,16 @@ set(LWS_WITHOUT_CLIENT ON)
 set(LWS_WITHOUT_TESTAPPS ON)
 set(LWS_WITHOUT_EXTENSIONS ON)
 set(LWS_MBED3 ON)
+# this implies no pthreads in the lib
 set(LWS_MAX_SMP 1)
 
 endif()
 
+if (WIN32)
+# this implies no pthreads in the lib
+set(LWS_MAX_SMP 1)
+endif()
+
 
 # Allow the user to override installation directories.
 set(LWS_INSTALL_LIB_DIR       lib CACHE PATH "Installation directory for libraries")
index 26aac35..3e5fc02 100644 (file)
@@ -80,7 +80,7 @@ lws_create_context(struct lws_context_creation_info *info)
        int pid_daemon = get_daemonize_pid();
 #endif
        char *p;
-       int n;
+       int n, m;
 
        lwsl_notice("Initial logging level %d\n", log_level);
 
@@ -96,7 +96,7 @@ lws_create_context(struct lws_context_creation_info *info)
 #endif
        lws_feature_status_libev(info);
 #endif
-       lwsl_info(" LWS_MAX_HEADER_LEN    : %u\n", LWS_MAX_HEADER_LEN);
+       lwsl_info(" LWS_DEF_HEADER_LEN    : %u\n", LWS_DEF_HEADER_LEN);
        lwsl_info(" LWS_MAX_PROTOCOLS     : %u\n", LWS_MAX_PROTOCOLS);
        lwsl_info(" LWS_MAX_SMP           : %u\n", LWS_MAX_SMP);
        lwsl_info(" SPEC_LATEST_SUPPORTED : %u\n", SPEC_LATEST_SUPPORTED);
@@ -129,7 +129,6 @@ lws_create_context(struct lws_context_creation_info *info)
        if (context->count_threads > LWS_MAX_SMP)
                context->count_threads = LWS_MAX_SMP;
 
-       context->lserv_seen = 0;
        context->protocols = info->protocols;
        context->token_limits = info->token_limits;
        context->listen_port = info->port;
@@ -141,8 +140,18 @@ lws_create_context(struct lws_context_creation_info *info)
        context->ka_interval = info->ka_interval;
        context->ka_probes = info->ka_probes;
 
-       /* we zalloc only the used ones, so the memory is not wasted
-        * allocating for unused threads
+       if (info->max_http_header_data)
+               context->max_http_header_data = info->max_http_header_data;
+       else
+               context->max_http_header_data = LWS_DEF_HEADER_LEN;
+       if (info->max_http_header_pool)
+               context->max_http_header_pool = info->max_http_header_pool;
+       else
+               context->max_http_header_pool = LWS_DEF_HEADER_POOL;
+
+       /*
+        * Allocate the per-thread storage for scratchpad buffers,
+        * and header data pool
         */
        for (n = 0; n < context->count_threads; n++) {
                context->pt[n].serv_buf = lws_zalloc(LWS_MAX_SOCKET_IO_BUF);
@@ -150,6 +159,22 @@ lws_create_context(struct lws_context_creation_info *info)
                        lwsl_err("OOM\n");
                        return NULL;
                }
+
+               context->pt[n].http_header_data = lws_malloc(context->max_http_header_data *
+                                                      context->max_http_header_pool);
+               if (!context->pt[n].http_header_data)
+                       goto bail;
+
+               context->pt[n].ah_pool = lws_zalloc(sizeof(struct allocated_headers) *
+                                             context->max_http_header_pool);
+               for (m = 0; m < context->max_http_header_pool; m++)
+                       context->pt[n].ah_pool[m].data =
+                               (char *)context->pt[n].http_header_data +
+                               (m * context->max_http_header_data);
+               if (!context->pt[n].ah_pool)
+                       goto bail;
+
+               lws_pt_mutex_init(&context->pt[n]);
        }
 
        if (info->fd_limit_per_thread)
@@ -180,44 +205,18 @@ lws_create_context(struct lws_context_creation_info *info)
        context->lws_ev_sigint_cb = &lws_sigint_cb;
 #endif /* LWS_USE_LIBEV */
 
-       lwsl_info(" mem: context:         %5u bytes (%d + (%d x %d))\n",
+       lwsl_info(" mem: context:         %5u bytes (%d ctx + (%d thr x %d))\n",
                  sizeof(struct lws_context) +
                  (context->count_threads * LWS_MAX_SOCKET_IO_BUF),
                  sizeof(struct lws_context),
                  context->count_threads,
                  LWS_MAX_SOCKET_IO_BUF);
 
-       /*
-        * allocate and initialize the pool of
-        * allocated_header structs + data
-        */
-       if (info->max_http_header_data)
-               context->max_http_header_data = info->max_http_header_data;
-       else
-               context->max_http_header_data = LWS_MAX_HEADER_LEN;
-       if (info->max_http_header_pool)
-               context->max_http_header_pool = info->max_http_header_pool;
-       else
-               context->max_http_header_pool = LWS_MAX_HEADER_POOL;
-
-       context->http_header_data = lws_malloc(context->max_http_header_data *
-                                              context->max_http_header_pool);
-       if (!context->http_header_data)
-               goto bail;
-       context->ah_pool = lws_zalloc(sizeof(struct allocated_headers) *
-                                     context->max_http_header_pool);
-       if (!context->ah_pool)
-               goto bail;
-
-       for (n = 0; n < context->max_http_header_pool; n++)
-               context->ah_pool[n].data = (char *)context->http_header_data +
-                       (n * context->max_http_header_data);
-
-       /* this is per context */
-       lwsl_info(" mem: http hdr rsvd:   %5u bytes ((%u + %u) x %u)\n",
+       lwsl_info(" mem: http hdr rsvd:   %5u bytes (%u thr x (%u + %u) x %u))\n",
                    (context->max_http_header_data +
                     sizeof(struct allocated_headers)) *
-                   context->max_http_header_pool,
+                   context->max_http_header_pool * context->count_threads,
+                   context->count_threads,
                    context->max_http_header_data,
                    sizeof(struct allocated_headers),
                    context->max_http_header_pool);
@@ -229,10 +228,13 @@ lws_create_context(struct lws_context_creation_info *info)
                goto bail;
        }
        lwsl_info(" mem: pollfd map:      %5u\n", n);
+
+#if LWS_MAX_SMP > 1
        /* each thread serves his own chunk of fds */
        for (n = 1; n < (int)info->count_threads; n++)
-               context->pt[n].fds = context->pt[0].fds +
-                                 (n * context->fd_limit_per_thread);
+               context->pt[n].fds = context->pt[n - 1].fds +
+                                    context->fd_limit_per_thread;
+#endif
 
        if (lws_plat_init(context, info))
                goto bail;
@@ -345,7 +347,7 @@ lws_context_destroy(struct lws_context *context)
 #endif
 
        while (m--)
-               for (n = 0; n < context->pt[m].fds_count; n++) {
+               for (n = 0; (unsigned int)n < context->pt[m].fds_count; n++) {
                        struct lws *wsi = wsi_from_fd(context, context->pt[m].fds[n].fd);
                        if (!wsi)
                                continue;
@@ -384,17 +386,18 @@ lws_context_destroy(struct lws_context *context)
                ev_signal_stop(context->io_loop, &context->w_sigint.watcher);
 #endif /* LWS_USE_LIBEV */
 
-       for (n = 0; n < context->count_threads; n++)
+       for (n = 0; n < context->count_threads; n++) {
                lws_free_set_NULL(context->pt[n].serv_buf);
+               if (context->pt[n].ah_pool)
+                       lws_free(context->pt[n].ah_pool);
+               if (context->pt[n].http_header_data)
+                       lws_free(context->pt[n].http_header_data);
+       }
 
        lws_plat_context_early_destroy(context);
        lws_ssl_context_destroy(context);
        if (context->pt[0].fds)
                lws_free(context->pt[0].fds);
-       if (context->ah_pool)
-               lws_free(context->ah_pool);
-       if (context->http_header_data)
-               lws_free(context->http_header_data);
 
        lws_plat_context_late_destroy(context);
 
index e513056..310fd3c 100644 (file)
@@ -100,6 +100,7 @@ http_new:
                wsi->u.hdr.lextable_pos = 0;
                /* fallthru */
        case LWSS_HTTP_HEADERS:
+               assert(wsi->u.hdr.ah);
                lwsl_parser("issuing %d bytes to parser\n", (int)len);
 
                if (lws_handshake_client(wsi, &buf, len))
@@ -182,6 +183,7 @@ postbody_completion:
 
        case LWSS_ESTABLISHED:
        case LWSS_AWAITING_CLOSE_ACK:
+       case LWSS_SHUTDOWN:
                if (lws_handshake_client(wsi, &buf, len))
                        goto bail;
                switch (wsi->mode) {
index e52dedb..167c045 100644 (file)
@@ -81,6 +81,7 @@ lws_initloop(struct lws_context *context, struct ev_loop *loop)
        const char * backend_name;
        int status = 0;
        int backend;
+       int m = 0; /* !!! TODO add pt support */
 
        if (!loop)
                loop = ev_default_loop(0);
@@ -91,7 +92,7 @@ lws_initloop(struct lws_context *context, struct ev_loop *loop)
         * Initialize the accept w_accept with the listening socket
         * and register a callback for read operations
         */
-       ev_io_init(w_accept, lws_accept_cb, context->lserv_fd, EV_READ);
+       ev_io_init(w_accept, lws_accept_cb, context->pt[m].lserv_fd, EV_READ);
        ev_io_start(context->io_loop,w_accept);
 
        /* Register the signal watcher unless the user says not to */
index fad9ab3..26979e9 100644 (file)
@@ -1,7 +1,7 @@
 /*
  * libwebsockets - small server side websockets and web server implementation
  *
- * Copyright (C) 2010-2014 Andy Green <andy@warmcat.com>
+ * Copyright (C) 2010-2016 Andy Green <andy@warmcat.com>
  *
  *  This library is free software; you can redistribute it and/or
  *  modify it under the terms of the GNU Lesser General Public
@@ -63,33 +63,72 @@ lws_free_wsi(struct lws *wsi)
         */
        if (wsi->mode != LWSCM_WS_CLIENT &&
            wsi->mode != LWSCM_WS_SERVING)
-               if (wsi->u.hdr.ah)
-                       lws_free_header_table(wsi);
+               lws_free_header_table(wsi);
+
        lws_free(wsi);
 }
 
-
 static void
 lws_remove_from_timeout_list(struct lws *wsi)
 {
+       struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi];
+
        if (!wsi->timeout_list_prev)
                return;
 
+       lws_pt_lock(pt);
        if (wsi->timeout_list)
                wsi->timeout_list->timeout_list_prev = wsi->timeout_list_prev;
        *wsi->timeout_list_prev = wsi->timeout_list;
 
        wsi->timeout_list_prev = NULL;
        wsi->timeout_list = NULL;
+       lws_pt_unlock(pt);
 }
 
+/**
+ * lws_set_timeout() - marks the wsi as subject to a timeout
+ *
+ * You will not need this unless you are doing something special
+ *
+ * @wsi:       Websocket connection instance
+ * @reason:    timeout reason
+ * @secs:      how many seconds
+ */
+
+LWS_VISIBLE void
+lws_set_timeout(struct lws *wsi, enum pending_timeout reason, int secs)
+{
+       struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi];
+       time_t now;
+
+       lws_pt_lock(pt);
+
+       time(&now);
+
+       if (!wsi->pending_timeout && reason) {
+               wsi->timeout_list = pt->timeout_list;
+               if (wsi->timeout_list)
+                       wsi->timeout_list->timeout_list_prev = &wsi->timeout_list;
+               wsi->timeout_list_prev = &pt->timeout_list;
+               *wsi->timeout_list_prev = wsi;
+       }
+
+       wsi->pending_timeout_limit = now + secs;
+       wsi->pending_timeout = reason;
+
+       lws_pt_unlock(pt);
+
+       if (!reason)
+               lws_remove_from_timeout_list(wsi);
+}
 
 void
 lws_close_free_wsi(struct lws *wsi, enum lws_close_status reason)
 {
        struct lws_context *context;
        struct lws_context_per_thread *pt;
-       int n, m, ret, old_state;
+       int n, m, ret;
        struct lws_tokens eff_buf;
 
        if (!wsi)
@@ -97,7 +136,6 @@ lws_close_free_wsi(struct lws *wsi, enum lws_close_status reason)
 
        context = wsi->context;
        pt = &context->pt[(int)wsi->tsi];
-       old_state = wsi->state;
 
        if (wsi->mode == LWSCM_HTTP_SERVING_ACCEPTED &&
            wsi->u.http.fd != LWS_INVALID_FILE) {
@@ -108,10 +146,13 @@ lws_close_free_wsi(struct lws *wsi, enum lws_close_status reason)
                                               wsi->user_space, NULL, 0);
        }
        if (wsi->socket_is_permanently_unusable ||
-           reason == LWS_CLOSE_STATUS_NOSTATUS_CONTEXT_DESTROY)
+           reason == LWS_CLOSE_STATUS_NOSTATUS_CONTEXT_DESTROY ||
+           wsi->state == LWSS_SHUTDOWN)
                goto just_kill_connection;
 
-       switch (old_state) {
+       wsi->state_pre_close = wsi->state;
+
+       switch (wsi->state_pre_close) {
        case LWSS_DEAD_SOCKET:
                return;
 
@@ -203,7 +244,7 @@ lws_close_free_wsi(struct lws *wsi, enum lws_close_status reason)
         * LWSS_AWAITING_CLOSE_ACK and will skip doing this a second time.
         */
 
-       if (old_state == LWSS_ESTABLISHED &&
+       if (wsi->state_pre_close == LWSS_ESTABLISHED &&
            (wsi->u.ws.close_in_ping_buffer_len || /* already a reason */
             (reason != LWS_CLOSE_STATUS_NOSTATUS &&
             (reason != LWS_CLOSE_STATUS_NOSTATUS_CONTEXT_DESTROY)))) {
@@ -218,8 +259,7 @@ lws_close_free_wsi(struct lws *wsi, enum lws_close_status reason)
                                reason & 0xff;
                }
 
-               n = lws_write(wsi, &wsi->u.ws.ping_payload_buf[
-                                               LWS_PRE],
+               n = lws_write(wsi, &wsi->u.ws.ping_payload_buf[LWS_PRE],
                              wsi->u.ws.close_in_ping_buffer_len,
                              LWS_WRITE_CLOSE);
                if (n >= 0) {
@@ -246,7 +286,28 @@ lws_close_free_wsi(struct lws *wsi, enum lws_close_status reason)
 
 just_kill_connection:
 
-       lwsl_debug("close: just_kill_connection: %p\n", wsi);
+#if LWS_POSIX
+       /*
+        * Testing with ab shows that we have to stage the socket close when
+        * the system is under stress... shutdown any further TX, change the
+        * state to one that won't emit anything more, and wait with a timeout
+        * for the POLLIN to show a zero-size rx before coming back and doing
+        * the actual close.
+        */
+       if (wsi->state != LWSS_SHUTDOWN) {
+               lwsl_info("%s: shutting down connection: %p\n", __func__, wsi);
+               n = shutdown(wsi->sock, SHUT_WR);
+               if (n)
+                       lwsl_debug("closing: shutdown ret %d\n", LWS_ERRNO);
+               wsi->state = LWSS_SHUTDOWN;
+               lws_change_pollfd(wsi, LWS_POLLOUT, LWS_POLLIN);
+               lws_set_timeout(wsi, PENDING_TIMEOUT_SHUTDOWN_FLUSH,
+                               AWAITING_TIMEOUT);
+               return;
+       }
+#endif
+
+       lwsl_info("%s: real just_kill_connection: %p\n", __func__, wsi);
 
        /*
         * we won't be servicing or receiving anything further from this guy
@@ -262,7 +323,7 @@ just_kill_connection:
 
        lws_free_set_NULL(wsi->rxflow_buffer);
 
-       if (old_state == LWSS_ESTABLISHED ||
+       if (wsi->state_pre_close == LWSS_ESTABLISHED ||
            wsi->mode == LWSCM_WS_SERVING ||
            wsi->mode == LWSCM_WS_CLIENT) {
 
@@ -308,10 +369,10 @@ just_kill_connection:
        /* tell the user it's all over for this guy */
 
        if (wsi->protocol && wsi->protocol->callback &&
-           ((old_state == LWSS_ESTABLISHED) ||
-           (old_state == LWSS_RETURNED_CLOSE_ALREADY) ||
-           (old_state == LWSS_AWAITING_CLOSE_ACK) ||
-           (old_state == LWSS_FLUSHING_STORED_SEND_BEFORE_CLOSE))) {
+           ((wsi->state_pre_close == LWSS_ESTABLISHED) ||
+           (wsi->state_pre_close == LWSS_RETURNED_CLOSE_ALREADY) ||
+           (wsi->state_pre_close == LWSS_AWAITING_CLOSE_ACK) ||
+           (wsi->state_pre_close == LWSS_FLUSHING_STORED_SEND_BEFORE_CLOSE))) {
                lwsl_debug("calling back CLOSED\n");
                wsi->protocol->callback(wsi, LWS_CALLBACK_CLOSED,
                                        wsi->user_space, NULL, 0);
@@ -327,7 +388,7 @@ just_kill_connection:
                                        wsi->user_space, NULL, 0);
        } else
                lwsl_debug("not calling back closed mode=%d state=%d\n",
-                          wsi->mode, old_state);
+                          wsi->mode, wsi->state_pre_close);
 
        /* deallocate any active extension contexts */
 
@@ -341,12 +402,10 @@ just_kill_connection:
                       LWS_EXT_CB_DESTROY_ANY_WSI_CLOSING, NULL, 0) < 0)
                lwsl_warn("ext destroy wsi failed\n");
 
+       wsi->socket_is_permanently_unusable = 1;
+
        if (!lws_ssl_close(wsi) && lws_socket_is_valid(wsi->sock)) {
 #if LWS_POSIX
-               n = shutdown(wsi->sock, SHUT_RDWR);
-               if (n)
-                       lwsl_debug("closing: shutdown ret %d\n", LWS_ERRNO);
-
                n = compatible_close(wsi->sock);
                if (n)
                        lwsl_debug("closing: close ret %d\n", LWS_ERRNO);
@@ -555,7 +614,7 @@ lws_callback_all_protocol(struct lws_context *context,
 {
        struct lws_context_per_thread *pt = &context->pt[0];
        struct lws *wsi;
-       int n, m = context->count_threads;
+       unsigned int n, m = context->count_threads;
 
        while (m--) {
                for (n = 0; n < pt->fds_count; n++) {
@@ -571,39 +630,6 @@ lws_callback_all_protocol(struct lws_context *context,
        return 0;
 }
 
-/**
- * lws_set_timeout() - marks the wsi as subject to a timeout
- *
- * You will not need this unless you are doing something special
- *
- * @wsi:       Websocket connection instance
- * @reason:    timeout reason
- * @secs:      how many seconds
- */
-
-LWS_VISIBLE void
-lws_set_timeout(struct lws *wsi, enum pending_timeout reason, int secs)
-{
-       time_t now;
-
-       time(&now);
-
-       if (!wsi->pending_timeout) {
-               wsi->timeout_list = wsi->context->timeout_list;
-               if (wsi->timeout_list)
-                       wsi->timeout_list->timeout_list_prev = &wsi->timeout_list;
-               wsi->timeout_list_prev = &wsi->context->timeout_list;
-               *wsi->timeout_list_prev = wsi;
-       }
-
-       wsi->pending_timeout_limit = now + secs;
-       wsi->pending_timeout = reason;
-
-       if (!reason)
-               lws_remove_from_timeout_list(wsi);
-}
-
-
 #if LWS_POSIX
 
 /**
@@ -704,7 +730,7 @@ lws_rx_flow_allow_all_protocol(const struct lws_context *context,
 {
        const struct lws_context_per_thread *pt = &context->pt[0];
        struct lws *wsi;
-       int n, m = context->count_threads;
+       unsigned int n, m = context->count_threads;
 
        while (m--) {
                for (n = 0; n < pt->fds_count; n++) {
index 405d897..982516f 100644 (file)
@@ -1497,6 +1497,7 @@ enum pending_timeout {
        PENDING_TIMEOUT_HTTP_CONTENT                            = 10,
        PENDING_TIMEOUT_AWAITING_CLIENT_HS_SEND                 = 11,
        PENDING_FLUSH_STORED_SEND_BEFORE_CLOSE                  = 12,
+       PENDING_TIMEOUT_SHUTDOWN_FLUSH                          = 13,
 
        /****** add new things just above ---^ ******/
 };
index 98276db..9953ac7 100644 (file)
@@ -122,7 +122,7 @@ lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi)
 {
        struct lws_context_per_thread *pt = &context->pt[tsi];
        struct lws *wsi;
-       int n, m;
+       int n, m, c;
        char buf;
 #ifdef LWS_OPENSSL_SUPPORT
        struct lws *wsi_next;
@@ -215,10 +215,13 @@ lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi)
 
        /* any socket with events to service? */
 
-       for (n = 0; n < pt->fds_count; n++) {
+       c = n;
+       for (n = 0; n < pt->fds_count && c; n++) {
                if (!pt->fds[n].revents)
                        continue;
 
+               c--;
+
                if (pt->fds[n].fd == pt->dummy_pipe_fds[0]) {
                        if (read(pt->fds[n].fd, &buf, 1) != 1)
                                lwsl_err("Cannot read from dummy pipe.");
@@ -457,6 +460,8 @@ LWS_VISIBLE void
 lws_plat_delete_socket_from_fds(struct lws_context *context,
                                                struct lws *wsi, int m)
 {
+       struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
+       pt->fds_count--;
 }
 
 LWS_VISIBLE void
index 36df16d..a79bcff 100644 (file)
@@ -152,7 +152,7 @@ LWS_VISIBLE int
 lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi)
 {
        int n;
-       int i;
+       unsigned int i;
        DWORD ev;
        WSANETWORKEVENTS networkevents;
        struct lws_pollfd *pfd;
@@ -177,7 +177,7 @@ lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi)
 
        for (i = 0; i < pt->fds_count; ++i) {
                pfd = &pt->fds[i];
-               if (pfd->fd == context->lserv_fd)
+               if (pfd->fd == pt->lserv_fd)
                        continue;
 
                if (pfd->events & LWS_POLLOUT) {
@@ -372,7 +372,7 @@ lws_plat_delete_socket_from_fds(struct lws_context *context,
        struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
 
        WSACloseEvent(pt->events[m + 1]);
-       pt->events[m + 1] = pt->events[pt->fds_count + 1];
+       pt->events[m + 1] = pt->events[pt->fds_count--];
 }
 
 LWS_VISIBLE void
index 86d63d4..5f54a4f 100644 (file)
@@ -30,7 +30,7 @@ lws_0405_frame_mask_generate(struct lws *wsi)
        wsi->u.ws.mask[2] = 0;
        wsi->u.ws.mask[3] = 0;
 #else
-               int n;
+       int n;
        /* fetch the per-frame nonce */
 
        n = lws_get_random(lws_get_context(wsi), wsi->u.ws.mask, 4);
@@ -643,6 +643,7 @@ lws_ssl_capable_write_no_ssl(struct lws *wsi, unsigned char *buf, int len)
 
 #if LWS_POSIX
        n = send(wsi->sock, (char *)buf, len, MSG_NOSIGNAL);
+       lwsl_info("%s: sent len %d result %d", __func__, len, n);
        if (n >= 0)
                return n;
 
@@ -662,7 +663,7 @@ lws_ssl_capable_write_no_ssl(struct lws *wsi, unsigned char *buf, int len)
        // !!!
 #endif
 
-       lwsl_debug("ERROR writing len %d to skt %d\n", len, n);
+       lwsl_debug("ERROR writing len %d to skt fd %d err %d / errno %d\n", len, wsi->sock, n, LWS_ERRNO);
        return LWS_SSL_CAPABLE_ERROR;
 }
 #endif
index 5b7aecd..bc9bf4c 100644 (file)
@@ -27,8 +27,8 @@ unsigned char lextable[] = {
 
 #define FAIL_CHAR 0x08
 
-int
-LWS_WARN_UNUSED_RESULT lextable_decode(int pos, char c)
+int LWS_WARN_UNUSED_RESULT
+lextable_decode(int pos, char c)
 {
        if (c >= 'A' && c <= 'Z')
                c += 'a' - 'A';
@@ -60,81 +60,172 @@ LWS_WARN_UNUSED_RESULT lextable_decode(int pos, char c)
        }
 }
 
+static void
+lws_reset_header_table(struct lws *wsi)
+{
+       /* init the ah to reflect no headers or data have appeared yet */
+       memset(wsi->u.hdr.ah->frag_index, 0, sizeof(wsi->u.hdr.ah->frag_index));
+       wsi->u.hdr.ah->nfrag = 0;
+       wsi->u.hdr.ah->pos = 0;
+}
+
 int LWS_WARN_UNUSED_RESULT
 lws_allocate_header_table(struct lws *wsi)
 {
        struct lws_context *context = wsi->context;
+       struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
+       struct lws_pollargs pa;
+       struct lws **pwsi;
        int n;
 
-       lwsl_debug("%s: wsi %p: ah %p\n", __func__, (void *)wsi,
-                (void *)wsi->u.hdr.ah);
+       lwsl_info("%s: wsi %p: ah %p (tsi %d)\n", __func__, (void *)wsi,
+                (void *)wsi->u.hdr.ah, wsi->tsi);
 
        /* if we are already bound to one, just clear it down */
-       if (wsi->u.hdr.ah)
+       if (wsi->u.hdr.ah) {
+               lwsl_err("cleardown\n");
                goto reset;
+       }
+
+       lws_pt_lock(pt);
+       pwsi = &pt->ah_wait_list;
+       while (*pwsi) {
+               if (*pwsi == wsi) {
+                       /* if already waiting on list, if no new ah just ret */
+                       if (pt->ah_count_in_use ==
+                           context->max_http_header_pool) {
+                               lwsl_err("ah wl denied\n");
+                               goto bail;
+                       }
+                       /* new ah.... remove ourselves from waiting list */
+                       *pwsi = wsi->u.hdr.ah_wait_list;
+                       wsi->u.hdr.ah_wait_list = NULL;
+                       pt->ah_wait_list_length--;
+                       break;
+               }
+               pwsi = &(*pwsi)->u.hdr.ah_wait_list;
+       }
        /*
-        * server should have suppressed the accept of a new wsi before this
-        * became the case.  If initiating multiple client connects, make sure
-        * the ah pool is big enough to cope, or be prepared to retry
+        * pool is all busy... add us to waiting list and return that we
+        * weren't able to deliver it right now
         */
-       if (context->ah_count_in_use == context->max_http_header_pool) {
-               lwsl_err("No free ah\n");
-               return -1;
+       if (pt->ah_count_in_use == context->max_http_header_pool) {
+               lwsl_err("%s: adding %p to ah waiting list\n", __func__, wsi);
+               wsi->u.hdr.ah_wait_list = pt->ah_wait_list;
+               pt->ah_wait_list = wsi;
+               pt->ah_wait_list_length++;
+
+               /* we cannot accept input then */
+
+               _lws_change_pollfd(wsi, LWS_POLLIN, 0, &pa);
+               goto bail;
        }
 
        for (n = 0; n < context->max_http_header_pool; n++)
-               if (!context->ah_pool[n].in_use)
+               if (!pt->ah_pool[n].in_use)
                        break;
 
        /* if the count of in use said something free... */
        assert(n != context->max_http_header_pool);
 
-       wsi->u.hdr.ah = &context->ah_pool[n];
+       wsi->u.hdr.ah = &pt->ah_pool[n];
        wsi->u.hdr.ah->in_use = 1;
+       pt->ah_count_in_use++;
 
-       context->ah_count_in_use++;
-       /* if we used up all the ah, defeat accepting new server connections */
-       if (context->ah_count_in_use == context->max_http_header_pool)
-               if (_lws_server_listen_accept_flow_control(context, 0))
-                       return 1;
+       _lws_change_pollfd(wsi, 0, LWS_POLLIN, &pa);
 
-       lwsl_debug("%s: wsi %p: ah %p: count %d (on exit)\n",
-                __func__, (void *)wsi, (void *)wsi->u.hdr.ah,
-                context->ah_count_in_use);
+       lwsl_info("%s: wsi %p: ah %p: count %d (on exit)\n", __func__,
+                 (void *)wsi, (void *)wsi->u.hdr.ah, pt->ah_count_in_use);
+
+       lws_pt_unlock(pt);
 
 reset:
-       /* init the ah to reflect no headers or data have appeared yet */
-       memset(wsi->u.hdr.ah->frag_index, 0, sizeof(wsi->u.hdr.ah->frag_index));
-       wsi->u.hdr.ah->nfrag = 0;
-       wsi->u.hdr.ah->pos = 0;
+       lws_reset_header_table(wsi);
+       time(&wsi->u.hdr.ah->assigned);
 
        return 0;
+
+bail:
+       lws_pt_unlock(pt);
+
+       return 1;
 }
 
 int lws_free_header_table(struct lws *wsi)
 {
        struct lws_context *context = wsi->context;
+       struct allocated_headers *ah = wsi->u.hdr.ah;
+       struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
+       struct lws_pollargs pa;
+       struct lws **pwsi;
+       time_t now;
+
+       lwsl_info("%s: wsi %p: ah %p (tsi=%d, count = %d)\n", __func__, (void *)wsi,
+                (void *)wsi->u.hdr.ah, wsi->tsi, pt->ah_count_in_use);
+
+       lws_pt_lock(pt);
+
+       pwsi = &pt->ah_wait_list;
+       if (!wsi->u.hdr.ah) { /* remove from wait list if that's all */
+               if (wsi->socket_is_permanently_unusable)
+                       while (*pwsi) {
+                               if (*pwsi == wsi) {
+                                       lwsl_info("%s: wsi %p, removing from wait list\n",
+                                                       __func__, wsi);
+                                       *pwsi = wsi->u.hdr.ah_wait_list;
+                                       wsi->u.hdr.ah_wait_list = NULL;
+                                       pt->ah_wait_list_length--;
+                                       goto bail;
+                               }
+                               pwsi = &(*pwsi)->u.hdr.ah_wait_list;
+                       }
 
-       lwsl_debug("%s: wsi %p: ah %p (count = %d)\n", __func__, (void *)wsi,
-                (void *)wsi->u.hdr.ah, context->ah_count_in_use);
+               goto bail;
+       }
+       time(&now);
+       if (now - wsi->u.hdr.ah->assigned > 3)
+               lwsl_err("header assign - free time %d\n",
+                        (int)(now - wsi->u.hdr.ah->assigned));
+       /* if we think we're freeing one, there should be one to free */
+       assert(pt->ah_count_in_use > 0);
+       /* and he should have been in use */
+       assert(wsi->u.hdr.ah->in_use);
+       wsi->u.hdr.ah = NULL;
 
-       assert(wsi->u.hdr.ah);
-       if (!wsi->u.hdr.ah)
-               return 0;
+       if (!*pwsi) {
+               ah->in_use = 0;
+               pt->ah_count_in_use--;
 
-       /* if we think we're freeing one, there should be one to free */
-       assert(context->ah_count_in_use > 0);
+               goto bail;
+       }
 
-       assert(wsi->u.hdr.ah->in_use);
-       wsi->u.hdr.ah->in_use = 0;
+       /* somebody else on same tsi is waiting, give it to him */
 
-       /* if we just freed up one ah, allow new server connection */
-       if (context->ah_count_in_use == context->max_http_header_pool)
-               if (_lws_server_listen_accept_flow_control(context, 1))
-                       return 1;
+       lwsl_info("pt wait list %p\n", *pwsi);
+       while ((*pwsi)->u.hdr.ah_wait_list)
+               pwsi = &(*pwsi)->u.hdr.ah_wait_list;
 
-       context->ah_count_in_use--;
-       wsi->u.hdr.ah = NULL;
+       wsi = *pwsi;
+       lwsl_info("last wsi in wait list %p\n", wsi);
+
+       wsi->u.hdr.ah = ah;
+       lws_reset_header_table(wsi);
+       time(&wsi->u.hdr.ah->assigned);
+
+       assert(wsi->position_in_fds_table != -1);
+
+       lwsl_info("%s: Enabling %p POLLIN\n", __func__, wsi);
+       /* his wait is over, let him progress */
+       _lws_change_pollfd(wsi, 0, LWS_POLLIN, &pa);
+
+       /* point prev guy to next guy in list instead */
+       *pwsi = wsi->u.hdr.ah_wait_list;
+       wsi->u.hdr.ah_wait_list = NULL;
+       pt->ah_wait_list_length--;
+
+       assert(!!pt->ah_wait_list_length == !!(int)(long)pt->ah_wait_list);
+bail:
+       lws_pt_unlock(pt);
 
        return 0;
 }
@@ -386,6 +477,8 @@ lws_parse(struct lws *wsi, unsigned char c)
        struct lws_context *context = wsi->context;
        unsigned int n, m, enc = 0;
 
+       assert(wsi->u.hdr.ah);
+
        switch (wsi->u.hdr.parser_state) {
        default:
 
@@ -651,8 +744,7 @@ swallow:
                        lwsl_parser("known hdr %d\n", n);
                        for (m = 0; m < ARRAY_SIZE(methods); m++)
                                if (n == methods[m] &&
-                                               ah->frag_index[
-                                                       methods[m]]) {
+                                   ah->frag_index[methods[m]]) {
                                        lwsl_warn("Duplicated method\n");
                                        return -1;
                                }
index 8ab3720..c460932 100644 (file)
 #include "private-libwebsockets.h"
 
 int
+_lws_change_pollfd(struct lws *wsi, int _and, int _or, struct lws_pollargs *pa)
+{
+       struct lws_context *context;
+       struct lws_context_per_thread *pt;
+       int ret = 0, pa_events = 1;
+       struct lws_pollfd *pfd;
+       int sampled_tid, tid;
+
+       if (!wsi || wsi->position_in_fds_table < 0)
+               return 0;
+
+       context = wsi->context;
+       pt = &context->pt[(int)wsi->tsi];
+       assert(wsi->position_in_fds_table >= 0 &&
+              wsi->position_in_fds_table < pt->fds_count);
+
+       pfd = &pt->fds[wsi->position_in_fds_table];
+       pa->fd = wsi->sock;
+       pa->prev_events = pfd->events;
+       pa->events = pfd->events = (pfd->events & ~_and) | _or;
+
+       if (context->protocols[0].callback(wsi, LWS_CALLBACK_CHANGE_MODE_POLL_FD,
+                                          wsi->user_space, (void *)pa, 0)) {
+               ret = -1;
+               goto bail;
+       }
+
+       /*
+        * if we changed something in this pollfd...
+        *   ... and we're running in a different thread context
+        *     than the service thread...
+        *       ... and the service thread is waiting ...
+        *         then cancel it to force a restart with our changed events
+        */
+#if LWS_POSIX
+       pa_events = pa->prev_events != pa->events;
+#endif
+       if (pa_events) {
+
+               if (lws_plat_change_pollfd(context, wsi, pfd)) {
+                       lwsl_info("%s failed\n", __func__);
+                       ret = -1;
+                       goto bail;
+               }
+
+               sampled_tid = context->service_tid;
+               if (sampled_tid) {
+                       tid = context->protocols[0].callback(wsi,
+                                    LWS_CALLBACK_GET_THREAD_ID, NULL, NULL, 0);
+                       if (tid == -1) {
+                               ret = -1;
+                               goto bail;
+                       }
+                       if (tid != sampled_tid)
+                               lws_cancel_service_pt(wsi);
+               }
+       }
+bail:
+       return ret;
+}
+
+int
 insert_wsi_socket_into_fds(struct lws_context *context, struct lws *wsi)
 {
        struct lws_pollargs pa = { wsi->sock, LWS_POLLIN, 0 };
        struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
+       int ret = 0;
+#ifndef LWS_NO_SERVER
+       struct lws_pollargs pa1;
+#endif
+
+       lwsl_info("%s: %p: tsi=%d, sock=%d, pos-in-fds=%d\n",
+                 __func__, wsi, wsi->tsi, wsi->sock, pt->fds_count);
 
        if ((unsigned int)pt->fds_count >= context->fd_limit_per_thread) {
                lwsl_err("Too many fds (%d)\n", context->max_fds);
@@ -47,83 +116,105 @@ insert_wsi_socket_into_fds(struct lws_context *context, struct lws *wsi)
                                           wsi->user_space, (void *) &pa, 1))
                return -1;
 
+       lws_pt_lock(pt);
+       pt->count_conns++;
        insert_wsi(context, wsi);
        wsi->position_in_fds_table = pt->fds_count;
        pt->fds[pt->fds_count].fd = wsi->sock;
        pt->fds[pt->fds_count].events = LWS_POLLIN;
 
+       /* don't apply this logic to the listening socket... */
+//     if (wsi->mode != LWSCM_SERVER_LISTENER && !wsi->u.hdr.ah)
+//             pt->fds[pt->fds_count].events = 0;
+
+       pa.events = pt->fds[pt->fds_count].events;
+
        lws_plat_insert_socket_into_fds(context, wsi);
 
        /* external POLL support via protocol 0 */
        if (context->protocols[0].callback(wsi, LWS_CALLBACK_ADD_POLL_FD,
                                           wsi->user_space, (void *) &pa, 0))
-               return -1;
+               ret =  -1;
+#ifndef LWS_NO_SERVER
+       /* if no more room, defeat accepts on this thread */
+       if ((unsigned int)pt->fds_count == context->fd_limit_per_thread - 1)
+               _lws_change_pollfd(pt->wsi_listening, LWS_POLLIN, 0, &pa1);
+#endif
+       lws_pt_unlock(pt);
 
        if (context->protocols[0].callback(wsi, LWS_CALLBACK_UNLOCK_POLL,
                                           wsi->user_space, (void *)&pa, 1))
-               return -1;
+               ret = -1;
 
-       return 0;
+       return ret;
 }
 
 int
 remove_wsi_socket_from_fds(struct lws *wsi)
 {
-       int m;
+       int m, ret = 0;
        struct lws *end_wsi;
        struct lws_pollargs pa = { wsi->sock, 0, 0 };
+#ifndef LWS_NO_SERVER
+       struct lws_pollargs pa1;
+#endif
        struct lws_context *context = wsi->context;
        struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
 
-       lws_libev_io(wsi, LWS_EV_STOP | LWS_EV_READ | LWS_EV_WRITE);
-
-       --pt->fds_count;
-
 #if !defined(_WIN32) && !defined(MBED_OPERATORS)
        if (wsi->sock > context->max_fds) {
-               lwsl_err("Socket fd %d too high (%d)\n",
-                        wsi->sock, context->max_fds);
+               lwsl_err("fd %d too high (%d)\n", wsi->sock, context->max_fds);
                return 1;
        }
 #endif
 
-       lwsl_info("%s: wsi=%p, sock=%d, fds pos=%d\n", __func__,
-                 wsi, wsi->sock, wsi->position_in_fds_table);
-
        if (context->protocols[0].callback(wsi, LWS_CALLBACK_LOCK_POLL,
                                           wsi->user_space, (void *)&pa, 1))
                return -1;
 
-       m = wsi->position_in_fds_table; /* replace the contents for this */
+       lws_libev_io(wsi, LWS_EV_STOP | LWS_EV_READ | LWS_EV_WRITE);
+
+       lws_pt_lock(pt);
+
+       lwsl_info("%s: wsi=%p, sock=%d, fds pos=%d, end guy pos=%d, endfd=%d\n",
+                 __func__, wsi, wsi->sock, wsi->position_in_fds_table,
+                 pt->fds_count, pt->fds[pt->fds_count].fd);
+
+       /* the guy who is to be deleted's slot index in pt->fds */
+       m = wsi->position_in_fds_table;
 
-       /* have the last guy take up the vacant slot */
-       pt->fds[m] = pt->fds[pt->fds_count];
+       /* have the last guy take up the now vacant slot */
+       pt->fds[m] = pt->fds[pt->fds_count - 1];
 
        lws_plat_delete_socket_from_fds(context, wsi, m);
 
-       /*
-        * end guy's fds_lookup entry remains unchanged
-        * (still same fd pointing to same wsi)
-        */
-       /* end guy's "position in fds table" changed */
+       /* end guy's "position in fds table" is now the deletion guy's old one */
        end_wsi = wsi_from_fd(context, pt->fds[pt->fds_count].fd);
+       assert(end_wsi);
        end_wsi->position_in_fds_table = m;
+
        /* deletion guy's lws_lookup entry needs nuking */
        delete_from_fd(context, wsi->sock);
        /* removed wsi has no position any more */
        wsi->position_in_fds_table = -1;
 
        /* remove also from external POLL support via protocol 0 */
-       if (lws_socket_is_valid(wsi->sock)) {
+       if (lws_socket_is_valid(wsi->sock))
                if (context->protocols[0].callback(wsi, LWS_CALLBACK_DEL_POLL_FD,
-                   wsi->user_space, (void *) &pa, 0))
-                       return -1;
-       }
+                                                  wsi->user_space, (void *) &pa, 0))
+                       ret = -1;
+#ifndef LWS_NO_SERVER
+       /* if this made some room, accept connects on this thread */
+       if ((unsigned int)pt->fds_count < context->fd_limit_per_thread - 1)
+               _lws_change_pollfd(pt->wsi_listening, 0, LWS_POLLIN, &pa1);
+#endif
+       lws_pt_unlock(pt);
+
        if (context->protocols[0].callback(wsi, LWS_CALLBACK_UNLOCK_POLL,
                                           wsi->user_space, (void *) &pa, 1))
-               return -1;
+               ret = -1;
 
-       return 0;
+       return ret;
 }
 
 int
@@ -131,11 +222,8 @@ lws_change_pollfd(struct lws *wsi, int _and, int _or)
 {
        struct lws_context *context;
        struct lws_context_per_thread *pt;
-       int tid;
-       int sampled_tid;
-       struct lws_pollfd *pfd;
+       int ret = 0;
        struct lws_pollargs pa;
-       int pa_events = 1;
 
        if (!wsi || !wsi->protocol || wsi->position_in_fds_table < 0)
                return 1;
@@ -144,55 +232,20 @@ lws_change_pollfd(struct lws *wsi, int _and, int _or)
        if (!context)
                return 1;
 
-       pt = &context->pt[(int)wsi->tsi];
-
-       pfd = &pt->fds[wsi->position_in_fds_table];
-       pa.fd = wsi->sock;
-
        if (context->protocols[0].callback(wsi, LWS_CALLBACK_LOCK_POLL,
                                           wsi->user_space,  (void *) &pa, 0))
                return -1;
 
-       pa.prev_events = pfd->events;
-       pa.events = pfd->events = (pfd->events & ~_and) | _or;
-
-       if (context->protocols[0].callback(wsi, LWS_CALLBACK_CHANGE_MODE_POLL_FD,
-                                          wsi->user_space, (void *) &pa, 0))
-               return -1;
-
-       /*
-        * if we changed something in this pollfd...
-        *   ... and we're running in a different thread context
-        *     than the service thread...
-        *       ... and the service thread is waiting ...
-        *         then cancel it to force a restart with our changed events
-        */
-#if LWS_POSIX
-       pa_events = pa.prev_events != pa.events;
-#endif
-       if (pa_events) {
-
-               if (lws_plat_change_pollfd(context, wsi, pfd)) {
-                       lwsl_info("%s failed\n", __func__);
-                       return 1;
-               }
-
-               sampled_tid = context->service_tid;
-               if (sampled_tid) {
-                       tid = context->protocols[0].callback(wsi,
-                                    LWS_CALLBACK_GET_THREAD_ID, NULL, NULL, 0);
-                       if (tid == -1)
-                               return -1;
-                       if (tid != sampled_tid)
-                               lws_cancel_service_pt(wsi);
-               }
-       }
+       pt = &context->pt[(int)wsi->tsi];
 
+       lws_pt_lock(pt);
+       ret = _lws_change_pollfd(wsi, _and, _or, &pa);
+       lws_pt_unlock(pt);
        if (context->protocols[0].callback(wsi, LWS_CALLBACK_UNLOCK_POLL,
                                           wsi->user_space, (void *) &pa, 0))
-               return -1;
+               ret = -1;
 
-       return 0;
+       return ret;
 }
 
 
@@ -287,7 +340,7 @@ lws_callback_on_writable_all_protocol(const struct lws_context *context,
 {
        const struct lws_context_per_thread *pt = &context->pt[0];
        struct lws *wsi;
-       int n, m = context->count_threads;
+       unsigned int n, m = context->count_threads;
 
        while (m--) {
                for (n = 0; n < pt->fds_count; n++) {
index b1eb8bc..cfc0a71 100644 (file)
@@ -1,7 +1,7 @@
 /*
  * libwebsockets - small server side websockets and web server implementation
  *
- * Copyright (C) 2010 - 2015 Andy Green <andy@warmcat.com>
+ * Copyright (C) 2010 - 2016 Andy Green <andy@warmcat.com>
  *
  *  This library is free software; you can redistribute it and/or
  *  modify it under the terms of the GNU Lesser General Public
@@ -34,6 +34,9 @@
 #include <limits.h>
 #include <stdarg.h>
 #include <assert.h>
+#if LWS_MAX_SMP > 1
+#include <pthread.h>
+#endif
 
 #ifdef LWS_HAVE_SYS_STAT_H
 #include <sys/stat.h>
@@ -60,6 +63,7 @@
 #define MSG_NOSIGNAL 0
 #define SHUT_RDWR SD_BOTH
 #define SOL_TCP IPPROTO_TCP
+#define SHUT_WR SD_SEND
 
 #define compatible_close(fd) closesocket(fd)
 #define lws_set_blocking_send(wsi) wsi->sock_send_blocking = 1
@@ -280,11 +284,11 @@ extern "C" {
 #endif
 #endif
 
-#ifndef LWS_MAX_HEADER_LEN
-#define LWS_MAX_HEADER_LEN 1024
+#ifndef LWS_DEF_HEADER_LEN
+#define LWS_DEF_HEADER_LEN 1024
 #endif
-#ifndef LWS_MAX_HEADER_POOL
-#define LWS_MAX_HEADER_POOL 16
+#ifndef LWS_DEF_HEADER_POOL
+#define LWS_DEF_HEADER_POOL 16
 #endif
 #ifndef LWS_MAX_PROTOCOLS
 #define LWS_MAX_PROTOCOLS 5
@@ -299,7 +303,7 @@ extern "C" {
 #define SPEC_LATEST_SUPPORTED 13
 #endif
 #ifndef AWAITING_TIMEOUT
-#define AWAITING_TIMEOUT 5
+#define AWAITING_TIMEOUT 20
 #endif
 #ifndef CIPHERS_LIST_STRING
 #define CIPHERS_LIST_STRING "DEFAULT"
@@ -315,12 +319,6 @@ extern "C" {
 #define SYSTEM_RANDOM_FILEPATH "/dev/urandom"
 #endif
 
-/*
- * if not in a connection storm, check for incoming
- * connections this many normal connection services
- */
-#define LWS_lserv_mod 10
-
 enum lws_websocket_opcodes_07 {
        LWSWSOPC_CONTINUATION = 0,
        LWSWSOPC_TEXT_FRAME = 1,
@@ -347,6 +345,7 @@ enum lws_connection_states {
        LWSS_RETURNED_CLOSE_ALREADY,
        LWSS_AWAITING_CLOSE_ACK,
        LWSS_FLUSHING_STORED_SEND_BEFORE_CLOSE,
+       LWSS_SHUTDOWN,
 
        LWSS_HTTP2_AWAIT_CLIENT_PREFACE,
        LWSS_HTTP2_ESTABLISHED_PRE_SETTINGS,
@@ -411,6 +410,7 @@ enum connection_mode {
 
        /* transient, ssl delay hiding */
        LWSCM_SSL_ACK_PENDING,
+       LWSCM_SSL_INIT,
 
        /* transient modes */
        LWSCM_WSCL_WAITING_CONNECT,
@@ -481,6 +481,7 @@ struct allocated_headers {
         * lws_fragments->nfrag for continuation.
         */
        struct lws_fragments frags[WSI_TOKEN_COUNT * 2];
+       time_t assigned;
        /*
         * for each recognized token, frag_index says which frag[] his data
         * starts in (0 means the token did not appear)
@@ -503,12 +504,26 @@ struct allocated_headers {
  */
 
 struct lws_context_per_thread {
+#if LWS_MAX_SMP > 1
+       pthread_mutex_t lock;
+#endif
        struct lws_pollfd *fds;
        struct lws *rx_draining_ext_list;
        struct lws *tx_draining_ext_list;
+       struct lws *timeout_list;
+       void *http_header_data;
+       struct allocated_headers *ah_pool;
+       struct lws *ah_wait_list;
+       int ah_wait_list_length;
 #ifdef LWS_OPENSSL_SUPPORT
        struct lws *pending_read_list; /* linked list */
 #endif
+#ifndef LWS_NO_SERVER
+       struct lws *wsi_listening;
+#endif
+       lws_sockfd_type lserv_fd;
+
+       unsigned long count_conns;
        /*
         * usable by anything in the service code, but only if the scope
         * does not last longer than the service action (since next service
@@ -520,7 +535,9 @@ struct lws_context_per_thread {
 #else
        int dummy_pipe_fds[2];
 #endif
-       int fds_count;
+       unsigned int fds_count;
+
+       short ah_count_in_use;
 };
 
 /*
@@ -551,14 +568,8 @@ struct lws_context {
        const char *iface;
        const struct lws_token_limits *token_limits;
        void *user_space;
-       struct lws *timeout_list;
 
-#ifndef LWS_NO_SERVER
-       struct lws *wsi_listening;
-#endif
        const struct lws_protocols *protocols;
-       void *http_header_data;
-       struct allocated_headers *ah_pool;
 
 #ifdef LWS_OPENSSL_SUPPORT
        SSL_CTX *ssl_ctx;
@@ -576,8 +587,6 @@ struct lws_context {
        char worst_latency_info[256];
 #endif
 
-       lws_sockfd_type lserv_fd;
-
        int max_fds;
        int listen_port;
 #ifdef LWS_USE_LIBEV
@@ -587,8 +596,6 @@ struct lws_context {
 
        int fd_random;
        int lserv_mod;
-       int lserv_count;
-       int lserv_seen;
        unsigned int http_proxy_port;
        unsigned int options;
        unsigned int fd_limit_per_thread;
@@ -624,7 +631,6 @@ struct lws_context {
 
        short max_http_header_data;
        short max_http_header_pool;
-       short ah_count_in_use;
        short count_threads;
 
        unsigned int being_destroyed:1;
@@ -697,6 +703,8 @@ enum uri_esc_states {
 struct _lws_http_mode_related {
        /* MUST be first in struct */
        struct allocated_headers *ah; /* mirroring  _lws_header_related */
+       struct lws *ah_wait_list;
+       struct lws *new_wsi_list;
        unsigned long filepos;
        unsigned long filelen;
        lws_filefd_type fd;
@@ -857,6 +865,7 @@ struct _lws_http2_related {
 struct _lws_header_related {
        /* MUST be first in struct */
        struct allocated_headers *ah;
+       struct lws *ah_wait_list;
        enum uri_path_states ups;
        enum uri_esc_states ues;
        short lextable_pos;
@@ -986,6 +995,7 @@ struct lws {
        unsigned char ietf_spec_revision;
        char mode; /* enum connection_mode */
        char state; /* enum lws_connection_states */
+       char state_pre_close;
        char lws_rx_parse_state; /* enum lws_rx_parse_state */
        char rx_frame_type; /* enum lws_write_protocol */
        char pending_timeout; /* enum pending_timeout */
@@ -1049,7 +1059,7 @@ LWS_EXTERN int
 delete_from_fd(struct lws_context *context, lws_sockfd_type fd);
 #else
 #define wsi_from_fd(A,B)  A->lws_lookup[B]
-#define insert_wsi(A,B)   A->lws_lookup[B->sock]=B
+#define insert_wsi(A,B)   assert(A->lws_lookup[B->sock] == 0); A->lws_lookup[B->sock]=B
 #define delete_from_fd(A,B) A->lws_lookup[B]=0
 #endif
 
@@ -1222,7 +1232,7 @@ enum lws_ssl_capable_status {
 #define lws_ssl_capable_read lws_ssl_capable_read_no_ssl
 #define lws_ssl_capable_write lws_ssl_capable_write_no_ssl
 #define lws_ssl_pending lws_ssl_pending_no_ssl
-#define lws_server_socket_service_ssl(_a, _b, _c, _d) (0)
+#define lws_server_socket_service_ssl(_b, _c) (0)
 #define lws_ssl_close(_a) (0)
 #define lws_ssl_context_destroy(_a)
 #define lws_ssl_remove_wsi_from_buffered_list(_a)
@@ -1236,9 +1246,7 @@ lws_ssl_capable_write(struct lws *wsi, unsigned char *buf, int len);
 LWS_EXTERN int LWS_WARN_UNUSED_RESULT
 lws_ssl_pending(struct lws *wsi);
 LWS_EXTERN int LWS_WARN_UNUSED_RESULT
-lws_server_socket_service_ssl(struct lws **wsi, struct lws *new_wsi,
-                             lws_sockfd_type accept_fd,
-                             struct lws_pollfd *pollfd);
+lws_server_socket_service_ssl(struct lws *new_wsi, lws_sockfd_type accept_fd);
 LWS_EXTERN int
 lws_ssl_close(struct lws *wsi);
 LWS_EXTERN void
@@ -1265,6 +1273,29 @@ lws_context_init_http2_ssl(struct lws_context *context);
 #endif
 #endif
 
+#if LWS_MAX_SMP > 1
+static LWS_INLINE void
+lws_pt_mutex_init(struct lws_context_per_thread *pt)
+{
+       pthread_mutex_init(&pt->lock, NULL);
+}
+static LWS_INLINE void
+lws_pt_lock(struct lws_context_per_thread *pt)
+{
+       pthread_mutex_lock(&pt->lock);
+}
+
+static LWS_INLINE void
+lws_pt_unlock(struct lws_context_per_thread *pt)
+{
+       pthread_mutex_unlock(&pt->lock);
+}
+#else
+#define lws_pt_mutex_init(_a) (void)(_a)
+#define lws_pt_lock(_a) (void)(_a)
+#define lws_pt_unlock(_a) (void)(_a)
+#endif
+
 LWS_EXTERN int LWS_WARN_UNUSED_RESULT
 lws_ssl_capable_read_no_ssl(struct lws *wsi, unsigned char *buf, int len);
 
@@ -1297,6 +1328,9 @@ lws_decode_ssl_error(void);
 LWS_EXTERN int
 _lws_rx_flow_control(struct lws *wsi);
 
+LWS_EXTERN int
+_lws_change_pollfd(struct lws *wsi, int _and, int _or, struct lws_pollargs *pa);
+
 #ifndef LWS_NO_SERVER
 LWS_EXTERN int
 lws_server_socket_service(struct lws_context *context, struct lws *wsi,
@@ -1304,7 +1338,7 @@ lws_server_socket_service(struct lws_context *context, struct lws *wsi,
 LWS_EXTERN int
 lws_handshake_server(struct lws *wsi, unsigned char **buf, size_t len);
 LWS_EXTERN int
-_lws_server_listen_accept_flow_control(struct lws_context *context, int on);
+_lws_server_listen_accept_flow_control(struct lws *twsi, int on);
 #else
 #define lws_server_socket_service(_a, _b, _c) (0)
 #define lws_handshake_server(_a, _b, _c) (0)
index 93cba5a..bbbcfcc 100644 (file)
@@ -166,14 +166,13 @@ handshake_0405(struct lws_context *context, struct lws *wsi)
        int accept_len;
 
        if (!lws_hdr_total_length(wsi, WSI_TOKEN_HOST) ||
-                               !lws_hdr_total_length(wsi, WSI_TOKEN_KEY)) {
+           !lws_hdr_total_length(wsi, WSI_TOKEN_KEY)) {
                lwsl_parser("handshake_04 missing pieces\n");
                /* completed header processing, but missing some bits */
                goto bail;
        }
 
-       if (lws_hdr_total_length(wsi, WSI_TOKEN_KEY) >=
-                                                    MAX_WEBSOCKET_04_KEY_LEN) {
+       if (lws_hdr_total_length(wsi, WSI_TOKEN_KEY) >= MAX_WEBSOCKET_04_KEY_LEN) {
                lwsl_warn("Client key too long %d\n", MAX_WEBSOCKET_04_KEY_LEN);
                goto bail;
        }
@@ -183,8 +182,8 @@ handshake_0405(struct lws_context *context, struct lws *wsi)
         * overflow
         */
        n = sprintf((char *)pt->serv_buf,
-                               "%s258EAFA5-E914-47DA-95CA-C5AB0DC85B11",
-                               lws_hdr_simple_ptr(wsi, WSI_TOKEN_KEY));
+                   "%s258EAFA5-E914-47DA-95CA-C5AB0DC85B11",
+                   lws_hdr_simple_ptr(wsi, WSI_TOKEN_KEY));
 
        lws_SHA1(pt->serv_buf, n, hash);
 
index aeb5f8e..319e637 100644 (file)
@@ -33,8 +33,9 @@ int lws_context_init_server(struct lws_context_creation_info *info,
        socklen_t len = sizeof(struct sockaddr);
        struct sockaddr_in sin;
        struct sockaddr *v;
-       int n, opt = 1;
+       int n, opt = 1, limit = 1;
 #endif
+       int m = 0;
        lws_sockfd_type sockfd;
        struct lws *wsi;
 
@@ -44,6 +45,11 @@ int lws_context_init_server(struct lws_context_creation_info *info,
                return 0;
 
 #if LWS_POSIX
+#if defined(__linux__)
+       limit = context->count_threads;
+#endif
+
+       for (m = 0; m < limit; m++) {
 #ifdef LWS_USE_IPV6
        if (LWS_IPV6_ENABLED(context))
                sockfd = socket(AF_INET6, SOCK_STREAM, 0);
@@ -69,6 +75,13 @@ int lws_context_init_server(struct lws_context_creation_info *info,
                compatible_close(sockfd);
                return 1;
        }
+#if defined(__linux__) && defined(SO_REUSEPORT)
+       if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT,
+                      (const void *)&opt, sizeof(opt)) < 0) {
+               compatible_close(sockfd);
+               return 1;
+       }
+#endif
 #endif
        lws_plat_set_socket_options(context, sockfd);
 
@@ -122,19 +135,19 @@ int lws_context_init_server(struct lws_context_creation_info *info,
        wsi->sock = sockfd;
        wsi->mode = LWSCM_SERVER_LISTENER;
        wsi->protocol = context->protocols;
+       wsi->tsi = m;
 
-       context->wsi_listening = wsi;
+       context->pt[m].wsi_listening = wsi;
        if (insert_wsi_socket_into_fds(context, wsi))
                goto bail;
 
-       context->lserv_mod = LWS_lserv_mod;
-       context->lserv_count = 0;
-       context->lserv_fd = sockfd;
+       context->pt[m].lserv_fd = sockfd;
 
 #if LWS_POSIX
-       listen(sockfd, LWS_SOMAXCONN);
+       listen(wsi->sock, LWS_SOMAXCONN);
+       } /* for each thread able to independently lister */
 #else
-       mbed3_tcp_stream_bind(sockfd, info->port, wsi);
+       mbed3_tcp_stream_bind(wsi->sock, info->port, wsi);
 #endif
        lwsl_notice(" Listening on port %d\n", info->port);
 
@@ -147,15 +160,17 @@ bail:
 }
 
 int
-_lws_server_listen_accept_flow_control(struct lws_context *context, int on)
+_lws_server_listen_accept_flow_control(struct lws *twsi, int on)
 {
-       struct lws *wsi = context->wsi_listening;
+       struct lws_context_per_thread *pt = &twsi->context->pt[(int)twsi->tsi];
+       struct lws *wsi = pt->wsi_listening;
        int n;
 
-       if (!wsi || context->being_destroyed)
+       if (!wsi || twsi->context->being_destroyed)
                return 0;
 
-       lwsl_debug("%s: wsi %p: state %d\n", __func__, (void *)wsi, on);
+       lwsl_debug("%s: Thr %d: LISTEN wsi %p: state %d\n",
+                  __func__, twsi->tsi, (void *)wsi, on);
 
        if (on)
                n = lws_change_pollfd(wsi, 0, LWS_POLLIN);
@@ -324,7 +339,7 @@ int lws_handshake_server(struct lws *wsi, unsigned char **buf, size_t len)
        char protocol_name[32];
        char *p;
 
-       /* LWSCM_WS_SERVING */
+       assert(wsi->u.hdr.ah);
 
        while (len--) {
 
@@ -576,10 +591,33 @@ bail_nuke_ah:
        return 1;
 }
 
+static int
+lws_get_idlest_tsi(struct lws_context *context)
+{
+       unsigned int lowest = ~0;
+       int n = 0, hit = -1;
+
+       for (; n < context->count_threads; n++) {
+               if ((unsigned int)context->pt[n].fds_count != context->fd_limit_per_thread - 1 &&
+                   (unsigned int)context->pt[n].fds_count < lowest) {
+                       lowest = context->pt[n].fds_count;
+                       hit = n;
+               }
+       }
+
+       return hit;
+}
+
 struct lws *
 lws_create_new_server_wsi(struct lws_context *context)
 {
        struct lws *new_wsi;
+       int n = lws_get_idlest_tsi(context);
+
+       if (n < 0) {
+               lwsl_err("no space for new conn\n");
+               return NULL;
+       }
 
        new_wsi = lws_zalloc(sizeof(struct lws));
        if (new_wsi == NULL) {
@@ -587,6 +625,9 @@ lws_create_new_server_wsi(struct lws_context *context)
                return NULL;
        }
 
+       new_wsi->tsi = n;
+       lwsl_info("Accepted %p to tsi %d\n", new_wsi, new_wsi->tsi);
+
        new_wsi->context = context;
        new_wsi->pending_timeout = NO_PENDING_TIMEOUT;
        new_wsi->rxflow_change_to = LWS_RXFLOW_ALLOW;
@@ -601,11 +642,6 @@ lws_create_new_server_wsi(struct lws_context *context)
        new_wsi->use_ssl = LWS_SSL_ENABLED(context);
 #endif
 
-       if (lws_allocate_header_table(new_wsi)) {
-               lws_free(new_wsi);
-               return NULL;
-       }
-
        /*
         * these can only be set once the protocol is known
         * we set an unestablished connection's protocol pointer
@@ -617,12 +653,13 @@ lws_create_new_server_wsi(struct lws_context *context)
        new_wsi->ietf_spec_revision = 0;
        new_wsi->sock = LWS_SOCK_INVALID;
 
+
        /*
         * outermost create notification for wsi
         * no user_space because no protocol selection
         */
-       context->protocols[0].callback(new_wsi, LWS_CALLBACK_WSI_CREATE, NULL,
-                                      NULL, 0);
+       context->protocols[0].callback(new_wsi, LWS_CALLBACK_WSI_CREATE,
+                                      NULL, NULL, 0);
 
        return new_wsi;
 }
@@ -642,7 +679,7 @@ lws_http_transaction_completed(struct lws *wsi)
        lwsl_debug("%s: wsi %p\n", __func__, wsi);
        /* if we can't go back to accept new headers, drop the connection */
        if (wsi->u.http.connection_type != HTTP_CONNECTION_KEEP_ALIVE) {
-               lwsl_info("%s: close connection\n", __func__);
+               lwsl_info("%s: %p: close connection\n", __func__, wsi);
                return 1;
        }
 
@@ -655,34 +692,75 @@ lws_http_transaction_completed(struct lws *wsi)
        lws_set_timeout(wsi, NO_PENDING_TIMEOUT, 0);
 
        if (lws_allocate_header_table(wsi))
-               return 1;
+               lwsl_info("On waiting list for header table");
 
        /* If we're (re)starting on headers, need other implied init */
        wsi->u.hdr.ues = URIES_IDLE;
 
-       lwsl_info("%s: keep-alive await new transaction\n", __func__);
+       lwsl_info("%s: %p: keep-alive await new transaction\n", __func__, wsi);
 
        return 0;
 }
 
-static int
-lws_get_idlest_tsi(struct lws_context *context)
+/*
+ * either returns new wsi bound to accept_fd, or closes accept_fd and
+ * returns NULL, having cleaned up any new wsi pieces
+ */
+
+LWS_VISIBLE struct lws *
+lws_adopt_socket(struct lws_context *context, lws_sockfd_type accept_fd)
 {
-       unsigned int lowest = ~0;
-       int n, hit = 0;
+       struct lws *new_wsi = lws_create_new_server_wsi(context);
+       if (!new_wsi) {
+               compatible_close(accept_fd);
+               return NULL;
+       }
 
-       for (n = 0; n < context->count_threads; n++)
-               if ((unsigned int)context->pt[n].fds_count < lowest) {
-                       lowest = context->pt[n].fds_count;
-                       hit = n;
-               }
+       new_wsi->sock = accept_fd;
 
-       return hit;
+       /* the transport is accepted... give him time to negotiate */
+       lws_set_timeout(new_wsi, PENDING_TIMEOUT_ESTABLISH_WITH_SERVER,
+                       AWAITING_TIMEOUT);
+
+#if LWS_POSIX == 0
+       mbed3_tcp_stream_accept(accept_fd, new_wsi);
+#endif
+
+       /*
+        * A new connection was accepted. Give the user a chance to
+        * set properties of the newly created wsi. There's no protocol
+        * selected yet so we issue this to protocols[0]
+        */
+       if ((context->protocols[0].callback)(new_wsi,
+            LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED, NULL, NULL, 0)) {
+               compatible_close(new_wsi->sock);
+               lws_free(new_wsi);
+               return NULL;
+       }
+
+       lws_libev_accept(new_wsi, new_wsi->sock);
+
+       if (!LWS_SSL_ENABLED(context)) {
+               if (insert_wsi_socket_into_fds(context, new_wsi))
+                       goto fail;
+       } else {
+               new_wsi->mode = LWSCM_SSL_INIT;
+               if (lws_server_socket_service_ssl(new_wsi, accept_fd))
+                       goto fail;
+       }
+
+       return new_wsi;
+
+fail:
+       lwsl_err("%s: fail\n", __func__);
+       lws_close_free_wsi(new_wsi, LWS_CLOSE_STATUS_NOSTATUS);
+
+       return NULL;
 }
 
-LWS_VISIBLE
-int lws_server_socket_service(struct lws_context *context,
-                             struct lws *wsi, struct lws_pollfd *pollfd)
+LWS_VISIBLE int
+lws_server_socket_service(struct lws_context *context, struct lws *wsi,
+                         struct lws_pollfd *pollfd)
 {
        lws_sockfd_type accept_fd = LWS_SOCK_INVALID;
        struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
@@ -690,7 +768,7 @@ int lws_server_socket_service(struct lws_context *context,
        struct sockaddr_in cli_addr;
        socklen_t clilen;
 #endif
-       struct lws *new_wsi = NULL;
+
        int n, len;
 
        switch (wsi->mode) {
@@ -698,6 +776,7 @@ int lws_server_socket_service(struct lws_context *context,
        case LWSCM_HTTP_SERVING:
        case LWSCM_HTTP_SERVING_ACCEPTED:
        case LWSCM_HTTP2_SERVING:
+       case LWSS_SHUTDOWN:
 
                /* handle http headers coming in */
 
@@ -720,15 +799,19 @@ int lws_server_socket_service(struct lws_context *context,
 
                /* any incoming data ready? */
 
-               if (!(pollfd->revents & pollfd->events && LWS_POLLIN))
+               if (!(pollfd->revents & pollfd->events & LWS_POLLIN))
                        goto try_pollout;
 
+               if (wsi->state == LWSS_HTTP && !wsi->u.hdr.ah)
+                       if (lws_allocate_header_table(wsi))
+                               goto try_pollout;
+
                len = lws_ssl_capable_read(wsi, pt->serv_buf,
                                           LWS_MAX_SOCKET_IO_BUF);
-               lwsl_debug("%s: read %d\r\n", __func__, len);
+               lwsl_debug("%s: wsi %p read %d\r\n", __func__, wsi, len);
                switch (len) {
                case 0:
-                       lwsl_info("lws_server_skt_srv: read 0 len\n");
+                       lwsl_info("%s: read 0 len\n", __func__);
                        /* lwsl_info("   state=%d\n", wsi->state); */
                        if (!wsi->hdr_parsing_completed)
                                lws_free_header_table(wsi);
@@ -748,7 +831,6 @@ int lws_server_socket_service(struct lws_context *context,
                        n = lws_read(wsi, pt->serv_buf, len);
                        if (n < 0) /* we closed wsi */
                                return 1;
-
                        /* hum he may have used up the
                         * writability above */
                        break;
@@ -761,8 +843,10 @@ try_pollout:
                        break;
 
                /* one shot */
-               if (lws_change_pollfd(wsi, LWS_POLLOUT, 0))
+               if (lws_change_pollfd(wsi, LWS_POLLOUT, 0)) {
+                       lwsl_notice("%s a\n", __func__);
                        goto fail;
+               }
 
                lws_libev_io(wsi, LWS_EV_STOP | LWS_EV_WRITE);
 
@@ -771,15 +855,19 @@ try_pollout:
                                        wsi->protocol->callback,
                                        wsi, LWS_CALLBACK_HTTP_WRITEABLE,
                                        wsi->user_space, NULL, 0);
-                       if (n < 0)
+                       if (n < 0) {
+                               lwsl_info("writeable_fail\n");
                                goto fail;
+                       }
                        break;
                }
 
                /* >0 == completion, <0 == error */
                n = lws_serve_http_file_fragment(wsi);
-               if (n < 0 || (n > 0 && lws_http_transaction_completed(wsi)))
+               if (n < 0 || (n > 0 && lws_http_transaction_completed(wsi))) {
+                       lwsl_info("completed\n");
                        goto fail;
+               }
                break;
 
        case LWSCM_SERVER_LISTENER:
@@ -787,91 +875,65 @@ try_pollout:
 #if LWS_POSIX
                /* pollin means a client has connected to us then */
 
-               if (!(pollfd->revents & LWS_POLLIN))
-                       break;
+               do {
+                       if (!(pollfd->revents & LWS_POLLIN) || !(pollfd->events & LWS_POLLIN))
+                               break;
 
-               /* listen socket got an unencrypted connection... */
-
-               clilen = sizeof(cli_addr);
-               lws_latency_pre(context, wsi);
-               accept_fd  = accept(pollfd->fd, (struct sockaddr *)&cli_addr,
-                                   &clilen);
-               lws_latency(context, wsi,
-                       "unencrypted accept LWSCM_SERVER_LISTENER",
-                                                    accept_fd, accept_fd >= 0);
-               if (accept_fd < 0) {
-                       if (LWS_ERRNO == LWS_EAGAIN ||
-                           LWS_ERRNO == LWS_EWOULDBLOCK) {
-                               lwsl_debug("accept asks to try again\n");
+                       /* listen socket got an unencrypted connection... */
+
+                       clilen = sizeof(cli_addr);
+                       lws_latency_pre(context, wsi);
+                       accept_fd  = accept(pollfd->fd, (struct sockaddr *)&cli_addr,
+                                           &clilen);
+                       lws_latency(context, wsi, "listener accept", accept_fd,
+                                   accept_fd >= 0);
+                       if (accept_fd < 0) {
+                               if (LWS_ERRNO == LWS_EAGAIN ||
+                                   LWS_ERRNO == LWS_EWOULDBLOCK) {
+                                       lwsl_err("accept asks to try again\n");
+                                       break;
+                               }
+                               lwsl_err("ERROR on accept: %s\n", strerror(LWS_ERRNO));
                                break;
                        }
-                       lwsl_warn("ERROR on accept: %s\n", strerror(LWS_ERRNO));
-                       break;
-               }
-
-               lws_plat_set_socket_options(context, accept_fd);
-#else
-               /* not very beautiful... */
-               accept_fd = (lws_sockfd_type)pollfd;
-#endif
-               /*
-                * look at who we connected to and give user code a chance
-                * to reject based on client IP.  There's no protocol selected
-                * yet so we issue this to protocols[0]
-                */
-
-               if ((context->protocols[0].callback)(wsi,
-                               LWS_CALLBACK_FILTER_NETWORK_CONNECTION,
-                                          NULL, (void *)(long)accept_fd, 0)) {
-                       lwsl_debug("Callback denied network connection\n");
-                       compatible_close(accept_fd);
-                       break;
-               }
-
-               new_wsi = lws_create_new_server_wsi(context);
-               if (new_wsi == NULL) {
-                       compatible_close(accept_fd);
-                       break;
-               }
 
-               new_wsi->sock = accept_fd;
-               new_wsi->tsi = lws_get_idlest_tsi(context);
-               lwsl_info("Accepted to tsi %d\n", new_wsi->tsi);
+                       lws_plat_set_socket_options(context, accept_fd);
 
-               /* the transport is accepted... give him time to negotiate */
-               lws_set_timeout(new_wsi, PENDING_TIMEOUT_ESTABLISH_WITH_SERVER,
-                               AWAITING_TIMEOUT);
+                       lwsl_debug("accepted new conn  port %u on fd=%d\n",
+                                         ntohs(cli_addr.sin_port), accept_fd);
 
-#if LWS_POSIX == 0
-               mbed3_tcp_stream_accept(accept_fd, new_wsi);
+#else
+                       /* not very beautiful... */
+                       accept_fd = (lws_sockfd_type)pollfd;
 #endif
+                       /*
+                        * look at who we connected to and give user code a chance
+                        * to reject based on client IP.  There's no protocol selected
+                        * yet so we issue this to protocols[0]
+                        */
+                       if ((context->protocols[0].callback)(wsi,
+                                       LWS_CALLBACK_FILTER_NETWORK_CONNECTION,
+                                       NULL, (void *)(long)accept_fd, 0)) {
+                               lwsl_debug("Callback denied network connection\n");
+                               compatible_close(accept_fd);
+                               break;
+                       }
 
-               /*
-                * A new connection was accepted. Give the user a chance to
-                * set properties of the newly created wsi. There's no protocol
-                * selected yet so we issue this to protocols[0]
-                */
-               (context->protocols[0].callback)(new_wsi,
-                       LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED,
-                       NULL, NULL, 0);
-
-               lws_libev_accept(new_wsi, accept_fd);
+                       if (!lws_adopt_socket(context, accept_fd))
+                               /* already closed cleanly as necessary */
+                               return 1;
 
-               if (!LWS_SSL_ENABLED(context)) {
 #if LWS_POSIX
-                       lwsl_debug("accepted new conn  port %u on fd=%d\n",
-                                         ntohs(cli_addr.sin_port), accept_fd);
+               } while (pt->fds_count < context->fd_limit_per_thread - 1 &&
+                        lws_poll_listen_fd(&pt->fds[wsi->position_in_fds_table]) > 0);
 #endif
-                       if (insert_wsi_socket_into_fds(context, new_wsi))
-                               goto fail;
-               }
-               break;
+               return 0;
 
        default:
                break;
        }
 
-       if (!lws_server_socket_service_ssl(&wsi, new_wsi, accept_fd, pollfd))
+       if (!lws_server_socket_service_ssl(wsi, accept_fd))
                return 0;
 
 fail:
index 098170b..ef93055 100644 (file)
@@ -294,6 +294,9 @@ notify:
 int
 lws_service_timeout_check(struct lws *wsi, unsigned int sec)
 {
+       struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi];
+       struct lws **pwsi;
+
        /*
         * if extensions want in on it (eg, we are a mux parent)
         * give them a chance to service child timeouts
@@ -309,8 +312,25 @@ lws_service_timeout_check(struct lws *wsi, unsigned int sec)
         * connection
         */
        if ((time_t)sec > wsi->pending_timeout_limit) {
-               lwsl_info("wsi %p: TIMEDOUT WAITING on %d\n",
-                         (void *)wsi, wsi->pending_timeout);
+#if LWS_POSIX
+               lwsl_notice("wsi %p: TIMEDOUT WAITING on %d (did hdr %d, ah %p, wl %d, pfd events %d)\n",
+                           (void *)wsi, wsi->pending_timeout,
+                           wsi->hdr_parsing_completed, wsi->u.hdr.ah,
+                           pt->ah_wait_list_length,
+                           pt->fds[wsi->sock].events);
+#endif
+               lws_pt_lock(pt);
+
+               pwsi = &pt->ah_wait_list;
+               while (*pwsi) {
+                       if (*pwsi == wsi)
+                               break;
+                       pwsi = &(*pwsi)->u.hdr.ah_wait_list;
+               }
+               lws_pt_unlock(pt);
+
+               if (!*pwsi)
+                       lwsl_err("*** not on ah wait list ***\n");
                /*
                 * Since he failed a timeout, he already had a chance to do
                 * something and was unable to... that includes situations like
@@ -372,9 +392,6 @@ int lws_rxflow_cache(struct lws *wsi, unsigned char *buf, int n, int len)
 LWS_VISIBLE int
 lws_service_fd_tsi(struct lws_context *context, struct lws_pollfd *pollfd, int tsi)
 {
-#if LWS_POSIX
-       int idx = 0;
-#endif
        struct lws_context_per_thread *pt = &context->pt[tsi];
        lws_sockfd_type our_fd = 0, tmp_fd;
        struct lws_tokens eff_buf;
@@ -386,10 +403,6 @@ lws_service_fd_tsi(struct lws_context *context, struct lws_pollfd *pollfd, int t
        int n, m;
        int more;
 
-#if LWS_POSIX
-       if (context->lserv_fd)
-               idx = wsi_from_fd(context, context->lserv_fd)->position_in_fds_table;
-#endif
        /*
         * you can call us with pollfd = NULL to just allow the once-per-second
         * global timeout checks; if less than a second since the last check
@@ -409,7 +422,7 @@ lws_service_fd_tsi(struct lws_context *context, struct lws_pollfd *pollfd, int t
                if (pollfd)
                        our_fd = pollfd->fd;
 
-               wsi = context->timeout_list;
+               wsi = context->pt[tsi].timeout_list;
                while (wsi) {
                        /* we have to take copies, because he may be deleted */
                        wsi1 = wsi->timeout_list;
@@ -423,6 +436,18 @@ lws_service_fd_tsi(struct lws_context *context, struct lws_pollfd *pollfd, int t
                        }
                        wsi = wsi1;
                }
+#if 1
+               {
+                       char s[300], *p = s;
+
+                       for (n = 0; n < context->count_threads; n++)
+                               p += sprintf(p, " %7lu (%5d), ",
+                                            context->pt[n].count_conns,
+                                            context->pt[n].fds_count);
+
+                       lwsl_notice("load: %s\n", s);
+               }
+#endif
        }
 
        /* the socket we came to service timed out, nothing to do */
@@ -445,44 +470,10 @@ lws_service_fd_tsi(struct lws_context *context, struct lws_pollfd *pollfd, int t
         */
 
 #if LWS_POSIX
-       /*
-        * deal with listen service piggybacking
-        * every lserv_mod services of other fds, we
-        * sneak one in to service the listen socket if there's anything waiting
-        *
-        * To handle connection storms, as found in ab, if we previously saw a
-        * pending connection here, it causes us to check again next time.
-        */
-
-       if (context->lserv_fd && pollfd != &pt->fds[idx]) {
-               context->lserv_count++;
-               if (context->lserv_seen ||
-                   context->lserv_count == context->lserv_mod) {
-                       context->lserv_count = 0;
-                       m = 1;
-                       if (context->lserv_seen > 5)
-                               m = 2;
-                       while (m--) {
-                               /*
-                                * even with extpoll, we prepared this
-                                * internal fds for listen
-                                */
-                               n = lws_poll_listen_fd(&pt->fds[idx]);
-                               if (n <= 0) {
-                                       if (context->lserv_seen)
-                                               context->lserv_seen--;
-                                       break;
-                               }
-                               /* there's a conn waiting for us */
-                               lws_service_fd(context, &pt->fds[idx]);
-                               context->lserv_seen++;
-                       }
-               }
-       }
 
        /* handle session socket closed */
 
-       if ((!(pollfd->revents & LWS_POLLIN)) &&
+       if ((!(pollfd->revents & pollfd->events & LWS_POLLIN)) &&
            (pollfd->revents & LWS_POLLHUP)) {
 
                lwsl_debug("Session Socket %p (fd=%d) dead\n",
index 3b66ccf..bb9613a 100644 (file)
--- a/lib/ssl.c
+++ b/lib/ssl.c
@@ -592,11 +592,8 @@ lws_ssl_close(struct lws *wsi)
 /* leave all wsi close processing to the caller */
 
 LWS_VISIBLE int
-lws_server_socket_service_ssl(struct lws **pwsi, struct lws *new_wsi,
-                             lws_sockfd_type accept_fd,
-                             struct lws_pollfd *pollfd)
+lws_server_socket_service_ssl(struct lws *wsi, lws_sockfd_type accept_fd)
 {
-       struct lws *wsi = *pwsi;
        struct lws_context *context = wsi->context;
        struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
        int n, m;
@@ -606,43 +603,41 @@ lws_server_socket_service_ssl(struct lws **pwsi, struct lws *new_wsi,
 
        if (!LWS_SSL_ENABLED(context))
                return 0;
-
+lwsl_err("%s: mode %d, state %d\n", __func__, wsi->mode, wsi->state);
        switch (wsi->mode) {
-       case LWSCM_SERVER_LISTENER:
+       case LWSCM_SSL_INIT:
 
-               if (!new_wsi) {
-                       lwsl_err("no new_wsi\n");
+               if (!wsi)
                        return 0;
-               }
 
-               new_wsi->ssl = SSL_new(context->ssl_ctx);
-               if (new_wsi->ssl == NULL) {
+               wsi->ssl = SSL_new(context->ssl_ctx);
+               if (wsi->ssl == NULL) {
                        lwsl_err("SSL_new failed: %s\n",
-                                ERR_error_string(SSL_get_error(new_wsi->ssl, 0), NULL));
+                                ERR_error_string(SSL_get_error(wsi->ssl, 0), NULL));
                        lws_decode_ssl_error();
                        compatible_close(accept_fd);
                        goto fail;
                }
 
-               SSL_set_ex_data(new_wsi->ssl,
+               SSL_set_ex_data(wsi->ssl,
                        openssl_websocket_private_data_index, context);
 
-               SSL_set_fd(new_wsi->ssl, accept_fd);
+               SSL_set_fd(wsi->ssl, accept_fd);
 
 #ifdef USE_WOLFSSL
 #ifdef USE_OLD_CYASSL
-               CyaSSL_set_using_nonblock(new_wsi->ssl, 1);
+               CyaSSL_set_using_nonblock(wsi->ssl, 1);
 #else
-               wolfSSL_set_using_nonblock(new_wsi->ssl, 1);
+               wolfSSL_set_using_nonblock(wsi->ssl, 1);
 #endif
 #else
-               SSL_set_mode(new_wsi->ssl, SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER);
-               bio = SSL_get_rbio(new_wsi->ssl);
+               SSL_set_mode(wsi->ssl, SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER);
+               bio = SSL_get_rbio(wsi->ssl);
                if (bio)
                        BIO_set_nbio(bio, 1); /* nonblocking */
                else
                        lwsl_notice("NULL rbio\n");
-               bio = SSL_get_wbio(new_wsi->ssl);
+               bio = SSL_get_wbio(wsi->ssl);
                if (bio)
                        BIO_set_nbio(bio, 1); /* nonblocking */
                else
@@ -655,8 +650,6 @@ lws_server_socket_service_ssl(struct lws **pwsi, struct lws *new_wsi,
                 * pieces come if we're not sorted yet
                 */
 
-               *pwsi = new_wsi;
-               wsi = *pwsi;
                wsi->mode = LWSCM_SSL_ACK_PENDING;
                if (insert_wsi_socket_into_fds(context, wsi))
                        goto fail;
@@ -754,7 +747,7 @@ go_again:
                        break;
                }
                lwsl_debug("SSL_accept failed skt %u: %s\n",
-                          pollfd->fd, ERR_error_string(m, NULL));
+                          wsi->sock, ERR_error_string(m, NULL));
                goto fail;
 
 accepted:
index 4c93496..7fd421e 100644 (file)
@@ -35,6 +35,8 @@
  *                             using this protocol, including the sender
  */
 
+extern int debug_level;
+
 enum demo_protocols {
        /* always first */
        PROTOCOL_HTTP = 0,
@@ -117,8 +119,8 @@ int callback_http(struct lws *wsi, enum lws_callback_reasons reason, void *user,
 {
        struct per_session_data__http *pss =
                        (struct per_session_data__http *)user;
-       static unsigned char buffer[4096];
-       unsigned long amount, file_len;
+       unsigned char buffer[4096 + LWS_PRE];
+       unsigned long amount, file_len, sent;
        char leaf_path[1024];
        const char *mimetype;
        char *other_headers;
@@ -136,15 +138,16 @@ int callback_http(struct lws *wsi, enum lws_callback_reasons reason, void *user,
        switch (reason) {
        case LWS_CALLBACK_HTTP:
 
-               dump_handshake_info(wsi);
+               if (debug_level & LLL_INFO) {
+                       dump_handshake_info(wsi);
 
-               /* dump the individual URI Arg parameters */
-               n = 0;
-               while (lws_hdr_copy_fragment(wsi, buf, sizeof(buf),
-                                            WSI_TOKEN_HTTP_URI_ARGS, n) > 0) {
-                       lwsl_info("URI Arg %d: %s\n", ++n, buf);
+                       /* dump the individual URI Arg parameters */
+                       n = 0;
+                       while (lws_hdr_copy_fragment(wsi, buf, sizeof(buf),
+                                                    WSI_TOKEN_HTTP_URI_ARGS, n) > 0) {
+                               lwsl_info("URI Arg %d: %s\n", ++n, buf);
+                       }
                }
-
                if (len < 1) {
                        lws_return_http_status(wsi,
                                                HTTP_STATUS_BAD_REQUEST, NULL);
@@ -153,8 +156,7 @@ int callback_http(struct lws *wsi, enum lws_callback_reasons reason, void *user,
 
                /* this example server has no concept of directories */
                if (strchr((const char *)in + 1, '/')) {
-                       lws_return_http_status(wsi,
-                                              HTTP_STATUS_FORBIDDEN, NULL);
+                       lws_return_http_status(wsi, HTTP_STATUS_FORBIDDEN, NULL);
                        goto try_to_reuse;
                }
 
@@ -177,8 +179,10 @@ int callback_http(struct lws *wsi, enum lws_callback_reasons reason, void *user,
                        pss->fd = lws_plat_file_open(wsi, leaf_path, &file_len,
                                                     LWS_O_RDONLY);
 
-                       if (pss->fd == LWS_INVALID_FILE)
+                       if (pss->fd == LWS_INVALID_FILE) {
+                               lwsl_err("faild to open file %s\n", leaf_path);
                                return -1;
+                       }
 
                        /*
                         * we will send a big jpeg file, but it could be
@@ -219,9 +223,11 @@ int callback_http(struct lws *wsi, enum lws_callback_reasons reason, void *user,
                         * this is mandated by changes in HTTP2
                         */
 
+                       *p = '\0';
+                       lwsl_info("%s\n", buffer + LWS_PRE);
+
                        n = lws_write(wsi, buffer + LWS_PRE, p - (buffer + LWS_PRE),
                                      LWS_WRITE_HTTP_HEADERS);
-
                        if (n < 0) {
                                lws_plat_file_close(wsi, pss->fd);
                                return -1;
@@ -284,7 +290,6 @@ int callback_http(struct lws *wsi, enum lws_callback_reasons reason, void *user,
                 * we'll get a LWS_CALLBACK_HTTP_FILE_COMPLETION callback when
                 * it's done
                 */
-
                break;
 
        case LWS_CALLBACK_HTTP_BODY:
@@ -308,9 +313,15 @@ int callback_http(struct lws *wsi, enum lws_callback_reasons reason, void *user,
                goto try_to_reuse;
 
        case LWS_CALLBACK_HTTP_WRITEABLE:
+               lwsl_info("LWS_CALLBACK_HTTP_WRITEABLE\n");
+
+               if (pss->fd == LWS_INVALID_FILE)
+                       goto try_to_reuse;
+
                /*
                 * we can send more of whatever it is we were sending
                 */
+               sent = 0;
                do {
                        /* we'd like the send this much */
                        n = sizeof(buffer) - LWS_PRE;
@@ -328,15 +339,16 @@ int callback_http(struct lws *wsi, enum lws_callback_reasons reason, void *user,
                                n = m;
 
                        n = lws_plat_file_read(wsi, pss->fd,
-                                              &amount, buffer +
-                                               LWS_PRE, n);
+                                              &amount, buffer + LWS_PRE, n);
                        /* problem reading, close conn */
-                       if (n < 0)
+                       if (n < 0) {
+                               lwsl_err("problem reading file\n");
                                goto bail;
+                       }
                        n = (int)amount;
                        /* sent it all, close conn */
                        if (n == 0)
-                               goto flush_bail;
+                               goto penultimate;
                        /*
                         * To support HTTP2, must take care about preamble space
                         *
@@ -344,46 +356,28 @@ int callback_http(struct lws *wsi, enum lws_callback_reasons reason, void *user,
                         * is handled by the library itself if you sent a
                         * content-length header
                         */
-                       m = lws_write(wsi, buffer + LWS_PRE,
-                                     n, LWS_WRITE_HTTP);
-                       if (m < 0)
+                       m = lws_write(wsi, buffer + LWS_PRE, n, LWS_WRITE_HTTP);
+                       if (m < 0) {
+                               lwsl_err("write failed\n");
                                /* write failed, close conn */
                                goto bail;
-
-                       /*
-                        * http2 won't do this
-                        */
-                       if (m != n)
-                               /* partial write, adjust */
-                               if (lws_plat_file_seek_cur(wsi, pss->fd, m - n) ==
-                                                            (unsigned long)-1)
-                                       goto bail;
-
+                       }
                        if (m) /* while still active, extend timeout */
-                               lws_set_timeout(wsi,
-                                               PENDING_TIMEOUT_HTTP_CONTENT, 5);
-
-                       /* if we have indigestion, let him clear it
-                        * before eating more */
-                       if (lws_partial_buffered(wsi))
-                               break;
-
-               } while (!lws_send_pipe_choked(wsi));
+                               lws_set_timeout(wsi, PENDING_TIMEOUT_HTTP_CONTENT, 5);
+                       sent += m;
 
+               } while (!lws_send_pipe_choked(wsi) && (sent < 1024 * 1024));
 later:
                lws_callback_on_writable(wsi);
                break;
-flush_bail:
-               /* true if still partial pending */
-               if (lws_partial_buffered(wsi)) {
-                       lws_callback_on_writable(wsi);
-                       break;
-               }
+penultimate:
                lws_plat_file_close(wsi, pss->fd);
+               pss->fd = LWS_INVALID_FILE;
                goto try_to_reuse;
 
 bail:
                lws_plat_file_close(wsi, pss->fd);
+
                return -1;
 
        /*
index de9e12e..8cb1392 100644 (file)
@@ -23,6 +23,7 @@
 
 int close_testing;
 int max_poll_elements;
+int debug_level = 7;
 volatile int force_exit = 0;
 struct lws_context *context;
 struct lws_plat_file_ops fops_plat;
@@ -192,7 +193,6 @@ int main(int argc, char **argv)
        ev_timer timeout_watcher;
        char cert_path[1024];
        char key_path[1024];
-       int debug_level = 7;
        int use_ssl = 0;
        int opts = 0;
        int n = 0;
index 365333d..274c53b 100644 (file)
@@ -22,7 +22,7 @@
 
 /* lws-mirror_protocol */
 
-#define MAX_MESSAGE_QUEUE 32
+#define MAX_MESSAGE_QUEUE 512
 
 struct a_message {
        void *payload;
@@ -38,7 +38,7 @@ callback_lws_mirror(struct lws *wsi, enum lws_callback_reasons reason,
 {
        struct per_session_data__lws_mirror *pss =
                        (struct per_session_data__lws_mirror *)user;
-       int n;
+       int n, m;
 
        switch (reason) {
 
@@ -59,19 +59,16 @@ callback_lws_mirror(struct lws *wsi, enum lws_callback_reasons reason,
                if (close_testing)
                        break;
                while (pss->ringbuffer_tail != ringbuffer_head) {
-
+                       m = ringbuffer[pss->ringbuffer_tail].len;
                        n = lws_write(wsi, (unsigned char *)
                                   ringbuffer[pss->ringbuffer_tail].payload +
-                                  LWS_PRE,
-                                  ringbuffer[pss->ringbuffer_tail].len,
-                                                               LWS_WRITE_TEXT);
+                                  LWS_PRE, m, LWS_WRITE_TEXT);
                        if (n < 0) {
                                lwsl_err("ERROR %d writing to mirror socket\n", n);
                                return -1;
                        }
-                       if (n < (int)ringbuffer[pss->ringbuffer_tail].len)
-                               lwsl_err("mirror partial write %d vs %d\n",
-                                      n, ringbuffer[pss->ringbuffer_tail].len);
+                       if (n < m)
+                               lwsl_err("mirror partial write %d vs %d\n", n, m);
 
                        if (pss->ringbuffer_tail == (MAX_MESSAGE_QUEUE - 1))
                                pss->ringbuffer_tail = 0;
@@ -83,8 +80,7 @@ callback_lws_mirror(struct lws *wsi, enum lws_callback_reasons reason,
                                lws_rx_flow_allow_all_protocol(lws_get_context(wsi),
                                               lws_get_protocol(wsi));
 
-                       if (lws_partial_buffered(wsi) ||
-                           lws_send_pipe_choked(wsi)) {
+                       if (lws_send_pipe_choked(wsi)) {
                                lws_callback_on_writable(wsi);
                                break;
                        }
@@ -101,11 +97,10 @@ callback_lws_mirror(struct lws *wsi, enum lws_callback_reasons reason,
                if (ringbuffer[ringbuffer_head].payload)
                        free(ringbuffer[ringbuffer_head].payload);
 
-               ringbuffer[ringbuffer_head].payload =
-                               malloc(LWS_PRE + len);
+               ringbuffer[ringbuffer_head].payload = malloc(LWS_PRE + len);
                ringbuffer[ringbuffer_head].len = len;
                memcpy((char *)ringbuffer[ringbuffer_head].payload +
-                                         LWS_PRE, in, len);
+                      LWS_PRE, in, len);
                if (ringbuffer_head == (MAX_MESSAGE_QUEUE - 1))
                        ringbuffer_head = 0;
                else
index 317e432..af5bfc1 100644 (file)
@@ -24,6 +24,7 @@
 
 int close_testing;
 int max_poll_elements;
+int debug_level = 7;
 
 #ifdef EXTERNAL_POLL
 struct lws_pollfd *pollfds;
@@ -185,7 +186,6 @@ int main(int argc, char **argv)
        pthread_t pthread_dumb, pthread_service[32];
        char cert_path[1024];
        char key_path[1024];
-       int debug_level = 7;
        int threads = 1;
        int use_ssl = 0;
        void *retval;
@@ -335,6 +335,7 @@ int main(int argc, char **argv)
        info.options = opts;
        info.count_threads = threads;
        info.extensions = exts;
+       info.max_http_header_pool = 4;
 
        context = lws_create_context(&info);
        if (context == NULL) {
index cd31082..5679459 100644 (file)
@@ -23,6 +23,7 @@
 
 int close_testing;
 int max_poll_elements;
+int debug_level = 7;
 
 #ifdef EXTERNAL_POLL
 struct lws_pollfd *pollfds;
@@ -171,7 +172,6 @@ int main(int argc, char **argv)
        const char *iface = NULL;
        char cert_path[1024];
        char key_path[1024];
-       int debug_level = 7;
        int use_ssl = 0;
        int opts = 0;
        int n = 0;