multithreaded service
authorAndy Green <andy.green@linaro.org>
Mon, 18 Jan 2016 19:34:24 +0000 (03:34 +0800)
committerAndy Green <andy.green@linaro.org>
Tue, 19 Jan 2016 12:02:36 +0000 (20:02 +0800)
This adds support for multithreaded service to lws without adding any
threading or locking code in the library.

At context creation time you can request split the service part of the
context into n service domains, which are load-balanced so that the most
idle one gets the next listen socket accept.

There's a single listen socket on one port still.

User code may then spawn n threads doing n service loops / poll()s
simultaneously.  Locking is only required (I think) in the existing
FD lock callbacks already handled by the pthreads server example,
and that locking takes place in user code.  So the library remains
completely agnostic about the threading / locking scheme.

And by default, it's completely compatible with one service thread
so no changes are required by people uninterested in multithreaded
service.

However for people interested in extremely lightweight mass http[s]/
ws[s] service with minimum provisioning, the library can now do
everything out of the box.

To test it, just try

$ libwebsockets-test-server-pthreads -j 8

where -j controls the number of service threads

Signed-off-by: Andy Green <andy.green@linaro.org>
23 files changed:
CMakeLists.txt
README.build.md
README.coding.md
changelog
lib/client-handshake.c
lib/client-parser.c
lib/client.c
lib/context.c
lib/header.c
lib/libwebsockets.c
lib/libwebsockets.h
lib/lws-plat-unix.c
lib/lws-plat-win.c
lib/output.c
lib/parsers.c
lib/pollfd.c
lib/private-libwebsockets.h
lib/server-handshake.c
lib/server.c
lib/service.c
lib/ssl.c
lws_config.h.in
test-server/test-server-pthreads.c

index 61468b0..adb563b 100644 (file)
@@ -76,19 +76,6 @@ option(LWS_WITH_HTTP2 "Compile with support for http2" OFF)
 option(LWS_MBED3 "Platform is MBED3" OFF)
 option(LWS_SSL_SERVER_WITH_ECDH_CERT "Include SSL server use ECDH certificate" OFF)
 
-
-if (DEFINED YOTTA_WEBSOCKETS_VERSION_STRING)
-
-set(LWS_WITH_SHARED OFF)
-set(LWS_WITH_SSL OFF)
-set(LWS_WITH_ZLIB OFF)
-set(LWS_WITHOUT_CLIENT ON)
-set(LWS_WITHOUT_TESTAPPS ON)
-set(LWS_WITHOUT_EXTENSIONS ON)
-set(LWS_MBED3 ON)
-
-endif()
-
 if (DEFINED YOTTA_WEBSOCKETS_VERSION_STRING)
 
 set(LWS_WITH_SHARED OFF)
@@ -98,6 +85,7 @@ set(LWS_WITHOUT_CLIENT ON)
 set(LWS_WITHOUT_TESTAPPS ON)
 set(LWS_WITHOUT_EXTENSIONS ON)
 set(LWS_MBED3 ON)
+set(LWS_MAX_SMP 1)
 
 endif()
 
@@ -247,6 +235,10 @@ if (LWS_WITH_HTTP2)
        set(LWS_USE_HTTP2 1)
 endif()
 
+if ("${LWS_MAX_SMP}" STREQUAL "")
+       set(LWS_MAX_SMP 32)
+endif()
+
 #if (LWS_MBED3)
 #      set(CMAKE_C_FLAGS "-D_DEBUG ${CMAKE_C_FLAGS}")
 #endif()
@@ -1140,6 +1132,7 @@ message(" LWS_IPV6 = ${LWS_IPV6}")
 message(" LWS_WITH_HTTP2 = ${LWS_WITH_HTTP2}")
 message(" LWS_MBED3 = ${LWS_MBED3}")
 message(" LWS_SSL_SERVER_WITH_ECDH_CERT = ${LWS_SSL_SERVER_WITH_ECDH_CERT}")
+message(" LWS_MAX_SMP = ${LWS_MAX_SMP}")
 message("---------------------------------------------------------------------")
 
 # These will be available to parent projects including libwebsockets using add_subdirectory()
index b9bec01..95a28e0 100644 (file)
@@ -86,6 +86,14 @@ Building on Unix:
        $ LD_LIBRARY_PATH=/usr/local/ssl/lib libwebsockets-test-server --ssl
     ```
 
+       **NOTE5**:
+       To build with debug info and _DEBUG for lower priority debug messages
+       compiled in, use
+
+    ```bash
+       $ cmake .. -DCMAKE_BUILD_TYPE=DEBUG
+    ````
+
 4. Finally you can build using the generated Makefile:
 
     ```bash
index 4775cd5..d425bac 100644 (file)
@@ -330,5 +330,51 @@ LWS_SERVER_OPTION_SSL_ECD
 
 to build in support and select it at runtime.
 
+SMP / Multithreaded service
+---------------------------
 
+SMP support is integrated into LWS without any internal threading.  It's
+very simple to use, libwebsockets-test-server-pthread shows how to do it,
+use -j <n> argument there to control the number of service threads up to 32.
+
+Two new members are added to the info struct
+
+       unsigned int count_threads;
+       unsigned int fd_limit_per_thread;
+       
+leave them at the default 0 to get the normal singlethreaded service loop.
+
+Set count_threads to n to tell lws you will have n simultaneous service threads
+operating on the context.
+
+There is still a single listen socket on one port, no matter how many
+service threads.
+
+When a connection is made, it is accepted by the service thread with the least
+connections active to perform load balancing.
+
+The user code is responsible for spawning n threads running the service loop
+associated to a specific tsi (Thread Service Index, 0 .. n - 1).  See
+the libwebsockets-test-server-pthread for how to do.
+
+If you leave fd_limit_per_thread at 0, then the process limit of fds is shared
+between the service threads; if you process was allowed 1024 fds overall then
+each thread is limited to 1024 / n.
+
+You can set fd_limit_per_thread to a nonzero number to control this manually, eg
+the overall supported fd limit is less than the process allowance.
+
+You can control the context basic data allocation for multithreading from Cmake
+using -DLWS_MAX_SMP=, if not given it's set to 32.  The serv_buf allocation
+for the threads (currently 4096) is made at runtime only for active threads.
+
+Because lws will limit the requested number of actual threads supported
+according to LWS_MAX_SMP, there is an api lws_get_count_threads(context) to
+discover how many threads were actually allowed when the context was created.
+
+It's required to implement locking in the user code in the same way that
+libwebsockets-test-server-pthread does it, for the FD locking callbacks.
+
+There is no knowledge or dependency in lws itself about pthreads.  How the
+locking is implemented is entirely up to the user code.
 
index 9a34696..87a6942 100644 (file)
--- a/changelog
+++ b/changelog
@@ -164,7 +164,52 @@ to build in support and select it at runtime.
 
 6) There's a new api lws_parse_uri() that simplies chopping up
 https://xxx:yyy/zzz uris into parts nicely.  The test client now uses this
-to allow proper uris.
+to allow proper uris as well as the old address style.
+
+7) SMP support is integrated into LWS without any internal threading.  It's
+very simple to use, libwebsockets-test-server-pthread shows how to do it,
+use -j <n> argument there to control the number of service threads up to 32.
+
+Two new members are added to the info struct
+
+       unsigned int count_threads;
+       unsigned int fd_limit_per_thread;
+       
+leave them at the default 0 to get the normal singlethreaded service loop.
+
+Set count_threads to n to tell lws you will have n simultaneous service threads
+operating on the context.
+
+There is still a single listen socket on one port, no matter how many
+service threads.
+
+When a connection is made, it is accepted by the service thread with the least
+connections active to perform load balancing.
+
+The user code is responsible for spawning n threads running the service loop
+associated to a specific tsi (Thread Service Index, 0 .. n - 1).  See
+the libwebsockets-test-server-pthread for how to do.
+
+If you leave fd_limit_per_thread at 0, then the process limit of fds is shared
+between the service threads; if you process was allowed 1024 fds overall then
+each thread is limited to 1024 / n.
+
+You can set fd_limit_per_thread to a nonzero number to control this manually, eg
+the overall supported fd limit is less than the process allowance.
+
+You can control the context basic data allocation for multithreading from Cmake
+using -DLWS_MAX_SMP=, if not given it's set to 32.  The serv_buf allocation
+for the threads (currently 4096) is made at runtime only for active threads.
+
+Because lws will limit the requested number of actual threads supported
+according to LWS_MAX_SMP, there is an api lws_get_count_threads(context) to
+discover how many threads were actually allowed when the context was created.
+
+It's required to implement locking in the user code in the same way that
+libwebsockets-test-server-pthread does it, for the FD locking callbacks.
+
+There is no knowledge or dependency in lws itself about pthreads.  How the
+locking is implemented is entirely up to the user code.
 
 
 User api changes
index f8e4c81..6e03501 100644 (file)
@@ -9,6 +9,7 @@ lws_client_connect_2(struct lws *wsi)
        struct addrinfo hints, *result;
 #endif
        struct lws_context *context = wsi->context;
+       struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
        struct sockaddr_in server_addr4;
        struct sockaddr_in client_addr4;
        struct lws_pollfd pfd;
@@ -21,18 +22,18 @@ lws_client_connect_2(struct lws *wsi)
        /* proxy? */
 
        if (context->http_proxy_port) {
-               plen = sprintf((char *)context->serv_buf,
+               plen = sprintf((char *)pt->serv_buf,
                        "CONNECT %s:%u HTTP/1.0\x0d\x0a"
                        "User-agent: libwebsockets\x0d\x0a",
                        lws_hdr_simple_ptr(wsi, _WSI_TOKEN_CLIENT_PEER_ADDRESS),
                        wsi->u.hdr.ah->c_port);
 
                if (context->proxy_basic_auth_token[0])
-                       plen += sprintf((char *)context->serv_buf + plen,
+                       plen += sprintf((char *)pt->serv_buf + plen,
                                        "Proxy-authorization: basic %s\x0d\x0a",
                                        context->proxy_basic_auth_token);
 
-               plen += sprintf((char *)context->serv_buf + plen,
+               plen += sprintf((char *)pt->serv_buf + plen,
                                "\x0d\x0a");
 
                ads = context->http_proxy_address;
@@ -262,7 +263,7 @@ lws_client_connect_2(struct lws *wsi)
                        goto failed;
                wsi->u.hdr.ah->c_port = context->http_proxy_port;
 
-               n = send(wsi->sock, (char *)context->serv_buf, plen,
+               n = send(wsi->sock, (char *)pt->serv_buf, plen,
                         MSG_NOSIGNAL);
                if (n < 0) {
                        lwsl_debug("ERROR writing to proxy socket\n");
index ad8d58d..1ae473f 100644 (file)
@@ -23,6 +23,7 @@
 
 int lws_client_rx_sm(struct lws *wsi, unsigned char c)
 {
+       struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi];
        int callback_action = LWS_CALLBACK_CLIENT_RECEIVE;
        unsigned short close_code;
        unsigned char *pp;
@@ -31,7 +32,7 @@ int lws_client_rx_sm(struct lws *wsi, unsigned char c)
 
 
        if (wsi->u.ws.rx_draining_ext) {
-               struct lws **w = &wsi->context->rx_draining_ext_list;
+               struct lws **w = &pt->rx_draining_ext_list;
                lwsl_ext("%s: RX EXT DRAINING: Removing from list\n", __func__, c);
                assert(!c);
                eff_buf.token = NULL;
@@ -536,8 +537,8 @@ utf8_fail:                  lwsl_info("utf8 error\n");
                         * last chunk
                         */
                        wsi->u.ws.rx_draining_ext = 1;
-                       wsi->u.ws.rx_draining_ext_list = wsi->context->rx_draining_ext_list;
-                       wsi->context->rx_draining_ext_list = wsi;
+                       wsi->u.ws.rx_draining_ext_list = pt->rx_draining_ext_list;
+                       pt->rx_draining_ext_list = wsi;
                        lwsl_ext("%s: RX EXT DRAINING: Adding to list\n", __func__);
                }
                if (wsi->state == LWSS_RETURNED_CLOSE_ALREADY ||
index 9424ed3..a308487 100644 (file)
@@ -68,7 +68,9 @@ int lws_handshake_client(struct lws *wsi, unsigned char **buf, size_t len)
 int lws_client_socket_service(struct lws_context *context,
                              struct lws *wsi, struct lws_pollfd *pollfd)
 {
-       char *p = (char *)&context->serv_buf[0];
+       struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
+       char *p = (char *)&pt->serv_buf[0];
+       char *sb = p;
        unsigned char c;
        int n, len;
 
@@ -103,8 +105,7 @@ int lws_client_socket_service(struct lws_context *context,
                        return 0;
                }
 
-               n = recv(wsi->sock, (char *)context->serv_buf,
-                                       sizeof(context->serv_buf), 0);
+               n = recv(wsi->sock, sb, LWS_MAX_SOCKET_IO_BUF, 0);
                if (n < 0) {
 
                        if (LWS_ERRNO == LWS_EAGAIN) {
@@ -117,12 +118,12 @@ int lws_client_socket_service(struct lws_context *context,
                        return 0;
                }
 
-               context->serv_buf[13] = '\0';
-               if (strcmp((char *)context->serv_buf, "HTTP/1.0 200 ") &&
-                   strcmp((char *)context->serv_buf, "HTTP/1.1 200 ")
+               pt->serv_buf[13] = '\0';
+               if (strcmp(sb, "HTTP/1.0 200 ") &&
+                   strcmp(sb, "HTTP/1.1 200 ")
                ) {
                        lws_close_free_wsi(wsi, LWS_CLOSE_STATUS_NOSTATUS);
-                       lwsl_err("ERROR proxy: %s\n", context->serv_buf);
+                       lwsl_err("ERROR proxy: %s\n", sb);
                        return 0;
                }
 
@@ -269,9 +270,7 @@ some_wait:
                                n = ERR_get_error();
                                if (n != SSL_ERROR_NONE) {
                                        lwsl_err("SSL connect error %lu: %s\n",
-                                               n,
-                                               ERR_error_string(n,
-                                                         (char *)context->serv_buf));
+                                               n, ERR_error_string(n, sb));
                                        return 0;
                                }
                        }
@@ -327,8 +326,7 @@ some_wait:
                                        n = ERR_get_error();
                                        if (n != SSL_ERROR_NONE) {
                                                lwsl_err("SSL connect error %lu: %s\n",
-                                                        n, ERR_error_string(n,
-                                                        (char *)context->serv_buf));
+                                                        n, ERR_error_string(n, sb));
                                                return 0;
                                        }
                                }
@@ -351,7 +349,7 @@ some_wait:
                                        lwsl_notice("accepting self-signed certificate\n");
                                } else {
                                        lwsl_err("server's cert didn't look good, X509_V_ERR = %d: %s\n",
-                                                n, ERR_error_string(n, (char *)context->serv_buf));
+                                                n, ERR_error_string(n, sb));
                                        lws_close_free_wsi(wsi,
                                                LWS_CLOSE_STATUS_NOSTATUS);
                                        return 0;
@@ -380,10 +378,9 @@ some_wait:
 
                lws_latency_pre(context, wsi);
 
-               n = lws_ssl_capable_write(wsi, context->serv_buf,
-                                         p - (char *)context->serv_buf);
+               n = lws_ssl_capable_write(wsi, (unsigned char *)sb, p - sb);
                lws_latency(context, wsi, "send lws_issue_raw", n,
-                           n == p - (char *)context->serv_buf);
+                           n == p - sb);
                switch (n) {
                case LWS_SSL_CAPABLE_ERROR:
                        lwsl_debug("ERROR writing to client socket\n");
@@ -508,14 +505,16 @@ strtolower(char *s)
 int
 lws_client_interpret_server_handshake(struct lws *wsi)
 {
+       int n, len, okay = 0, isErrorCodeReceived = 0, port = 0, ssl = 0;
        struct lws_context *context = wsi->context;
        int close_reason = LWS_CLOSE_STATUS_PROTOCOL_ERR;
-       int n, len, okay = 0, isErrorCodeReceived = 0, port = 0, ssl = 0;
        const char *pc, *prot, *ads = NULL, *path;
        char *p;
 #ifndef LWS_NO_EXTENSIONS
-       const struct lws_extension *ext;
+       struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
+       char *sb = (char *)&pt->serv_buf[0];
        const struct lws_ext_options *opts;
+       const struct lws_extension *ext;
        char ext_name[128];
        const char *c, *a;
        char ignore;
@@ -662,13 +661,12 @@ check_extensions:
         * and go through matching them or identifying bogons
         */
 
-       if (lws_hdr_copy(wsi, (char *)context->serv_buf,
-                  sizeof(context->serv_buf), WSI_TOKEN_EXTENSIONS) < 0) {
+       if (lws_hdr_copy(wsi, sb, LWS_MAX_SOCKET_IO_BUF, WSI_TOKEN_EXTENSIONS) < 0) {
                lwsl_warn("ext list from server failed to copy\n");
                goto bail2;
        }
 
-       c = (char *)context->serv_buf;
+       c = sb;
        n = 0;
        ignore = 0;
        a = NULL;
@@ -1024,7 +1022,7 @@ lws_generate_client_handshake(struct lws *wsi, char *pkt)
        context->protocols[0].callback(wsi,
                                       LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER,
                                       NULL, &p,
-                                      (pkt + sizeof(context->serv_buf)) - p - 12);
+                                      (pkt + LWS_MAX_SOCKET_IO_BUF) - p - 12);
 
        p += sprintf(p, "\x0d\x0a");
 
index b66e098..02b3f1f 100644 (file)
@@ -96,15 +96,14 @@ lws_create_context(struct lws_context_creation_info *info)
 #endif
        lws_feature_status_libev(info);
 #endif
-       lwsl_info(" LWS_MAX_HEADER_LEN: %u\n", LWS_MAX_HEADER_LEN);
-       lwsl_info(" LWS_MAX_PROTOCOLS: %u\n", LWS_MAX_PROTOCOLS);
-
-       lwsl_info(" SPEC_LATEST_SUPPORTED: %u\n", SPEC_LATEST_SUPPORTED);
-       lwsl_info(" AWAITING_TIMEOUT: %u\n", AWAITING_TIMEOUT);
-       lwsl_info(" sizeof (*info): %u\n", sizeof(*info));
+       lwsl_info(" LWS_MAX_HEADER_LEN    : %u\n", LWS_MAX_HEADER_LEN);
+       lwsl_info(" LWS_MAX_PROTOCOLS     : %u\n", LWS_MAX_PROTOCOLS);
+       lwsl_info(" LWS_MAX_SMP           : %u\n", LWS_MAX_SMP);
+       lwsl_info(" SPEC_LATEST_SUPPORTED : %u\n", SPEC_LATEST_SUPPORTED);
+       lwsl_info(" AWAITING_TIMEOUT      : %u\n", AWAITING_TIMEOUT);
+       lwsl_info(" sizeof (*info)        : %u\n", sizeof(*info));
 #if LWS_POSIX
        lwsl_info(" SYSTEM_RANDOM_FILEPATH: '%s'\n", SYSTEM_RANDOM_FILEPATH);
-       lwsl_info(" LWS_MAX_ZLIB_CONN_BUFFER: %u\n", LWS_MAX_ZLIB_CONN_BUFFER);
 #endif
        if (lws_plat_context_early_init())
                return NULL;
@@ -120,6 +119,16 @@ lws_create_context(struct lws_context_creation_info *info)
                lwsl_notice(" Started with daemon pid %d\n", pid_daemon);
        }
 #endif
+       context->max_fds = getdtablesize();
+
+       if (info->count_threads)
+               context->count_threads = info->count_threads;
+       else
+               context->count_threads = 1;
+
+       if (context->count_threads > LWS_MAX_SMP)
+               context->count_threads = LWS_MAX_SMP;
+
        context->lserv_seen = 0;
        context->protocols = info->protocols;
        context->token_limits = info->token_limits;
@@ -132,6 +141,26 @@ lws_create_context(struct lws_context_creation_info *info)
        context->ka_interval = info->ka_interval;
        context->ka_probes = info->ka_probes;
 
+       /* we zalloc only the used ones, so the memory is not wasted
+        * allocating for unused threads
+        */
+       for (n = 0; n < context->count_threads; n++) {
+               context->pt[n].serv_buf = lws_zalloc(LWS_MAX_SOCKET_IO_BUF);
+               if (!context->pt[n].serv_buf) {
+                       lwsl_err("OOM\n");
+                       return NULL;
+               }
+       }
+
+       if (info->fd_limit_per_thread)
+               context->fd_limit_per_thread = info->fd_limit_per_thread;
+       else
+               context->fd_limit_per_thread = context->max_fds /
+                                              context->count_threads;
+
+       lwsl_notice(" Threads: %d each %d fds\n", context->count_threads,
+                   context->fd_limit_per_thread);
+
        memset(&wsi, 0, sizeof(wsi));
        wsi.context = context;
 
@@ -151,7 +180,12 @@ lws_create_context(struct lws_context_creation_info *info)
        context->lws_ev_sigint_cb = &lws_sigint_cb;
 #endif /* LWS_USE_LIBEV */
 
-       lwsl_info(" mem: context:         %5u bytes\n", sizeof(struct lws_context));
+       lwsl_info(" mem: context:         %5u bytes (%d + (%d x %d))\n",
+                 sizeof(struct lws_context) +
+                 (context->count_threads * LWS_MAX_SOCKET_IO_BUF),
+                 sizeof(struct lws_context),
+                 context->count_threads,
+                 LWS_MAX_SOCKET_IO_BUF);
 
        /*
         * allocate and initialize the pool of
@@ -181,21 +215,24 @@ lws_create_context(struct lws_context_creation_info *info)
 
        /* this is per context */
        lwsl_info(" mem: http hdr rsvd:   %5u bytes ((%u + %u) x %u)\n",
-                   (context->max_http_header_data + sizeof(struct allocated_headers)) *
+                   (context->max_http_header_data +
+                    sizeof(struct allocated_headers)) *
                    context->max_http_header_pool,
-                   context->max_http_header_data, sizeof(struct allocated_headers),
+                   context->max_http_header_data,
+                   sizeof(struct allocated_headers),
                    context->max_http_header_pool);
-
-       context->max_fds = getdtablesize();
-
-       context->fds = lws_zalloc(sizeof(struct lws_pollfd) * context->max_fds);
-       if (context->fds == NULL) {
+       n = sizeof(struct lws_pollfd) * context->count_threads *
+           context->fd_limit_per_thread;
+       context->pt[0].fds = lws_zalloc(n);
+       if (context->pt[0].fds == NULL) {
                lwsl_err("OOM allocating %d fds\n", context->max_fds);
                goto bail;
        }
-
-       lwsl_info(" mem: pollfd map:      %5u\n",
-                   sizeof(struct lws_pollfd) * context->max_fds);
+       lwsl_info(" mem: pollfd map:      %5u\n", n);
+       /* each thread serves his own chunk of fds */
+       for (n = 1; n < (int)info->count_threads; n++)
+               context->pt[n].fds = context->pt[0].fds +
+                                 (n * context->fd_limit_per_thread);
 
        if (lws_plat_init(context, info))
                goto bail;
@@ -289,7 +326,7 @@ lws_context_destroy(struct lws_context *context)
 {
        const struct lws_protocols *protocol = NULL;
        struct lws wsi;
-       int n;
+       int n, m = context->count_threads;
 
        lwsl_notice("%s\n", __func__);
 
@@ -306,14 +343,15 @@ lws_context_destroy(struct lws_context *context)
                lwsl_notice("Worst latency: %s\n", context->worst_latency_info);
 #endif
 
-       for (n = 0; n < context->fds_count; n++) {
-               struct lws *wsi = wsi_from_fd(context, context->fds[n].fd);
-               if (!wsi)
-                       continue;
-               lws_close_free_wsi(wsi, LWS_CLOSE_STATUS_NOSTATUS_CONTEXT_DESTROY
+       while (m--)
+               for (n = 0; n < context->pt[m].fds_count; n++) {
+                       struct lws *wsi = wsi_from_fd(context, context->pt[m].fds[n].fd);
+                       if (!wsi)
+                               continue;
+                       lws_close_free_wsi(wsi, LWS_CLOSE_STATUS_NOSTATUS_CONTEXT_DESTROY
                                /* no protocol close */);
-               n--;
-       }
+                       n--;
+               }
 
        /*
         * give all extensions a chance to clean up any per-context
@@ -345,10 +383,13 @@ lws_context_destroy(struct lws_context *context)
                ev_signal_stop(context->io_loop, &context->w_sigint.watcher);
 #endif /* LWS_USE_LIBEV */
 
+       for (n = 0; n < context->count_threads; n++)
+               lws_free_set_NULL(context->pt[n].serv_buf);
+
        lws_plat_context_early_destroy(context);
        lws_ssl_context_destroy(context);
-       if (context->fds)
-               lws_free(context->fds);
+       if (context->pt[0].fds)
+               lws_free(context->pt[0].fds);
        if (context->ah_pool)
                lws_free(context->ah_pool);
        if (context->http_header_data)
index 8f71a28..8c6b96e 100644 (file)
@@ -177,11 +177,10 @@ lws_return_http_status(struct lws *wsi, unsigned int code, const char *html_body
 {
        int n, m;
        struct lws_context *context = lws_get_context(wsi);
-       unsigned char *p = context->serv_buf +
-                          LWS_PRE;
+       struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
+       unsigned char *p = pt->serv_buf + LWS_PRE;
        unsigned char *start = p;
-       unsigned char *end = p + sizeof(context->serv_buf) -
-                            LWS_PRE;
+       unsigned char *end = p + LWS_MAX_SOCKET_IO_BUF - LWS_PRE;
 
        if (!html_body)
                html_body = "";
index f8c60a3..ae58daa 100644 (file)
@@ -85,6 +85,7 @@ void
 lws_close_free_wsi(struct lws *wsi, enum lws_close_status reason)
 {
        struct lws_context *context = wsi->context;
+       struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
        int n, m, ret, old_state;
        struct lws_tokens eff_buf;
 
@@ -261,7 +262,7 @@ just_kill_connection:
            wsi->mode == LWSCM_WS_CLIENT) {
 
                if (wsi->u.ws.rx_draining_ext) {
-                       struct lws **w = &wsi->context->rx_draining_ext_list;
+                       struct lws **w = &pt->rx_draining_ext_list;
 
                        wsi->u.ws.rx_draining_ext = 0;
                        /* remove us from context draining ext list */
@@ -276,7 +277,7 @@ just_kill_connection:
                }
 
                if (wsi->u.ws.tx_draining_ext) {
-                       struct lws **w = &wsi->context->tx_draining_ext_list;
+                       struct lws **w = &pt->tx_draining_ext_list;
 
                        wsi->u.ws.tx_draining_ext = 0;
                        /* remove us from context draining ext list */
@@ -546,15 +547,19 @@ LWS_VISIBLE int
 lws_callback_all_protocol(struct lws_context *context,
                          const struct lws_protocols *protocol, int reason)
 {
+       struct lws_context_per_thread *pt = &context->pt[0];
        struct lws *wsi;
-       int n;
-
-       for (n = 0; n < context->fds_count; n++) {
-               wsi = wsi_from_fd(context, context->fds[n].fd);
-               if (!wsi)
-                       continue;
-               if (wsi->protocol == protocol)
-                       protocol->callback(wsi, reason, wsi->user_space, NULL, 0);
+       int n, m = context->count_threads;
+
+       while (m--) {
+               for (n = 0; n < pt->fds_count; n++) {
+                       wsi = wsi_from_fd(context, pt->fds[n].fd);
+                       if (!wsi)
+                               continue;
+                       if (wsi->protocol == protocol)
+                               protocol->callback(wsi, reason, wsi->user_space, NULL, 0);
+               }
+               pt++;
        }
 
        return 0;
@@ -691,15 +696,19 @@ LWS_VISIBLE void
 lws_rx_flow_allow_all_protocol(const struct lws_context *context,
                               const struct lws_protocols *protocol)
 {
-       int n;
+       const struct lws_context_per_thread *pt = &context->pt[0];
        struct lws *wsi;
-
-       for (n = 0; n < context->fds_count; n++) {
-               wsi = wsi_from_fd(context, context->fds[n].fd);
-               if (!wsi)
-                       continue;
-               if (wsi->protocol == protocol)
-                       lws_rx_flow_control(wsi, LWS_RXFLOW_ALLOW);
+       int n, m = context->count_threads;
+
+       while (m--) {
+               for (n = 0; n < pt->fds_count; n++) {
+                       wsi = wsi_from_fd(context, pt->fds[n].fd);
+                       if (!wsi)
+                               continue;
+                       if (wsi->protocol == protocol)
+                               lws_rx_flow_control(wsi, LWS_RXFLOW_ALLOW);
+               }
+               pt++;
        }
 }
 
@@ -1013,6 +1022,12 @@ lws_get_context(const struct lws *wsi)
        return wsi->context;
 }
 
+LWS_VISIBLE LWS_EXTERN int
+lws_get_count_threads(struct lws_context *context)
+{
+       return context->count_threads;
+}
+
 LWS_VISIBLE LWS_EXTERN void *
 lws_wsi_user(struct lws *wsi)
 {
@@ -1024,8 +1039,7 @@ lws_close_reason(struct lws *wsi, enum lws_close_status status,
                 unsigned char *buf, size_t len)
 {
        unsigned char *p, *start;
-       int budget = sizeof(wsi->u.ws.ping_payload_buf) -
-                    LWS_PRE;
+       int budget = sizeof(wsi->u.ws.ping_payload_buf) - LWS_PRE;
 
        assert(wsi->mode == LWSCM_WS_SERVING || wsi->mode == LWSCM_WS_CLIENT);
 
@@ -1046,7 +1060,7 @@ _lws_rx_flow_control(struct lws *wsi)
 {
        /* there is no pending change */
        if (!(wsi->rxflow_change_to & LWS_RXFLOW_PENDING_CHANGE)) {
-               lwsl_info("%s: no pending change\n", __func__);
+               lwsl_debug("%s: no pending change\n", __func__);
                return 0;
        }
 
@@ -1147,7 +1161,8 @@ lws_check_utf8(unsigned char *state, unsigned char *buf, size_t len)
  */
 
 LWS_VISIBLE LWS_EXTERN int
-lws_parse_uri(char *p, const char **prot, const char **ads, int *port, const char **path)
+lws_parse_uri(char *p, const char **prot, const char **ads, int *port,
+             const char **path)
 {
        const char *end;
        static const char *slash = "/";
index 5c7db72..09a3164 100644 (file)
@@ -1298,6 +1298,11 @@ extern int lws_extension_callback_pm_deflate(
  *             allocated for the lifetime of the context).  If the pool is
  *             busy new incoming connections must wait for accept until one
  *             becomes free.
+ * @count_threads: how many contexts to create in an array, 0 = 1
+ * @fd_limit_per_thread: nonzero means restrict each service thread to this
+ *             many fds, 0 means the default which is divide the process fd
+ *             limit by the number of threads.
+ *
  */
 
 struct lws_context_creation_info {
@@ -1329,6 +1334,9 @@ struct lws_context_creation_info {
        short max_http_header_data;
        short max_http_header_pool;
 
+       unsigned int count_threads;
+       unsigned int fd_limit_per_thread;
+
        /* Add new things just above here ---^
         * This is part of the ABI, don't needlessly break compatibility
         *
@@ -1400,6 +1408,12 @@ lws_context_destroy(struct lws_context *context);
 LWS_VISIBLE LWS_EXTERN int
 lws_service(struct lws_context *context, int timeout_ms);
 
+LWS_VISIBLE LWS_EXTERN int
+lws_service_tsi(struct lws_context *context, int timeout_ms, int tsi);
+
+LWS_VISIBLE LWS_EXTERN void
+lws_cancel_service_pt(struct lws *wsi);
+
 LWS_VISIBLE LWS_EXTERN void
 lws_cancel_service(struct lws_context *context);
 
@@ -1429,7 +1443,7 @@ lws_add_http_header_status(struct lws *wsi,
                           unsigned int code, unsigned char **p,
                           unsigned char *end);
 
-LWS_EXTERN int
+LWS_VISIBLE LWS_EXTERN int
 lws_http_transaction_completed(struct lws *wsi);
 
 #ifdef LWS_USE_LIBEV
@@ -1449,6 +1463,10 @@ lws_sigint_cb(struct ev_loop *loop, struct ev_signal *watcher, int revents);
 LWS_VISIBLE LWS_EXTERN int
 lws_service_fd(struct lws_context *context, struct lws_pollfd *pollfd);
 
+LWS_VISIBLE LWS_EXTERN int
+lws_service_fd_tsi(struct lws_context *context, struct lws_pollfd *pollfd,
+                  int tsi);
+
 LWS_VISIBLE LWS_EXTERN void *
 lws_context_user(struct lws_context *context);
 
@@ -1742,6 +1760,9 @@ lws_get_fops(struct lws_context *context);
 LWS_VISIBLE LWS_EXTERN struct lws_context *
 lws_get_context(const struct lws *wsi);
 
+LWS_VISIBLE LWS_EXTERN int
+lws_get_count_threads(struct lws_context *context);
+
 /*
  * Wsi-associated File Operations access helpers
  *
index a0e4f86..98276db 100644 (file)
@@ -59,21 +59,43 @@ static void lws_sigusr2(int sig)
 }
 
 /**
- * lws_cancel_service() - Cancel servicing of pending websocket activity
- * @context:   Websocket context
+ * lws_cancel_service_pt() - Cancel servicing of pending socket activity
+ *                             on one thread
+ * @wsi:       Cancel service on the thread this wsi is serviced by
  *
  *     This function let a call to lws_service() waiting for a timeout
  *     immediately return.
  */
 LWS_VISIBLE void
-lws_cancel_service(struct lws_context *context)
+lws_cancel_service_pt(struct lws *wsi)
 {
+       struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi];
        char buf = 0;
 
-       if (write(context->dummy_pipe_fds[1], &buf, sizeof(buf)) != 1)
+       if (write(pt->dummy_pipe_fds[1], &buf, sizeof(buf)) != 1)
                lwsl_err("Cannot write to dummy pipe");
 }
 
+/**
+ * lws_cancel_service() - Cancel ALL servicing of pending socket activity
+ * @context:   Websocket context
+ *
+ *     This function let a call to lws_service() waiting for a timeout
+ *     immediately return.
+ */
+LWS_VISIBLE void
+lws_cancel_service(struct lws_context *context)
+{
+       struct lws_context_per_thread *pt = &context->pt[0];
+       char buf = 0, m = context->count_threads;
+
+       while (m--) {
+               if (write(pt->dummy_pipe_fds[1], &buf, sizeof(buf)) != 1)
+                       lwsl_err("Cannot write to dummy pipe");
+               pt++;
+       }
+}
+
 LWS_VISIBLE void lwsl_emit_syslog(int level, const char *line)
 {
        int syslog_level = LOG_DEBUG;
@@ -96,12 +118,12 @@ LWS_VISIBLE void lwsl_emit_syslog(int level, const char *line)
 }
 
 LWS_VISIBLE int
-lws_plat_service(struct lws_context *context, int timeout_ms)
+lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi)
 {
-       int n;
-       int m;
-       char buf;
+       struct lws_context_per_thread *pt = &context->pt[tsi];
        struct lws *wsi;
+       int n, m;
+       char buf;
 #ifdef LWS_OPENSSL_SUPPORT
        struct lws *wsi_next;
 #endif
@@ -125,26 +147,26 @@ lws_plat_service(struct lws_context *context, int timeout_ms)
        context->service_tid = context->service_tid_detected;
 
        /* if we know we are draining rx ext, do not wait in poll */
-       if (context->rx_draining_ext_list)
+       if (pt->rx_draining_ext_list)
                timeout_ms = 0;
 
 #ifdef LWS_OPENSSL_SUPPORT
        /* if we know we have non-network pending data, do not wait in poll */
-       if (lws_ssl_anybody_has_buffered_read(context)) {
+       if (lws_ssl_anybody_has_buffered_read_tsi(context, tsi)) {
                timeout_ms = 0;
                lwsl_err("ssl buffered read\n");
        }
 #endif
 
-       n = poll(context->fds, context->fds_count, timeout_ms);
+       n = poll(pt->fds, pt->fds_count, timeout_ms);
 
 #ifdef LWS_OPENSSL_SUPPORT
-       if (!context->rx_draining_ext_list &&
-           !lws_ssl_anybody_has_buffered_read(context) && n == 0) {
+       if (!pt->rx_draining_ext_list &&
+           !lws_ssl_anybody_has_buffered_read_tsi(context, tsi) && !n) {
 #else
-       if (!context->rx_draining_ext_list && n == 0) /* poll timeout */ {
+       if (!pt->rx_draining_ext_list && !n) /* poll timeout */ {
 #endif
-               lws_service_fd(context, NULL);
+               lws_service_fd_tsi(context, NULL, tsi);
                return 0;
        }
 
@@ -158,10 +180,10 @@ lws_plat_service(struct lws_context *context, int timeout_ms)
         * For all guys with already-available ext data to drain, if they are
         * not flowcontrolled, fake their POLLIN status
         */
-       wsi = context->rx_draining_ext_list;
+       wsi = pt->rx_draining_ext_list;
        while (wsi) {
-               context->fds[wsi->position_in_fds_table].revents |=
-                       context->fds[wsi->position_in_fds_table].events & POLLIN;
+               pt->fds[wsi->position_in_fds_table].revents |=
+                       pt->fds[wsi->position_in_fds_table].events & POLLIN;
                wsi = wsi->u.ws.rx_draining_ext_list;
        }
 
@@ -173,12 +195,12 @@ lws_plat_service(struct lws_context *context, int timeout_ms)
         * network socket may have nothing
         */
 
-       wsi = context->pending_read_list;
+       wsi = pt->pending_read_list;
        while (wsi) {
                wsi_next = wsi->pending_read_list_next;
-               context->fds[wsi->position_in_fds_table].revents |=
-                       context->fds[wsi->position_in_fds_table].events & POLLIN;
-               if (context->fds[wsi->position_in_fds_table].revents & POLLIN)
+               pt->fds[wsi->position_in_fds_table].revents |=
+                       pt->fds[wsi->position_in_fds_table].events & POLLIN;
+               if (pt->fds[wsi->position_in_fds_table].revents & POLLIN)
                        /*
                         * he's going to get serviced now, take him off the
                         * list of guys with buffered SSL.  If he still has some
@@ -193,17 +215,17 @@ lws_plat_service(struct lws_context *context, int timeout_ms)
 
        /* any socket with events to service? */
 
-       for (n = 0; n < context->fds_count; n++) {
-               if (!context->fds[n].revents)
+       for (n = 0; n < pt->fds_count; n++) {
+               if (!pt->fds[n].revents)
                        continue;
 
-               if (context->fds[n].fd == context->dummy_pipe_fds[0]) {
-                       if (read(context->fds[n].fd, &buf, 1) != 1)
+               if (pt->fds[n].fd == pt->dummy_pipe_fds[0]) {
+                       if (read(pt->fds[n].fd, &buf, 1) != 1)
                                lwsl_err("Cannot read from dummy pipe.");
                        continue;
                }
 
-               m = lws_service_fd(context, &context->fds[n]);
+               m = lws_service_fd_tsi(context, &pt->fds[n], tsi);
                if (m < 0)
                        return -1;
                /* if something closed, retry this slot */
@@ -215,6 +237,12 @@ lws_plat_service(struct lws_context *context, int timeout_ms)
 }
 
 LWS_VISIBLE int
+lws_plat_service(struct lws_context *context, int timeout_ms)
+{
+       return lws_plat_service_tsi(context, timeout_ms, 0);
+}
+
+LWS_VISIBLE int
 lws_plat_set_socket_options(struct lws_context *context, int fd)
 {
        int optval = 1;
@@ -332,11 +360,17 @@ lws_plat_context_early_destroy(struct lws_context *context)
 LWS_VISIBLE void
 lws_plat_context_late_destroy(struct lws_context *context)
 {
+       struct lws_context_per_thread *pt = &context->pt[0];
+       int m = context->count_threads;
+
        if (context->lws_lookup)
                lws_free(context->lws_lookup);
 
-       close(context->dummy_pipe_fds[0]);
-       close(context->dummy_pipe_fds[1]);
+       while (m--) {
+               close(pt->dummy_pipe_fds[0]);
+               close(pt->dummy_pipe_fds[1]);
+               pt++;
+       }
        close(context->fd_random);
 }
 
@@ -411,11 +445,12 @@ lws_interface_to_sa(int ipv6, const char *ifname, struct sockaddr_in *addr, size
 }
 
 LWS_VISIBLE void
-lws_plat_insert_socket_into_fds(struct lws_context *context,
-                                                      struct lws *wsi)
+lws_plat_insert_socket_into_fds(struct lws_context *context, struct lws *wsi)
 {
+       struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
+
        lws_libev_io(wsi, LWS_EV_START | LWS_EV_READ);
-       context->fds[context->fds_count++].revents = 0;
+       pt->fds[pt->fds_count++].revents = 0;
 }
 
 LWS_VISIBLE void
@@ -514,39 +549,47 @@ LWS_VISIBLE int
 lws_plat_init(struct lws_context *context,
              struct lws_context_creation_info *info)
 {
-       context->lws_lookup = lws_zalloc(sizeof(struct lws *) * context->max_fds);
+       struct lws_context_per_thread *pt = &context->pt[0];
+       int n = context->count_threads, fd;
+
+       /* master context has the global fd lookup array */
+       context->lws_lookup = lws_zalloc(sizeof(struct lws *) *
+                                        context->max_fds);
        if (context->lws_lookup == NULL) {
-               lwsl_err(
-                 "Unable to allocate lws_lookup array for %d connections\n",
-                                                             context->max_fds);
+               lwsl_err("OOM on lws_lookup array for %d connections\n",
+                        context->max_fds);
                return 1;
        }
 
        lwsl_notice(" mem: platform fd map: %5u bytes\n",
                    sizeof(struct lws *) * context->max_fds);
+       fd = open(SYSTEM_RANDOM_FILEPATH, O_RDONLY);
 
-       context->fd_random = open(SYSTEM_RANDOM_FILEPATH, O_RDONLY);
+       context->fd_random = fd;
        if (context->fd_random < 0) {
                lwsl_err("Unable to open random device %s %d\n",
-                                   SYSTEM_RANDOM_FILEPATH, context->fd_random);
+                        SYSTEM_RANDOM_FILEPATH, context->fd_random);
                return 1;
        }
 
        if (!lws_libev_init_fd_table(context)) {
                /* otherwise libev handled it instead */
 
-               if (pipe(context->dummy_pipe_fds)) {
-                       lwsl_err("Unable to create pipe\n");
-                       return 1;
+               while (n--) {
+                       if (pipe(pt->dummy_pipe_fds)) {
+                               lwsl_err("Unable to create pipe\n");
+                               return 1;
+                       }
+
+                       /* use the read end of pipe as first item */
+                       pt->fds[0].fd = pt->dummy_pipe_fds[0];
+                       pt->fds[0].events = LWS_POLLIN;
+                       pt->fds[0].revents = 0;
+                       pt->fds_count = 1;
+                       pt++;
                }
        }
 
-       /* use the read end of pipe as first item */
-       context->fds[0].fd = context->dummy_pipe_fds[0];
-       context->fds[0].events = LWS_POLLIN;
-       context->fds[0].revents = 0;
-       context->fds_count = 1;
-
        context->fops.open      = _lws_plat_file_open;
        context->fops.close     = _lws_plat_file_close;
        context->fops.seek_cur  = _lws_plat_file_seek_cur;
index 7e79310..36df16d 100644 (file)
@@ -1,3 +1,4 @@
+#define _WINSOCK_DEPRECATED_NO_WARNINGS
 #include "private-libwebsockets.h"
 
 unsigned long long
@@ -126,7 +127,20 @@ LWS_VISIBLE int lws_poll_listen_fd(struct lws_pollfd *fd)
 LWS_VISIBLE void
 lws_cancel_service(struct lws_context *context)
 {
-       WSASetEvent(context->events[0]);
+       struct lws_context_per_thread *pt = &context->pt[0];
+       int n = context->count_threads;
+
+       while (n--) {
+               WSASetEvent(pt->events[0]);
+               pt++;
+       }
+}
+
+LWS_VISIBLE void
+lws_cancel_service_pt(struct lws *wsi)
+{
+       struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi];
+       WSASetEvent(pt->events[0]);
 }
 
 LWS_VISIBLE void lwsl_emit_syslog(int level, const char *line)
@@ -135,7 +149,7 @@ LWS_VISIBLE void lwsl_emit_syslog(int level, const char *line)
 }
 
 LWS_VISIBLE int
-lws_plat_service(struct lws_context *context, int timeout_ms)
+lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi)
 {
        int n;
        int i;
@@ -143,6 +157,7 @@ lws_plat_service(struct lws_context *context, int timeout_ms)
        WSANETWORKEVENTS networkevents;
        struct lws_pollfd *pfd;
        struct lws *wsi;
+       struct lws_context_per_thread *pt = &context->pt[tsi];
 
        /* stay dead once we are dead */
 
@@ -160,8 +175,8 @@ lws_plat_service(struct lws_context *context, int timeout_ms)
        }
        context->service_tid = context->service_tid_detected;
 
-       for (i = 0; i < context->fds_count; ++i) {
-               pfd = &context->fds[i];
+       for (i = 0; i < pt->fds_count; ++i) {
+               pfd = &pt->fds[i];
                if (pfd->fd == context->lserv_fd)
                        continue;
 
@@ -179,7 +194,7 @@ lws_plat_service(struct lws_context *context, int timeout_ms)
                }
        }
 
-       ev = WSAWaitForMultipleEvents(context->fds_count + 1, context->events,
+       ev = WSAWaitForMultipleEvents(pt->fds_count + 1, pt->events,
                                      FALSE, timeout_ms, FALSE);
        context->service_tid = 0;
 
@@ -189,17 +204,17 @@ lws_plat_service(struct lws_context *context, int timeout_ms)
        }
 
        if (ev == WSA_WAIT_EVENT_0) {
-               WSAResetEvent(context->events[0]);
+               WSAResetEvent(pt->events[0]);
                return 0;
        }
 
-       if (ev < WSA_WAIT_EVENT_0 || ev > WSA_WAIT_EVENT_0 + context->fds_count)
+       if (ev < WSA_WAIT_EVENT_0 || ev > WSA_WAIT_EVENT_0 + pt->fds_count)
                return -1;
 
-       pfd = &context->fds[ev - WSA_WAIT_EVENT_0 - 1];
+       pfd = &pt->fds[ev - WSA_WAIT_EVENT_0 - 1];
 
        if (WSAEnumNetworkEvents(pfd->fd,
-                                context->events[ev - WSA_WAIT_EVENT_0],
+                                pt->events[ev - WSA_WAIT_EVENT_0],
                                 &networkevents) == SOCKET_ERROR) {
                lwsl_err("WSAEnumNetworkEvents() failed with error %d\n",
                                                                     LWS_ERRNO);
@@ -218,6 +233,12 @@ lws_plat_service(struct lws_context *context, int timeout_ms)
 }
 
 LWS_VISIBLE int
+lws_plat_service(struct lws_context *context, int timeout_ms)
+{
+       return lws_plat_service_tsi(context, timeout_ms, 0);
+}
+
+LWS_VISIBLE int
 lws_plat_set_socket_options(struct lws_context *context, lws_sockfd_type fd)
 {
        int optval = 1;
@@ -289,9 +310,15 @@ lws_plat_context_early_init(void)
 LWS_VISIBLE void
 lws_plat_context_early_destroy(struct lws_context *context)
 {
-       if (context->events) {
-               WSACloseEvent(context->events[0]);
-               lws_free(context->events);
+       struct lws_context_per_thread *pt = &context->pt[0];
+       int n = context->count_threads;
+
+       while (n--) {
+               if (pt->events) {
+                       WSACloseEvent(pt->events[0]);
+                       lws_free(pt->events);
+               }
+               pt++;
        }
 }
 
@@ -329,20 +356,23 @@ lws_interface_to_sa(int ipv6,
 }
 
 LWS_VISIBLE void
-lws_plat_insert_socket_into_fds(struct lws_context *context,
-                                                      struct lws *wsi)
+lws_plat_insert_socket_into_fds(struct lws_context *context, struct lws *wsi)
 {
-       context->fds[context->fds_count++].revents = 0;
-       context->events[context->fds_count] = WSACreateEvent();
-       WSAEventSelect(wsi->sock, context->events[context->fds_count], LWS_POLLIN);
+       struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
+
+       pt->fds[pt->fds_count++].revents = 0;
+       pt->events[pt->fds_count] = WSACreateEvent();
+       WSAEventSelect(wsi->sock, pt->events[pt->fds_count], LWS_POLLIN);
 }
 
 LWS_VISIBLE void
 lws_plat_delete_socket_from_fds(struct lws_context *context,
                                                struct lws *wsi, int m)
 {
-       WSACloseEvent(context->events[m + 1]);
-       context->events[m + 1] = context->events[context->fds_count + 1];
+       struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
+
+       WSACloseEvent(pt->events[m + 1]);
+       pt->events[m + 1] = pt->events[pt->fds_count + 1];
 }
 
 LWS_VISIBLE void
@@ -354,6 +384,7 @@ LWS_VISIBLE int
 lws_plat_change_pollfd(struct lws_context *context,
                      struct lws *wsi, struct lws_pollfd *pfd)
 {
+       struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
        long networkevents = LWS_POLLHUP;
 
        if ((pfd->events & LWS_POLLIN))
@@ -363,7 +394,7 @@ lws_plat_change_pollfd(struct lws_context *context,
                networkevents |= LWS_POLLOUT;
 
        if (WSAEventSelect(wsi->sock,
-                       context->events[wsi->position_in_fds_table + 1],
+                       pt->events[wsi->position_in_fds_table + 1],
                                               networkevents) != SOCKET_ERROR)
                return 0;
 
@@ -497,7 +528,8 @@ LWS_VISIBLE int
 lws_plat_init(struct lws_context *context,
              struct lws_context_creation_info *info)
 {
-       int i;
+       struct lws_context_per_thread *pt = &context->pt[0];
+       int i, n = context->count_threads;
 
        for (i = 0; i < FD_HASHTABLE_MODULUS; i++) {
                context->fd_hashtable[i].wsi =
@@ -507,15 +539,20 @@ lws_plat_init(struct lws_context *context,
                        return -1;
        }
 
-       context->events = lws_malloc(sizeof(WSAEVENT) * (context->max_fds + 1));
-       if (context->events == NULL) {
-               lwsl_err("Unable to allocate events array for %d connections\n",
-                       context->max_fds);
-               return 1;
-       }
+       while (n--) {
+               pt->events = lws_malloc(sizeof(WSAEVENT) *
+                                       (context->fd_limit_per_thread + 1));
+               if (pt->events == NULL) {
+                       lwsl_err("Unable to allocate events array for %d connections\n",
+                                       context->fd_limit_per_thread + 1);
+                       return 1;
+               }
 
-       context->fds_count = 0;
-       context->events[0] = WSACreateEvent();
+               pt->fds_count = 0;
+               pt->events[0] = WSACreateEvent();
+
+               pt++;
+       }
 
        context->fd_random = 0;
 
index 9ba6d53..74ff856 100644 (file)
@@ -233,6 +233,7 @@ handle_truncated_send:
 LWS_VISIBLE int lws_write(struct lws *wsi, unsigned char *buf, size_t len,
                          enum lws_write_protocol wp)
 {
+       struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi];
        int masked7 = (wsi->mode == LWSCM_WS_CLIENT);
        unsigned char is_masked_bit = 0;
        unsigned char *dropmask = NULL;
@@ -242,7 +243,7 @@ LWS_VISIBLE int lws_write(struct lws *wsi, unsigned char *buf, size_t len,
 
        if (wsi->state == LWSS_ESTABLISHED && wsi->u.ws.tx_draining_ext) {
                /* remove us from the list */
-               struct lws **w = &wsi->context->tx_draining_ext_list;
+               struct lws **w = &pt->tx_draining_ext_list;
                lwsl_debug("%s: TX EXT DRAINING: Remove from list\n", __func__);
                wsi->u.ws.tx_draining_ext = 0;
                /* remove us from context draining ext list */
@@ -311,9 +312,8 @@ LWS_VISIBLE int lws_write(struct lws *wsi, unsigned char *buf, size_t len,
                if (n && eff_buf.token_len) {
                        /* extension requires further draining */
                        wsi->u.ws.tx_draining_ext = 1;
-                       wsi->u.ws.tx_draining_ext_list =
-                                       wsi->context->tx_draining_ext_list;
-                       wsi->context->tx_draining_ext_list = wsi;
+                       wsi->u.ws.tx_draining_ext_list = pt->tx_draining_ext_list;
+                       pt->tx_draining_ext_list = wsi;
                        /* we must come back to do more */
                        lws_callback_on_writable(wsi);
                        /*
@@ -551,6 +551,7 @@ send_raw:
 LWS_VISIBLE int lws_serve_http_file_fragment(struct lws *wsi)
 {
        struct lws_context *context = wsi->context;
+       struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
        unsigned long amount;
        int n, m;
 
@@ -569,8 +570,8 @@ LWS_VISIBLE int lws_serve_http_file_fragment(struct lws *wsi)
                        goto all_sent;
 
                if (lws_plat_file_read(wsi, wsi->u.http.fd, &amount,
-                                      context->serv_buf,
-                                      sizeof(context->serv_buf)) < 0)
+                                      pt->serv_buf,
+                                      LWS_MAX_SOCKET_IO_BUF) < 0)
                        return -1; /* caller will close */
 
                n = (int)amount;
@@ -578,7 +579,7 @@ LWS_VISIBLE int lws_serve_http_file_fragment(struct lws *wsi)
                        lws_set_timeout(wsi, PENDING_TIMEOUT_HTTP_CONTENT,
                                        AWAITING_TIMEOUT);
                        wsi->u.http.filepos += n;
-                       m = lws_write(wsi, context->serv_buf, n,
+                       m = lws_write(wsi, pt->serv_buf, n,
                                      wsi->u.http.filepos == wsi->u.http.filelen ?
                                        LWS_WRITE_HTTP_FINAL : LWS_WRITE_HTTP);
                        if (m < 0)
index 93a1210..17059af 100644 (file)
@@ -737,6 +737,7 @@ LWS_VISIBLE int lws_frame_is_binary(struct lws *wsi)
 int
 lws_rx_sm(struct lws *wsi, unsigned char c)
 {
+       struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi];
        struct lws_tokens eff_buf;
        int ret = 0, n, rx_draining_ext = 0;
        int callback_action = LWS_CALLBACK_RECEIVE;
@@ -746,7 +747,7 @@ lws_rx_sm(struct lws *wsi, unsigned char c)
        switch (wsi->lws_rx_parse_state) {
        case LWS_RXPS_NEW:
                if (wsi->u.ws.rx_draining_ext) {
-                       struct lws **w = &wsi->context->rx_draining_ext_list;
+                       struct lws **w = &pt->rx_draining_ext_list;
 
                        eff_buf.token = NULL;
                        eff_buf.token_len = 0;
@@ -1016,13 +1017,14 @@ handle_first:
 
                assert(wsi->u.ws.rx_ubuf);
 
-               if (wsi->u.ws.rx_ubuf_head + LWS_PRE + 4 >= wsi->u.ws.rx_ubuf_alloc) {
-                       lwsl_err("Attempted overflow\n");
-                       return -1;
-               }
+               if (wsi->u.ws.rx_ubuf_head + LWS_PRE >=
+                   wsi->u.ws.rx_ubuf_alloc) {
+                       lwsl_err("Attempted overflow \n");
+                       return -1;
+               }
                if (wsi->u.ws.all_zero_nonce)
                        wsi->u.ws.rx_ubuf[LWS_PRE +
-                              (wsi->u.ws.rx_ubuf_head++)] = c;
+                                        (wsi->u.ws.rx_ubuf_head++)] = c;
                else
                        wsi->u.ws.rx_ubuf[LWS_PRE +
                               (wsi->u.ws.rx_ubuf_head++)] =
@@ -1079,8 +1081,7 @@ spill:
                                        wsi->protocol->callback, wsi,
                                        LWS_CALLBACK_WS_PEER_INITIATED_CLOSE,
                                        wsi->user_space,
-                                       &wsi->u.ws.rx_ubuf[
-                                               LWS_PRE],
+                                       &wsi->u.ws.rx_ubuf[LWS_PRE],
                                        wsi->u.ws.rx_ubuf_head))
                                return -1;
 
@@ -1139,7 +1140,7 @@ ping_drop:
 
                default:
                        lwsl_parser("passing opc %x up to exts\n",
-                                                       wsi->u.ws.opcode);
+                                   wsi->u.ws.opcode);
                        /*
                         * It's something special we can't understand here.
                         * Pass the payload up to the extension's parsing
@@ -1189,8 +1190,8 @@ drain_extension:
                if (n && eff_buf.token_len) {
                        /* extension had more... main loop will come back */
                        wsi->u.ws.rx_draining_ext = 1;
-                       wsi->u.ws.rx_draining_ext_list = wsi->context->rx_draining_ext_list;
-                       wsi->context->rx_draining_ext_list = wsi;
+                       wsi->u.ws.rx_draining_ext_list = pt->rx_draining_ext_list;
+                       pt->rx_draining_ext_list = wsi;
                }
 
                if (eff_buf.token_len > 0 ||
index c41a3b8..8ab3720 100644 (file)
@@ -25,8 +25,9 @@ int
 insert_wsi_socket_into_fds(struct lws_context *context, struct lws *wsi)
 {
        struct lws_pollargs pa = { wsi->sock, LWS_POLLIN, 0 };
+       struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
 
-       if (context->fds_count >= context->max_fds) {
+       if ((unsigned int)pt->fds_count >= context->fd_limit_per_thread) {
                lwsl_err("Too many fds (%d)\n", context->max_fds);
                return 1;
        }
@@ -47,9 +48,9 @@ insert_wsi_socket_into_fds(struct lws_context *context, struct lws *wsi)
                return -1;
 
        insert_wsi(context, wsi);
-       wsi->position_in_fds_table = context->fds_count;
-       context->fds[context->fds_count].fd = wsi->sock;
-       context->fds[context->fds_count].events = LWS_POLLIN;
+       wsi->position_in_fds_table = pt->fds_count;
+       pt->fds[pt->fds_count].fd = wsi->sock;
+       pt->fds[pt->fds_count].events = LWS_POLLIN;
 
        lws_plat_insert_socket_into_fds(context, wsi);
 
@@ -72,10 +73,11 @@ remove_wsi_socket_from_fds(struct lws *wsi)
        struct lws *end_wsi;
        struct lws_pollargs pa = { wsi->sock, 0, 0 };
        struct lws_context *context = wsi->context;
+       struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
 
        lws_libev_io(wsi, LWS_EV_STOP | LWS_EV_READ | LWS_EV_WRITE);
 
-       --context->fds_count;
+       --pt->fds_count;
 
 #if !defined(_WIN32) && !defined(MBED_OPERATORS)
        if (wsi->sock > context->max_fds) {
@@ -95,7 +97,7 @@ remove_wsi_socket_from_fds(struct lws *wsi)
        m = wsi->position_in_fds_table; /* replace the contents for this */
 
        /* have the last guy take up the vacant slot */
-       context->fds[m] = context->fds[context->fds_count];
+       pt->fds[m] = pt->fds[pt->fds_count];
 
        lws_plat_delete_socket_from_fds(context, wsi, m);
 
@@ -104,7 +106,7 @@ remove_wsi_socket_from_fds(struct lws *wsi)
         * (still same fd pointing to same wsi)
         */
        /* end guy's "position in fds table" changed */
-       end_wsi = wsi_from_fd(context, context->fds[context->fds_count].fd);
+       end_wsi = wsi_from_fd(context, pt->fds[pt->fds_count].fd);
        end_wsi->position_in_fds_table = m;
        /* deletion guy's lws_lookup entry needs nuking */
        delete_from_fd(context, wsi->sock);
@@ -128,6 +130,7 @@ int
 lws_change_pollfd(struct lws *wsi, int _and, int _or)
 {
        struct lws_context *context;
+       struct lws_context_per_thread *pt;
        int tid;
        int sampled_tid;
        struct lws_pollfd *pfd;
@@ -141,7 +144,9 @@ lws_change_pollfd(struct lws *wsi, int _and, int _or)
        if (!context)
                return 1;
 
-       pfd = &context->fds[wsi->position_in_fds_table];
+       pt = &context->pt[(int)wsi->tsi];
+
+       pfd = &pt->fds[wsi->position_in_fds_table];
        pa.fd = wsi->sock;
 
        if (context->protocols[0].callback(wsi, LWS_CALLBACK_LOCK_POLL,
@@ -179,7 +184,7 @@ lws_change_pollfd(struct lws *wsi, int _and, int _or)
                        if (tid == -1)
                                return -1;
                        if (tid != sampled_tid)
-                               lws_cancel_service(context);
+                               lws_cancel_service_pt(wsi);
                }
        }
 
@@ -250,8 +255,7 @@ lws_callback_on_writable(struct lws *wsi)
 network_sock:
 #endif
 
-       if (lws_ext_cb_active(wsi,
-                               LWS_EXT_CB_REQUEST_ON_WRITEABLE, NULL, 0))
+       if (lws_ext_cb_active(wsi, LWS_EXT_CB_REQUEST_ON_WRITEABLE, NULL, 0))
                return 1;
 
        if (wsi->position_in_fds_table < 0) {
@@ -281,15 +285,19 @@ LWS_VISIBLE int
 lws_callback_on_writable_all_protocol(const struct lws_context *context,
                                      const struct lws_protocols *protocol)
 {
+       const struct lws_context_per_thread *pt = &context->pt[0];
        struct lws *wsi;
-       int n;
-
-       for (n = 0; n < context->fds_count; n++) {
-               wsi = wsi_from_fd(context,context->fds[n].fd);
-               if (!wsi)
-                       continue;
-               if (wsi->protocol == protocol)
-                       lws_callback_on_writable(wsi);
+       int n, m = context->count_threads;
+
+       while (m--) {
+               for (n = 0; n < pt->fds_count; n++) {
+                       wsi = wsi_from_fd(context, pt->fds[n].fd);
+                       if (!wsi)
+                               continue;
+                       if (wsi->protocol == protocol)
+                               lws_callback_on_writable(wsi);
+               }
+               pt++;
        }
 
        return 0;
index 7a16cb5..44295b7 100644 (file)
@@ -314,9 +314,6 @@ extern "C" {
 #ifndef SYSTEM_RANDOM_FILEPATH
 #define SYSTEM_RANDOM_FILEPATH "/dev/urandom"
 #endif
-#ifndef LWS_MAX_ZLIB_CONN_BUFFER
-#define LWS_MAX_ZLIB_CONN_BUFFER (64 * 1024)
-#endif
 
 /*
  * if not in a connection storm, check for incoming
@@ -500,13 +497,45 @@ struct allocated_headers {
        unsigned char nfrag;
 };
 
-struct lws_context {
-       time_t last_timeout_check_s;
-       struct lws_plat_file_ops fops;
+/*
+ * so we can have n connections being serviced simultaneously,
+ * these things need to be isolated per-thread.
+ */
+
+struct lws_context_per_thread {
+       struct lws_pollfd *fds;
+       struct lws *rx_draining_ext_list;
+       struct lws *tx_draining_ext_list;
+#ifdef LWS_OPENSSL_SUPPORT
+       struct lws *pending_read_list; /* linked list */
+#endif
+       /*
+        * usable by anything in the service code, but only if the scope
+        * does not last longer than the service action (since next service
+        * of any socket can likewise use it and overwrite)
+        */
+       unsigned char *serv_buf;
 #ifdef _WIN32
        WSAEVENT *events;
+#else
+       int dummy_pipe_fds[2];
 #endif
-       struct lws_pollfd *fds;
+       int fds_count;
+};
+
+/*
+ * the rest is managed per-context, that includes
+ *
+ *  - processwide single fd -> wsi lookup
+ *  - contextwide headers pool
+ *  - contextwide ssl context
+ *  - contextwide proxy
+ */
+
+struct lws_context {
+       time_t last_timeout_check_s;
+       struct lws_plat_file_ops fops;
+       struct lws_context_per_thread pt[LWS_MAX_SMP];
 #ifdef _WIN32
 /* different implementation between unix and windows */
        struct lws_fd_hashtable fd_hashtable[FD_HASHTABLE_MODULUS];
@@ -530,23 +559,15 @@ struct lws_context {
        const struct lws_protocols *protocols;
        void *http_header_data;
        struct allocated_headers *ah_pool;
-       struct lws *rx_draining_ext_list;
-       struct lws *tx_draining_ext_list;
+
 #ifdef LWS_OPENSSL_SUPPORT
        SSL_CTX *ssl_ctx;
        SSL_CTX *ssl_client_ctx;
-       struct lws *pending_read_list; /* linked list */
 #endif
 #ifndef LWS_NO_EXTENSIONS
        const struct lws_extension *extensions;
 #endif
 
-       /*
-        * usable by anything in the service code, but only if the scope
-        * does not last longer than the service action (since next service
-        * of any socket can likewise use it and overwrite)
-        */
-       unsigned char serv_buf[LWS_MAX_SOCKET_IO_BUF];
        char http_proxy_address[128];
        char proxy_basic_auth_token[128];
        char canonical_hostname[128];
@@ -557,7 +578,6 @@ struct lws_context {
 
        lws_sockfd_type lserv_fd;
 
-       int fds_count;
        int max_fds;
        int listen_port;
 #ifdef LWS_USE_LIBEV
@@ -571,6 +591,7 @@ struct lws_context {
        int lserv_seen;
        unsigned int http_proxy_port;
        unsigned int options;
+       unsigned int fd_limit_per_thread;
 
        /*
         * set to the Thread ID that's doing the service loop just before entry
@@ -580,9 +601,6 @@ struct lws_context {
         */
        volatile int service_tid;
        int service_tid_detected;
-#ifndef _WIN32
-       int dummy_pipe_fds[2];
-#endif
 
        int count_protocols;
        int ka_time;
@@ -593,15 +611,21 @@ struct lws_context {
        int use_ssl;
        int allow_non_ssl_on_ssl_port;
        unsigned int user_supplied_ssl_ctx:1;
-#define lws_ssl_anybody_has_buffered_read(ctx) \
-               (ctx->use_ssl && ctx->pending_read_list)
+#define lws_ssl_anybody_has_buffered_read(w) \
+               (w->context->use_ssl && \
+                w->context->pt[(int)w->tsi].pending_read_list)
+#define lws_ssl_anybody_has_buffered_read_tsi(c, t) \
+               (c->use_ssl && \
+                c->pt[(int)t].pending_read_list)
 #else
 #define lws_ssl_anybody_has_buffered_read(ctx) (0)
+#define lws_ssl_anybody_has_buffered_read_tsi(ctx, t) (0)
 #endif
 
        short max_http_header_data;
        short max_http_header_pool;
        short ah_count_in_use;
+       short count_threads;
 
        unsigned int being_destroyed:1;
 };
@@ -966,6 +990,7 @@ struct lws {
        char rx_frame_type; /* enum lws_write_protocol */
        char pending_timeout; /* enum pending_timeout */
        char pps; /* enum lws_pending_protocol_send */
+       char tsi; /* thread service index we belong to */
 };
 
 LWS_EXTERN int log_level;
@@ -1329,6 +1354,8 @@ LWS_EXTERN int
 lws_poll_listen_fd(struct lws_pollfd *fd);
 LWS_EXTERN int
 lws_plat_service(struct lws_context *context, int timeout_ms);
+LWS_EXTERN LWS_VISIBLE int
+lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi);
 LWS_EXTERN int
 lws_plat_init(struct lws_context *context,
              struct lws_context_creation_info *info);
index 7b6fc8c..93cba5a 100644 (file)
@@ -27,6 +27,7 @@ LWS_VISIBLE int
 lws_extension_server_handshake(struct lws *wsi, char **p)
 {
        struct lws_context *context = wsi->context;
+       struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
        const struct lws_extension *ext;
        char ext_name[128];
        int ext_count = 0;
@@ -47,11 +48,11 @@ lws_extension_server_handshake(struct lws *wsi, char **p)
         * and go through them
         */
 
-       if (lws_hdr_copy(wsi, (char *)context->serv_buf,
-                        sizeof(context->serv_buf), WSI_TOKEN_EXTENSIONS) < 0)
+       if (lws_hdr_copy(wsi, (char *)pt->serv_buf, LWS_MAX_SOCKET_IO_BUF,
+                        WSI_TOKEN_EXTENSIONS) < 0)
                return 1;
 
-       c = (char *)context->serv_buf;
+       c = (char *)pt->serv_buf;
        lwsl_parser("WSI_TOKEN_EXTENSIONS = '%s'\n", c);
        wsi->count_act_ext = 0;
        n = 0;
@@ -157,6 +158,7 @@ lws_extension_server_handshake(struct lws *wsi, char **p)
 int
 handshake_0405(struct lws_context *context, struct lws *wsi)
 {
+       struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
        unsigned char hash[20];
        int n;
        char *response;
@@ -180,15 +182,14 @@ handshake_0405(struct lws_context *context, struct lws *wsi)
         * since key length is restricted above (currently 128), cannot
         * overflow
         */
-       n = sprintf((char *)context->serv_buf,
+       n = sprintf((char *)pt->serv_buf,
                                "%s258EAFA5-E914-47DA-95CA-C5AB0DC85B11",
                                lws_hdr_simple_ptr(wsi, WSI_TOKEN_KEY));
 
-       lws_SHA1(context->serv_buf, n, hash);
+       lws_SHA1(pt->serv_buf, n, hash);
 
        accept_len = lws_b64_encode_string((char *)hash, 20,
-                       (char *)context->serv_buf,
-                       sizeof(context->serv_buf));
+                       (char *)pt->serv_buf, LWS_MAX_SOCKET_IO_BUF);
        if (accept_len < 0) {
                lwsl_warn("Base64 encoded hash too long\n");
                goto bail;
@@ -202,13 +203,13 @@ handshake_0405(struct lws_context *context, struct lws *wsi)
 
        /* make a buffer big enough for everything */
 
-       response = (char *)context->serv_buf + MAX_WEBSOCKET_04_KEY_LEN + LWS_PRE;
+       response = (char *)pt->serv_buf + MAX_WEBSOCKET_04_KEY_LEN + LWS_PRE;
        p = response;
        LWS_CPYAPP(p, "HTTP/1.1 101 Switching Protocols\x0d\x0a"
                      "Upgrade: WebSocket\x0d\x0a"
                      "Connection: Upgrade\x0d\x0a"
                      "Sec-WebSocket-Accept: ");
-       strcpy(p, (char *)context->serv_buf);
+       strcpy(p, (char *)pt->serv_buf);
        p += accept_len;
 
        if (lws_hdr_total_length(wsi, WSI_TOKEN_PROTOCOL)) {
index c92401d..3be716d 100644 (file)
@@ -655,11 +655,27 @@ int lws_http_transaction_completed(struct lws *wsi)
        return 0;
 }
 
+static int
+lws_get_idlest_tsi(struct lws_context *context)
+{
+       unsigned int lowest = ~0;
+       int n, hit = 0;
+
+       for (n = 0; n < context->count_threads; n++)
+               if ((unsigned int)context->pt[n].fds_count < lowest) {
+                       lowest = context->pt[n].fds_count;
+                       hit = n;
+               }
+
+       return hit;
+}
+
 LWS_VISIBLE
 int lws_server_socket_service(struct lws_context *context,
                              struct lws *wsi, struct lws_pollfd *pollfd)
 {
        lws_sockfd_type accept_fd = LWS_SOCK_INVALID;
+       struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
 #if LWS_POSIX
        struct sockaddr_in cli_addr;
        socklen_t clilen;
@@ -697,8 +713,8 @@ int lws_server_socket_service(struct lws_context *context,
                if (!(pollfd->revents & pollfd->events && LWS_POLLIN))
                        goto try_pollout;
 
-               len = lws_ssl_capable_read(wsi, context->serv_buf,
-                                          sizeof(context->serv_buf));
+               len = lws_ssl_capable_read(wsi, pt->serv_buf,
+                                          LWS_MAX_SOCKET_IO_BUF);
                lwsl_debug("%s: read %d\r\n", __func__, len);
                switch (len) {
                case 0:
@@ -719,7 +735,7 @@ int lws_server_socket_service(struct lws_context *context,
                         * hm this may want to send
                         * (via HTTP callback for example)
                         */
-                       n = lws_read(wsi, context->serv_buf, len);
+                       n = lws_read(wsi, pt->serv_buf, len);
                        if (n < 0) /* we closed wsi */
                                return 1;
 
@@ -809,6 +825,8 @@ try_pollout:
                }
 
                new_wsi->sock = accept_fd;
+               new_wsi->tsi = lws_get_idlest_tsi(context);
+               lwsl_info("Accepted to tsi %d\n", new_wsi->tsi);
 
                /* the transport is accepted... give him time to negotiate */
                lws_set_timeout(new_wsi, PENDING_TIMEOUT_ESTABLISH_WITH_SERVER,
@@ -877,9 +895,10 @@ LWS_VISIBLE int lws_serve_http_file(struct lws *wsi, const char *file,
                                    int other_headers_len)
 {
        struct lws_context *context = lws_get_context(wsi);
-       unsigned char *response = context->serv_buf + LWS_PRE;
+       struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
+       unsigned char *response = pt->serv_buf + LWS_PRE;
        unsigned char *p = response;
-       unsigned char *end = p + sizeof(context->serv_buf) - LWS_PRE;
+       unsigned char *end = p + LWS_MAX_SOCKET_IO_BUF - LWS_PRE;
        int ret = 0;
 
        wsi->u.http.fd = lws_plat_file_open(wsi, file, &wsi->u.http.filelen,
index 9ddbeae..af72dc5 100644 (file)
@@ -370,11 +370,12 @@ int lws_rxflow_cache(struct lws *wsi, unsigned char *buf, int n, int len)
  */
 
 LWS_VISIBLE int
-lws_service_fd(struct lws_context *context, struct lws_pollfd *pollfd)
+lws_service_fd_tsi(struct lws_context *context, struct lws_pollfd *pollfd, int tsi)
 {
 #if LWS_POSIX
        int idx = 0;
 #endif
+       struct lws_context_per_thread *pt = &context->pt[tsi];
        lws_sockfd_type our_fd = 0;
        struct lws_tokens eff_buf;
        unsigned int pending = 0;
@@ -449,7 +450,7 @@ lws_service_fd(struct lws_context *context, struct lws_pollfd *pollfd)
         * pending connection here, it causes us to check again next time.
         */
 
-       if (context->lserv_fd && pollfd != &context->fds[idx]) {
+       if (context->lserv_fd && pollfd != &pt->fds[idx]) {
                context->lserv_count++;
                if (context->lserv_seen ||
                    context->lserv_count == context->lserv_mod) {
@@ -462,14 +463,14 @@ lws_service_fd(struct lws_context *context, struct lws_pollfd *pollfd)
                                 * even with extpoll, we prepared this
                                 * internal fds for listen
                                 */
-                               n = lws_poll_listen_fd(&context->fds[idx]);
+                               n = lws_poll_listen_fd(&pt->fds[idx]);
                                if (n <= 0) {
                                        if (context->lserv_seen)
                                                context->lserv_seen--;
                                        break;
                                }
                                /* there's a conn waiting for us */
-                               lws_service_fd(context, &context->fds[idx]);
+                               lws_service_fd(context, &pt->fds[idx]);
                                context->lserv_seen++;
                        }
                }
@@ -605,9 +606,8 @@ lws_service_fd(struct lws_context *context, struct lws_pollfd *pollfd)
                if (!(pollfd->revents & pollfd->events & LWS_POLLIN))
                        break;
 read:
-               eff_buf.token_len = lws_ssl_capable_read(wsi, context->serv_buf,
-                                       pending ? pending :
-                                       sizeof(context->serv_buf));
+               eff_buf.token_len = lws_ssl_capable_read(wsi, pt->serv_buf,
+                                       pending ? pending : LWS_MAX_SOCKET_IO_BUF);
                switch (eff_buf.token_len) {
                case 0:
                        lwsl_info("service_fd: closing due to 0 length read\n");
@@ -633,7 +633,7 @@ read:
                 * used then so it is efficient.
                 */
 
-               eff_buf.token = (char *)context->serv_buf;
+               eff_buf.token = (char *)pt->serv_buf;
 drain:
 
                do {
@@ -672,8 +672,8 @@ drain:
                pending = lws_ssl_pending(wsi);
                if (pending) {
 handle_pending:
-                       pending = pending > sizeof(context->serv_buf) ?
-                               sizeof(context->serv_buf) : pending;
+                       pending = pending > LWS_MAX_SOCKET_IO_BUF ?
+                                       LWS_MAX_SOCKET_IO_BUF : pending;
                        goto read;
                }
 
@@ -720,6 +720,12 @@ handled:
        return n;
 }
 
+LWS_VISIBLE int
+lws_service_fd(struct lws_context *context, struct lws_pollfd *pollfd)
+{
+       return lws_service_fd_tsi(context, pollfd, 0);
+}
+
 /**
  * lws_service() - Service any pending websocket activity
  * @context:   Websocket context
@@ -758,3 +764,9 @@ lws_service(struct lws_context *context, int timeout_ms)
        return lws_plat_service(context, timeout_ms);
 }
 
+LWS_VISIBLE int
+lws_service_tsi(struct lws_context *context, int timeout_ms, int tsi)
+{
+       return lws_plat_service_tsi(context, timeout_ms, tsi);
+}
+
index f72b6b5..3b66ccf 100644 (file)
--- a/lib/ssl.c
+++ b/lib/ssl.c
@@ -153,7 +153,7 @@ lws_context_init_server_ssl(struct lws_context_creation_info *info,
                error = ERR_get_error();
                lwsl_err("problem creating ssl method %lu: %s\n",
                        error, ERR_error_string(error,
-                                             (char *)context->serv_buf));
+                                             (char *)context->pt[0].serv_buf));
                return 1;
        }
        context->ssl_ctx = SSL_CTX_new(method); /* create context */
@@ -161,7 +161,7 @@ lws_context_init_server_ssl(struct lws_context_creation_info *info,
                error = ERR_get_error();
                lwsl_err("problem creating ssl context %lu: %s\n",
                        error, ERR_error_string(error,
-                                             (char *)context->serv_buf));
+                                             (char *)context->pt[0].serv_buf));
                return 1;
        }
 
@@ -217,7 +217,7 @@ lws_context_init_server_ssl(struct lws_context_creation_info *info,
                                info->ssl_cert_filepath,
                                error,
                                ERR_error_string(error,
-                                             (char *)context->serv_buf));
+                                             (char *)context->pt[0].serv_buf));
                        return 1;
                }
                lws_ssl_bind_passphrase(context->ssl_ctx, info);
@@ -231,7 +231,7 @@ lws_context_init_server_ssl(struct lws_context_creation_info *info,
                                lwsl_err("ssl problem getting key '%s' %lu: %s\n",
                                         info->ssl_private_key_filepath, error,
                                         ERR_error_string(error,
-                                             (char *)context->serv_buf));
+                                             (char *)context->pt[0].serv_buf));
                                return 1;
                        }
                } else
@@ -250,7 +250,7 @@ lws_context_init_server_ssl(struct lws_context_creation_info *info,
                }
 
 #ifdef LWS_SSL_SERVER_WITH_ECDH_CERT
-               if (context->options & LWS_SERVER_OPTION_SSL_ECDH) { 
+               if (context->options & LWS_SERVER_OPTION_SSL_ECDH) {
                        lwsl_notice(" Using ECDH certificate support\n");
 
                        /* Get X509 certificate from ssl context */
@@ -352,7 +352,7 @@ int lws_context_init_client_ssl(struct lws_context_creation_info *info,
                error = ERR_get_error();
                lwsl_err("problem creating ssl method %lu: %s\n",
                        error, ERR_error_string(error,
-                                     (char *)context->serv_buf));
+                                     (char *)context->pt[0].serv_buf));
                return 1;
        }
        /* create context */
@@ -361,7 +361,7 @@ int lws_context_init_client_ssl(struct lws_context_creation_info *info,
                error = ERR_get_error();
                lwsl_err("problem creating ssl context %lu: %s\n",
                        error, ERR_error_string(error,
-                                     (char *)context->serv_buf));
+                                     (char *)context->pt[0].serv_buf));
                return 1;
        }
 
@@ -416,7 +416,7 @@ int lws_context_init_client_ssl(struct lws_context_creation_info *info,
                                info->ssl_cert_filepath,
                                ERR_get_error(),
                                ERR_error_string(ERR_get_error(),
-                               (char *)context->serv_buf));
+                               (char *)context->pt[0].serv_buf));
                        return 1;
                }
        }
@@ -429,7 +429,7 @@ int lws_context_init_client_ssl(struct lws_context_creation_info *info,
                                info->ssl_private_key_filepath,
                                ERR_get_error(),
                                ERR_error_string(ERR_get_error(),
-                                     (char *)context->serv_buf));
+                                     (char *)context->pt[0].serv_buf));
                        return 1;
                }
 
@@ -459,16 +459,17 @@ LWS_VISIBLE void
 lws_ssl_remove_wsi_from_buffered_list(struct lws *wsi)
 {
        struct lws_context *context = wsi->context;
+       struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
 
        if (!wsi->pending_read_list_prev &&
            !wsi->pending_read_list_next &&
-           context->pending_read_list != wsi)
+           pt->pending_read_list != wsi)
                /* we are not on the list */
                return;
 
        /* point previous guy's next to our next */
        if (!wsi->pending_read_list_prev)
-               context->pending_read_list = wsi->pending_read_list_next;
+               pt->pending_read_list = wsi->pending_read_list_next;
        else
                wsi->pending_read_list_prev->pending_read_list_next =
                        wsi->pending_read_list_next;
@@ -486,6 +487,7 @@ LWS_VISIBLE int
 lws_ssl_capable_read(struct lws *wsi, unsigned char *buf, int len)
 {
        struct lws_context *context = wsi->context;
+       struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
        int n;
 
        if (!wsi->ssl)
@@ -521,16 +523,16 @@ lws_ssl_capable_read(struct lws *wsi, unsigned char *buf, int len)
                return n;
        if (wsi->pending_read_list_prev)
                return n;
-       if (context->pending_read_list == wsi)
+       if (pt->pending_read_list == wsi)
                return n;
 
        /* add us to the linked list of guys with pending ssl */
-       if (context->pending_read_list)
-               context->pending_read_list->pending_read_list_prev = wsi;
+       if (pt->pending_read_list)
+               pt->pending_read_list->pending_read_list_prev = wsi;
 
-       wsi->pending_read_list_next = context->pending_read_list;
+       wsi->pending_read_list_next = pt->pending_read_list;
        wsi->pending_read_list_prev = NULL;
-       context->pending_read_list = wsi;
+       pt->pending_read_list = wsi;
 
        return n;
 bail:
@@ -596,6 +598,7 @@ lws_server_socket_service_ssl(struct lws **pwsi, struct lws *new_wsi,
 {
        struct lws *wsi = *pwsi;
        struct lws_context *context = wsi->context;
+       struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
        int n, m;
 #ifndef USE_WOLFSSL
        BIO *bio;
@@ -674,8 +677,8 @@ lws_server_socket_service_ssl(struct lws **pwsi, struct lws *new_wsi,
 
                lws_latency_pre(context, wsi);
 
-               n = recv(wsi->sock, (char *)context->serv_buf,
-                       sizeof(context->serv_buf), MSG_PEEK);
+               n = recv(wsi->sock, (char *)pt->serv_buf, LWS_MAX_SOCKET_IO_BUF,
+                        MSG_PEEK);
 
                /*
                 * optionally allow non-SSL connect on SSL listening socket
@@ -685,7 +688,7 @@ lws_server_socket_service_ssl(struct lws **pwsi, struct lws *new_wsi,
                 */
 
                if (context->allow_non_ssl_on_ssl_port) {
-                       if (n >= 1 && context->serv_buf[0] >= ' ') {
+                       if (n >= 1 && pt->serv_buf[0] >= ' ') {
                                /*
                                * TLS content-type for Handshake is 0x16, and
                                * for ChangeCipherSpec Record, it's 0x14
index e03c1d5..9d4a0a6 100644 (file)
@@ -71,4 +71,7 @@
 /* SSL server using ECDH certificate */
 #cmakedefine LWS_SSL_SERVER_WITH_ECDH_CERT
 
+/* Maximum supported service threads */
+#define LWS_MAX_SMP ${LWS_MAX_SMP}
+
 ${LWS_SIZEOFPTR_CODE}
index 4b1aa45..317e432 100644 (file)
@@ -1,7 +1,7 @@
 /*
  * libwebsockets-test-server - libwebsockets test implementation
  *
- * Copyright (C) 2010-2015 Andy Green <andy@warmcat.com>
+ * Copyright (C) 2010-2016 Andy Green <andy@warmcat.com>
  *
  *  This library is free software; you can redistribute it and/or
  *  modify it under the terms of the GNU Lesser General Public
@@ -132,12 +132,34 @@ void *thread_dumb_increment(void *threadid)
        pthread_exit(NULL);
 }
 
+void *thread_service(void *threadid)
+{
+       while (lws_service_tsi(context, 50, (int)(long)threadid) >= 0 && !force_exit)
+               ;
+
+       pthread_exit(NULL);
+}
+
 void sighandler(int sig)
 {
        force_exit = 1;
        lws_cancel_service(context);
 }
 
+static const struct lws_extension exts[] = {
+       {
+               "permessage-deflate",
+               lws_extension_callback_pm_deflate,
+               "permessage-deflate"
+       },
+       {
+               "deflate-frame",
+               lws_extension_callback_pm_deflate,
+               "deflate_frame"
+       },
+       { NULL, NULL, NULL /* terminator */ }
+};
+
 static struct option options[] = {
        { "help",       no_argument,            NULL, 'h' },
        { "debug",      required_argument,      NULL, 'd' },
@@ -147,6 +169,7 @@ static struct option options[] = {
        { "interface",  required_argument,      NULL, 'i' },
        { "closetest",  no_argument,            NULL, 'c' },
        { "libev",  no_argument,                NULL, 'e' },
+       { "threads",  required_argument,        NULL, 'j' },
 #ifndef LWS_NO_DAEMONIZE
        { "daemonize",  no_argument,            NULL, 'D' },
 #endif
@@ -159,10 +182,11 @@ int main(int argc, char **argv)
        struct lws_context_creation_info info;
        char interface_name[128] = "";
        const char *iface = NULL;
-       pthread_t pthread_dumb;
+       pthread_t pthread_dumb, pthread_service[32];
        char cert_path[1024];
        char key_path[1024];
        int debug_level = 7;
+       int threads = 1;
        int use_ssl = 0;
        void *retval;
        int opts = 0;
@@ -184,10 +208,17 @@ int main(int argc, char **argv)
        pthread_mutex_init(&lock_established_conns, NULL);
 
        while (n >= 0) {
-               n = getopt_long(argc, argv, "eci:hsap:d:Dr:", options, NULL);
+               n = getopt_long(argc, argv, "eci:hsap:d:Dr:j:", options, NULL);
                if (n < 0)
                        continue;
                switch (n) {
+               case 'j':
+                       threads = atoi(optarg);
+                       if (threads > ARRAY_SIZE(pthread_service)) {
+                               lwsl_err("Max threads %d\n", ARRAY_SIZE(pthread_service));
+                               return 1;
+                       }
+                       break;
                case 'e':
                        opts |= LWS_SERVER_OPTION_LIBEV;
                        break;
@@ -259,7 +290,7 @@ int main(int argc, char **argv)
        lws_set_log_level(debug_level, lwsl_emit_syslog);
 
        lwsl_notice("libwebsockets test server - "
-                   "(C) Copyright 2010-2015 Andy Green <andy@warmcat.com> - "
+                   "(C) Copyright 2010-2016 Andy Green <andy@warmcat.com> - "
                    "licensed under LGPL2.1\n");
 
        printf("Using resource path \"%s\"\n", resource_path);
@@ -302,6 +333,8 @@ int main(int argc, char **argv)
        info.gid = -1;
        info.uid = -1;
        info.options = opts;
+       info.count_threads = threads;
+       info.extensions = exts;
 
        context = lws_create_context(&info);
        if (context == NULL) {
@@ -317,12 +350,21 @@ int main(int argc, char **argv)
                goto done;
        }
 
-       /* this is our service thread */
+       /*
+        * notice the actual number of threads may be capped by the library,
+        * so use lws_get_count_threads() to get the actual amount of threads
+        * initialized.
+        */
+
+       for (n = 0; n < lws_get_count_threads(context); n++)
+               if (pthread_create(&pthread_service[n], NULL, thread_service,
+                                  (void *)(long)n))
+                       lwsl_err("Failed to start service thread\n");
 
-       n = 0;
-       while (n >= 0 && !force_exit) {
-               n = lws_service(context, 50);
-       }
+       /* wait for all the service threads to exit */
+
+       for (n = 0; n < lws_get_count_threads(context); n++)
+               pthread_join(pthread_service[n], &retval);
 
        /* wait for pthread_dumb to exit */
        pthread_join(pthread_dumb, &retval);