option(LWS_MBED3 "Platform is MBED3" OFF)
option(LWS_SSL_SERVER_WITH_ECDH_CERT "Include SSL server use ECDH certificate" OFF)
-
-if (DEFINED YOTTA_WEBSOCKETS_VERSION_STRING)
-
-set(LWS_WITH_SHARED OFF)
-set(LWS_WITH_SSL OFF)
-set(LWS_WITH_ZLIB OFF)
-set(LWS_WITHOUT_CLIENT ON)
-set(LWS_WITHOUT_TESTAPPS ON)
-set(LWS_WITHOUT_EXTENSIONS ON)
-set(LWS_MBED3 ON)
-
-endif()
-
if (DEFINED YOTTA_WEBSOCKETS_VERSION_STRING)
set(LWS_WITH_SHARED OFF)
set(LWS_WITHOUT_TESTAPPS ON)
set(LWS_WITHOUT_EXTENSIONS ON)
set(LWS_MBED3 ON)
+set(LWS_MAX_SMP 1)
endif()
set(LWS_USE_HTTP2 1)
endif()
+if ("${LWS_MAX_SMP}" STREQUAL "")
+ set(LWS_MAX_SMP 32)
+endif()
+
#if (LWS_MBED3)
# set(CMAKE_C_FLAGS "-D_DEBUG ${CMAKE_C_FLAGS}")
#endif()
message(" LWS_WITH_HTTP2 = ${LWS_WITH_HTTP2}")
message(" LWS_MBED3 = ${LWS_MBED3}")
message(" LWS_SSL_SERVER_WITH_ECDH_CERT = ${LWS_SSL_SERVER_WITH_ECDH_CERT}")
+message(" LWS_MAX_SMP = ${LWS_MAX_SMP}")
message("---------------------------------------------------------------------")
# These will be available to parent projects including libwebsockets using add_subdirectory()
$ LD_LIBRARY_PATH=/usr/local/ssl/lib libwebsockets-test-server --ssl
```
+ **NOTE5**:
+ To build with debug info and _DEBUG for lower priority debug messages
+ compiled in, use
+
+ ```bash
+ $ cmake .. -DCMAKE_BUILD_TYPE=DEBUG
+ ```
+
4. Finally you can build using the generated Makefile:
```bash
to build in support and select it at runtime.
+SMP / Multithreaded service
+---------------------------
+SMP support is integrated into LWS without any internal threading. It's
+very simple to use, libwebsockets-test-server-pthread shows how to do it,
+use -j <n> argument there to control the number of service threads up to 32.
+
+Two new members are added to the info struct
+
+ unsigned int count_threads;
+ unsigned int fd_limit_per_thread;
+
+leave them at the default 0 to get the normal singlethreaded service loop.
+
+Set count_threads to n to tell lws you will have n simultaneous service threads
+operating on the context.
+
+There is still a single listen socket on one port, no matter how many
+service threads.
+
+When a connection is made, it is accepted by the service thread with the least
+connections active to perform load balancing.
+
+The user code is responsible for spawning n threads running the service loop
+associated to a specific tsi (Thread Service Index, 0 .. n - 1). See
+the libwebsockets-test-server-pthread test app for how to do it.
+
+If you leave fd_limit_per_thread at 0, then the process limit of fds is shared
+between the service threads; if your process was allowed 1024 fds overall then
+each thread is limited to 1024 / n.
+
+You can set fd_limit_per_thread to a nonzero number to control this manually,
+e.g. if the overall supported fd limit is less than the process allowance.
+
+You can control the context basic data allocation for multithreading from CMake
+using -DLWS_MAX_SMP=, if not given it's set to 32. The serv_buf allocation
+for the threads (currently 4096) is made at runtime only for active threads.
+
+Because lws will limit the requested number of actual threads supported
+according to LWS_MAX_SMP, there is an api lws_get_count_threads(context) to
+discover how many threads were actually allowed when the context was created.
+
+It's required to implement locking in the user code in the same way that
+libwebsockets-test-server-pthread does it, for the FD locking callbacks.
+
+There is no knowledge or dependency in lws itself about pthreads. How the
+locking is implemented is entirely up to the user code.
6) There's a new api lws_parse_uri() that simplifies chopping up
https://xxx:yyy/zzz uris into parts nicely. The test client now uses this
-to allow proper uris.
+to allow proper uris as well as the old address style.
+
+7) SMP support is integrated into LWS without any internal threading. It's
+very simple to use, libwebsockets-test-server-pthread shows how to do it,
+use -j <n> argument there to control the number of service threads up to 32.
+
+Two new members are added to the info struct
+
+ unsigned int count_threads;
+ unsigned int fd_limit_per_thread;
+
+leave them at the default 0 to get the normal singlethreaded service loop.
+
+Set count_threads to n to tell lws you will have n simultaneous service threads
+operating on the context.
+
+There is still a single listen socket on one port, no matter how many
+service threads.
+
+When a connection is made, it is accepted by the service thread with the least
+connections active to perform load balancing.
+
+The user code is responsible for spawning n threads running the service loop
+associated to a specific tsi (Thread Service Index, 0 .. n - 1). See
+the libwebsockets-test-server-pthread test app for how to do it.
+
+If you leave fd_limit_per_thread at 0, then the process limit of fds is shared
+between the service threads; if your process was allowed 1024 fds overall then
+each thread is limited to 1024 / n.
+
+You can set fd_limit_per_thread to a nonzero number to control this manually,
+e.g. if the overall supported fd limit is less than the process allowance.
+
+You can control the context basic data allocation for multithreading from CMake
+using -DLWS_MAX_SMP=, if not given it's set to 32. The serv_buf allocation
+for the threads (currently 4096) is made at runtime only for active threads.
+
+Because lws will limit the requested number of actual threads supported
+according to LWS_MAX_SMP, there is an api lws_get_count_threads(context) to
+discover how many threads were actually allowed when the context was created.
+
+It's required to implement locking in the user code in the same way that
+libwebsockets-test-server-pthread does it, for the FD locking callbacks.
+
+There is no knowledge or dependency in lws itself about pthreads. How the
+locking is implemented is entirely up to the user code.
User api changes
struct addrinfo hints, *result;
#endif
struct lws_context *context = wsi->context;
+ struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
struct sockaddr_in server_addr4;
struct sockaddr_in client_addr4;
struct lws_pollfd pfd;
/* proxy? */
if (context->http_proxy_port) {
- plen = sprintf((char *)context->serv_buf,
+ plen = sprintf((char *)pt->serv_buf,
"CONNECT %s:%u HTTP/1.0\x0d\x0a"
"User-agent: libwebsockets\x0d\x0a",
lws_hdr_simple_ptr(wsi, _WSI_TOKEN_CLIENT_PEER_ADDRESS),
wsi->u.hdr.ah->c_port);
if (context->proxy_basic_auth_token[0])
- plen += sprintf((char *)context->serv_buf + plen,
+ plen += sprintf((char *)pt->serv_buf + plen,
"Proxy-authorization: basic %s\x0d\x0a",
context->proxy_basic_auth_token);
- plen += sprintf((char *)context->serv_buf + plen,
+ plen += sprintf((char *)pt->serv_buf + plen,
"\x0d\x0a");
ads = context->http_proxy_address;
goto failed;
wsi->u.hdr.ah->c_port = context->http_proxy_port;
- n = send(wsi->sock, (char *)context->serv_buf, plen,
+ n = send(wsi->sock, (char *)pt->serv_buf, plen,
MSG_NOSIGNAL);
if (n < 0) {
lwsl_debug("ERROR writing to proxy socket\n");
int lws_client_rx_sm(struct lws *wsi, unsigned char c)
{
+ struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi];
int callback_action = LWS_CALLBACK_CLIENT_RECEIVE;
unsigned short close_code;
unsigned char *pp;
if (wsi->u.ws.rx_draining_ext) {
- struct lws **w = &wsi->context->rx_draining_ext_list;
+ struct lws **w = &pt->rx_draining_ext_list;
lwsl_ext("%s: RX EXT DRAINING: Removing from list\n", __func__, c);
assert(!c);
eff_buf.token = NULL;
* last chunk
*/
wsi->u.ws.rx_draining_ext = 1;
- wsi->u.ws.rx_draining_ext_list = wsi->context->rx_draining_ext_list;
- wsi->context->rx_draining_ext_list = wsi;
+ wsi->u.ws.rx_draining_ext_list = pt->rx_draining_ext_list;
+ pt->rx_draining_ext_list = wsi;
lwsl_ext("%s: RX EXT DRAINING: Adding to list\n", __func__);
}
if (wsi->state == LWSS_RETURNED_CLOSE_ALREADY ||
int lws_client_socket_service(struct lws_context *context,
struct lws *wsi, struct lws_pollfd *pollfd)
{
- char *p = (char *)&context->serv_buf[0];
+ struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
+ char *p = (char *)&pt->serv_buf[0];
+ char *sb = p;
unsigned char c;
int n, len;
return 0;
}
- n = recv(wsi->sock, (char *)context->serv_buf,
- sizeof(context->serv_buf), 0);
+ n = recv(wsi->sock, sb, LWS_MAX_SOCKET_IO_BUF, 0);
if (n < 0) {
if (LWS_ERRNO == LWS_EAGAIN) {
return 0;
}
- context->serv_buf[13] = '\0';
- if (strcmp((char *)context->serv_buf, "HTTP/1.0 200 ") &&
- strcmp((char *)context->serv_buf, "HTTP/1.1 200 ")
+ pt->serv_buf[13] = '\0';
+ if (strcmp(sb, "HTTP/1.0 200 ") &&
+ strcmp(sb, "HTTP/1.1 200 ")
) {
lws_close_free_wsi(wsi, LWS_CLOSE_STATUS_NOSTATUS);
- lwsl_err("ERROR proxy: %s\n", context->serv_buf);
+ lwsl_err("ERROR proxy: %s\n", sb);
return 0;
}
n = ERR_get_error();
if (n != SSL_ERROR_NONE) {
lwsl_err("SSL connect error %lu: %s\n",
- n,
- ERR_error_string(n,
- (char *)context->serv_buf));
+ n, ERR_error_string(n, sb));
return 0;
}
}
n = ERR_get_error();
if (n != SSL_ERROR_NONE) {
lwsl_err("SSL connect error %lu: %s\n",
- n, ERR_error_string(n,
- (char *)context->serv_buf));
+ n, ERR_error_string(n, sb));
return 0;
}
}
lwsl_notice("accepting self-signed certificate\n");
} else {
lwsl_err("server's cert didn't look good, X509_V_ERR = %d: %s\n",
- n, ERR_error_string(n, (char *)context->serv_buf));
+ n, ERR_error_string(n, sb));
lws_close_free_wsi(wsi,
LWS_CLOSE_STATUS_NOSTATUS);
return 0;
lws_latency_pre(context, wsi);
- n = lws_ssl_capable_write(wsi, context->serv_buf,
- p - (char *)context->serv_buf);
+ n = lws_ssl_capable_write(wsi, (unsigned char *)sb, p - sb);
lws_latency(context, wsi, "send lws_issue_raw", n,
- n == p - (char *)context->serv_buf);
+ n == p - sb);
switch (n) {
case LWS_SSL_CAPABLE_ERROR:
lwsl_debug("ERROR writing to client socket\n");
int
lws_client_interpret_server_handshake(struct lws *wsi)
{
+ int n, len, okay = 0, isErrorCodeReceived = 0, port = 0, ssl = 0;
struct lws_context *context = wsi->context;
int close_reason = LWS_CLOSE_STATUS_PROTOCOL_ERR;
- int n, len, okay = 0, isErrorCodeReceived = 0, port = 0, ssl = 0;
const char *pc, *prot, *ads = NULL, *path;
char *p;
#ifndef LWS_NO_EXTENSIONS
- const struct lws_extension *ext;
+ struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
+ char *sb = (char *)&pt->serv_buf[0];
const struct lws_ext_options *opts;
+ const struct lws_extension *ext;
char ext_name[128];
const char *c, *a;
char ignore;
* and go through matching them or identifying bogons
*/
- if (lws_hdr_copy(wsi, (char *)context->serv_buf,
- sizeof(context->serv_buf), WSI_TOKEN_EXTENSIONS) < 0) {
+ if (lws_hdr_copy(wsi, sb, LWS_MAX_SOCKET_IO_BUF, WSI_TOKEN_EXTENSIONS) < 0) {
lwsl_warn("ext list from server failed to copy\n");
goto bail2;
}
- c = (char *)context->serv_buf;
+ c = sb;
n = 0;
ignore = 0;
a = NULL;
context->protocols[0].callback(wsi,
LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER,
NULL, &p,
- (pkt + sizeof(context->serv_buf)) - p - 12);
+ (pkt + LWS_MAX_SOCKET_IO_BUF) - p - 12);
p += sprintf(p, "\x0d\x0a");
#endif
lws_feature_status_libev(info);
#endif
- lwsl_info(" LWS_MAX_HEADER_LEN: %u\n", LWS_MAX_HEADER_LEN);
- lwsl_info(" LWS_MAX_PROTOCOLS: %u\n", LWS_MAX_PROTOCOLS);
-
- lwsl_info(" SPEC_LATEST_SUPPORTED: %u\n", SPEC_LATEST_SUPPORTED);
- lwsl_info(" AWAITING_TIMEOUT: %u\n", AWAITING_TIMEOUT);
- lwsl_info(" sizeof (*info): %u\n", sizeof(*info));
+ lwsl_info(" LWS_MAX_HEADER_LEN : %u\n", LWS_MAX_HEADER_LEN);
+ lwsl_info(" LWS_MAX_PROTOCOLS : %u\n", LWS_MAX_PROTOCOLS);
+ lwsl_info(" LWS_MAX_SMP : %u\n", LWS_MAX_SMP);
+ lwsl_info(" SPEC_LATEST_SUPPORTED : %u\n", SPEC_LATEST_SUPPORTED);
+ lwsl_info(" AWAITING_TIMEOUT : %u\n", AWAITING_TIMEOUT);
+ lwsl_info(" sizeof (*info) : %u\n", sizeof(*info));
#if LWS_POSIX
lwsl_info(" SYSTEM_RANDOM_FILEPATH: '%s'\n", SYSTEM_RANDOM_FILEPATH);
- lwsl_info(" LWS_MAX_ZLIB_CONN_BUFFER: %u\n", LWS_MAX_ZLIB_CONN_BUFFER);
#endif
if (lws_plat_context_early_init())
return NULL;
lwsl_notice(" Started with daemon pid %d\n", pid_daemon);
}
#endif
+ context->max_fds = getdtablesize();
+
+ if (info->count_threads)
+ context->count_threads = info->count_threads;
+ else
+ context->count_threads = 1;
+
+ if (context->count_threads > LWS_MAX_SMP)
+ context->count_threads = LWS_MAX_SMP;
+
context->lserv_seen = 0;
context->protocols = info->protocols;
context->token_limits = info->token_limits;
context->ka_interval = info->ka_interval;
context->ka_probes = info->ka_probes;
+ /* we zalloc only the used ones, so the memory is not wasted
+ * allocating for unused threads
+ */
+ for (n = 0; n < context->count_threads; n++) {
+ context->pt[n].serv_buf = lws_zalloc(LWS_MAX_SOCKET_IO_BUF);
+ if (!context->pt[n].serv_buf) {
+ lwsl_err("OOM\n");
+ return NULL;
+ }
+ }
+
+ if (info->fd_limit_per_thread)
+ context->fd_limit_per_thread = info->fd_limit_per_thread;
+ else
+ context->fd_limit_per_thread = context->max_fds /
+ context->count_threads;
+
+ lwsl_notice(" Threads: %d each %d fds\n", context->count_threads,
+ context->fd_limit_per_thread);
+
memset(&wsi, 0, sizeof(wsi));
wsi.context = context;
context->lws_ev_sigint_cb = &lws_sigint_cb;
#endif /* LWS_USE_LIBEV */
- lwsl_info(" mem: context: %5u bytes\n", sizeof(struct lws_context));
+ lwsl_info(" mem: context: %5u bytes (%d + (%d x %d))\n",
+ sizeof(struct lws_context) +
+ (context->count_threads * LWS_MAX_SOCKET_IO_BUF),
+ sizeof(struct lws_context),
+ context->count_threads,
+ LWS_MAX_SOCKET_IO_BUF);
/*
* allocate and initialize the pool of
/* this is per context */
lwsl_info(" mem: http hdr rsvd: %5u bytes ((%u + %u) x %u)\n",
- (context->max_http_header_data + sizeof(struct allocated_headers)) *
+ (context->max_http_header_data +
+ sizeof(struct allocated_headers)) *
context->max_http_header_pool,
- context->max_http_header_data, sizeof(struct allocated_headers),
+ context->max_http_header_data,
+ sizeof(struct allocated_headers),
context->max_http_header_pool);
-
- context->max_fds = getdtablesize();
-
- context->fds = lws_zalloc(sizeof(struct lws_pollfd) * context->max_fds);
- if (context->fds == NULL) {
+ n = sizeof(struct lws_pollfd) * context->count_threads *
+ context->fd_limit_per_thread;
+ context->pt[0].fds = lws_zalloc(n);
+ if (context->pt[0].fds == NULL) {
lwsl_err("OOM allocating %d fds\n", context->max_fds);
goto bail;
}
-
- lwsl_info(" mem: pollfd map: %5u\n",
- sizeof(struct lws_pollfd) * context->max_fds);
+ lwsl_info(" mem: pollfd map: %5u\n", n);
+ /* each thread serves his own chunk of fds */
+ for (n = 1; n < (int)info->count_threads; n++)
+ context->pt[n].fds = context->pt[0].fds +
+ (n * context->fd_limit_per_thread);
if (lws_plat_init(context, info))
goto bail;
{
const struct lws_protocols *protocol = NULL;
struct lws wsi;
- int n;
+ int n, m = context->count_threads;
lwsl_notice("%s\n", __func__);
lwsl_notice("Worst latency: %s\n", context->worst_latency_info);
#endif
- for (n = 0; n < context->fds_count; n++) {
- struct lws *wsi = wsi_from_fd(context, context->fds[n].fd);
- if (!wsi)
- continue;
- lws_close_free_wsi(wsi, LWS_CLOSE_STATUS_NOSTATUS_CONTEXT_DESTROY
+ while (m--)
+ for (n = 0; n < context->pt[m].fds_count; n++) {
+ struct lws *wsi = wsi_from_fd(context, context->pt[m].fds[n].fd);
+ if (!wsi)
+ continue;
+ lws_close_free_wsi(wsi, LWS_CLOSE_STATUS_NOSTATUS_CONTEXT_DESTROY
/* no protocol close */);
- n--;
- }
+ n--;
+ }
/*
* give all extensions a chance to clean up any per-context
ev_signal_stop(context->io_loop, &context->w_sigint.watcher);
#endif /* LWS_USE_LIBEV */
+ for (n = 0; n < context->count_threads; n++)
+ lws_free_set_NULL(context->pt[n].serv_buf);
+
lws_plat_context_early_destroy(context);
lws_ssl_context_destroy(context);
- if (context->fds)
- lws_free(context->fds);
+ if (context->pt[0].fds)
+ lws_free(context->pt[0].fds);
if (context->ah_pool)
lws_free(context->ah_pool);
if (context->http_header_data)
{
int n, m;
struct lws_context *context = lws_get_context(wsi);
- unsigned char *p = context->serv_buf +
- LWS_PRE;
+ struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
+ unsigned char *p = pt->serv_buf + LWS_PRE;
unsigned char *start = p;
- unsigned char *end = p + sizeof(context->serv_buf) -
- LWS_PRE;
+ unsigned char *end = p + LWS_MAX_SOCKET_IO_BUF - LWS_PRE;
if (!html_body)
html_body = "";
lws_close_free_wsi(struct lws *wsi, enum lws_close_status reason)
{
struct lws_context *context = wsi->context;
+ struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
int n, m, ret, old_state;
struct lws_tokens eff_buf;
wsi->mode == LWSCM_WS_CLIENT) {
if (wsi->u.ws.rx_draining_ext) {
- struct lws **w = &wsi->context->rx_draining_ext_list;
+ struct lws **w = &pt->rx_draining_ext_list;
wsi->u.ws.rx_draining_ext = 0;
/* remove us from context draining ext list */
}
if (wsi->u.ws.tx_draining_ext) {
- struct lws **w = &wsi->context->tx_draining_ext_list;
+ struct lws **w = &pt->tx_draining_ext_list;
wsi->u.ws.tx_draining_ext = 0;
/* remove us from context draining ext list */
lws_callback_all_protocol(struct lws_context *context,
const struct lws_protocols *protocol, int reason)
{
+ struct lws_context_per_thread *pt = &context->pt[0];
struct lws *wsi;
- int n;
-
- for (n = 0; n < context->fds_count; n++) {
- wsi = wsi_from_fd(context, context->fds[n].fd);
- if (!wsi)
- continue;
- if (wsi->protocol == protocol)
- protocol->callback(wsi, reason, wsi->user_space, NULL, 0);
+ int n, m = context->count_threads;
+
+ while (m--) {
+ for (n = 0; n < pt->fds_count; n++) {
+ wsi = wsi_from_fd(context, pt->fds[n].fd);
+ if (!wsi)
+ continue;
+ if (wsi->protocol == protocol)
+ protocol->callback(wsi, reason, wsi->user_space, NULL, 0);
+ }
+ pt++;
}
return 0;
lws_rx_flow_allow_all_protocol(const struct lws_context *context,
const struct lws_protocols *protocol)
{
- int n;
+ const struct lws_context_per_thread *pt = &context->pt[0];
struct lws *wsi;
-
- for (n = 0; n < context->fds_count; n++) {
- wsi = wsi_from_fd(context, context->fds[n].fd);
- if (!wsi)
- continue;
- if (wsi->protocol == protocol)
- lws_rx_flow_control(wsi, LWS_RXFLOW_ALLOW);
+ int n, m = context->count_threads;
+
+ while (m--) {
+ for (n = 0; n < pt->fds_count; n++) {
+ wsi = wsi_from_fd(context, pt->fds[n].fd);
+ if (!wsi)
+ continue;
+ if (wsi->protocol == protocol)
+ lws_rx_flow_control(wsi, LWS_RXFLOW_ALLOW);
+ }
+ pt++;
}
}
return wsi->context;
}
+LWS_VISIBLE LWS_EXTERN int
+lws_get_count_threads(struct lws_context *context)
+{
+ return context->count_threads;
+}
+
LWS_VISIBLE LWS_EXTERN void *
lws_wsi_user(struct lws *wsi)
{
unsigned char *buf, size_t len)
{
unsigned char *p, *start;
- int budget = sizeof(wsi->u.ws.ping_payload_buf) -
- LWS_PRE;
+ int budget = sizeof(wsi->u.ws.ping_payload_buf) - LWS_PRE;
assert(wsi->mode == LWSCM_WS_SERVING || wsi->mode == LWSCM_WS_CLIENT);
{
/* there is no pending change */
if (!(wsi->rxflow_change_to & LWS_RXFLOW_PENDING_CHANGE)) {
- lwsl_info("%s: no pending change\n", __func__);
+ lwsl_debug("%s: no pending change\n", __func__);
return 0;
}
*/
LWS_VISIBLE LWS_EXTERN int
-lws_parse_uri(char *p, const char **prot, const char **ads, int *port, const char **path)
+lws_parse_uri(char *p, const char **prot, const char **ads, int *port,
+ const char **path)
{
const char *end;
static const char *slash = "/";
* allocated for the lifetime of the context). If the pool is
* busy new incoming connections must wait for accept until one
* becomes free.
+ * @count_threads: how many contexts to create in an array, 0 = 1
+ * @fd_limit_per_thread: nonzero means restrict each service thread to this
+ * many fds, 0 means the default which is divide the process fd
+ * limit by the number of threads.
+ *
*/
struct lws_context_creation_info {
short max_http_header_data;
short max_http_header_pool;
+ unsigned int count_threads;
+ unsigned int fd_limit_per_thread;
+
/* Add new things just above here ---^
* This is part of the ABI, don't needlessly break compatibility
*
LWS_VISIBLE LWS_EXTERN int
lws_service(struct lws_context *context, int timeout_ms);
+LWS_VISIBLE LWS_EXTERN int
+lws_service_tsi(struct lws_context *context, int timeout_ms, int tsi);
+
+LWS_VISIBLE LWS_EXTERN void
+lws_cancel_service_pt(struct lws *wsi);
+
LWS_VISIBLE LWS_EXTERN void
lws_cancel_service(struct lws_context *context);
unsigned int code, unsigned char **p,
unsigned char *end);
-LWS_EXTERN int
+LWS_VISIBLE LWS_EXTERN int
lws_http_transaction_completed(struct lws *wsi);
#ifdef LWS_USE_LIBEV
LWS_VISIBLE LWS_EXTERN int
lws_service_fd(struct lws_context *context, struct lws_pollfd *pollfd);
+LWS_VISIBLE LWS_EXTERN int
+lws_service_fd_tsi(struct lws_context *context, struct lws_pollfd *pollfd,
+ int tsi);
+
LWS_VISIBLE LWS_EXTERN void *
lws_context_user(struct lws_context *context);
LWS_VISIBLE LWS_EXTERN struct lws_context *
lws_get_context(const struct lws *wsi);
+LWS_VISIBLE LWS_EXTERN int
+lws_get_count_threads(struct lws_context *context);
+
/*
* Wsi-associated File Operations access helpers
*
}
/**
- * lws_cancel_service() - Cancel servicing of pending websocket activity
- * @context: Websocket context
+ * lws_cancel_service_pt() - Cancel servicing of pending socket activity
+ * on one thread
+ * @wsi: Cancel service on the thread this wsi is serviced by
*
* This function let a call to lws_service() waiting for a timeout
* immediately return.
*/
LWS_VISIBLE void
-lws_cancel_service(struct lws_context *context)
+lws_cancel_service_pt(struct lws *wsi)
{
+ struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi];
char buf = 0;
- if (write(context->dummy_pipe_fds[1], &buf, sizeof(buf)) != 1)
+ if (write(pt->dummy_pipe_fds[1], &buf, sizeof(buf)) != 1)
lwsl_err("Cannot write to dummy pipe");
}
+/**
+ * lws_cancel_service() - Cancel ALL servicing of pending socket activity
+ * @context: Websocket context
+ *
+ * This function let a call to lws_service() waiting for a timeout
+ * immediately return.
+ */
+LWS_VISIBLE void
+lws_cancel_service(struct lws_context *context)
+{
+ struct lws_context_per_thread *pt = &context->pt[0];
+ char buf = 0, m = context->count_threads;
+
+ while (m--) {
+ if (write(pt->dummy_pipe_fds[1], &buf, sizeof(buf)) != 1)
+ lwsl_err("Cannot write to dummy pipe");
+ pt++;
+ }
+}
+
LWS_VISIBLE void lwsl_emit_syslog(int level, const char *line)
{
int syslog_level = LOG_DEBUG;
}
LWS_VISIBLE int
-lws_plat_service(struct lws_context *context, int timeout_ms)
+lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi)
{
- int n;
- int m;
- char buf;
+ struct lws_context_per_thread *pt = &context->pt[tsi];
struct lws *wsi;
+ int n, m;
+ char buf;
#ifdef LWS_OPENSSL_SUPPORT
struct lws *wsi_next;
#endif
context->service_tid = context->service_tid_detected;
/* if we know we are draining rx ext, do not wait in poll */
- if (context->rx_draining_ext_list)
+ if (pt->rx_draining_ext_list)
timeout_ms = 0;
#ifdef LWS_OPENSSL_SUPPORT
/* if we know we have non-network pending data, do not wait in poll */
- if (lws_ssl_anybody_has_buffered_read(context)) {
+ if (lws_ssl_anybody_has_buffered_read_tsi(context, tsi)) {
timeout_ms = 0;
lwsl_err("ssl buffered read\n");
}
#endif
- n = poll(context->fds, context->fds_count, timeout_ms);
+ n = poll(pt->fds, pt->fds_count, timeout_ms);
#ifdef LWS_OPENSSL_SUPPORT
- if (!context->rx_draining_ext_list &&
- !lws_ssl_anybody_has_buffered_read(context) && n == 0) {
+ if (!pt->rx_draining_ext_list &&
+ !lws_ssl_anybody_has_buffered_read_tsi(context, tsi) && !n) {
#else
- if (!context->rx_draining_ext_list && n == 0) /* poll timeout */ {
+ if (!pt->rx_draining_ext_list && !n) /* poll timeout */ {
#endif
- lws_service_fd(context, NULL);
+ lws_service_fd_tsi(context, NULL, tsi);
return 0;
}
* For all guys with already-available ext data to drain, if they are
* not flowcontrolled, fake their POLLIN status
*/
- wsi = context->rx_draining_ext_list;
+ wsi = pt->rx_draining_ext_list;
while (wsi) {
- context->fds[wsi->position_in_fds_table].revents |=
- context->fds[wsi->position_in_fds_table].events & POLLIN;
+ pt->fds[wsi->position_in_fds_table].revents |=
+ pt->fds[wsi->position_in_fds_table].events & POLLIN;
wsi = wsi->u.ws.rx_draining_ext_list;
}
* network socket may have nothing
*/
- wsi = context->pending_read_list;
+ wsi = pt->pending_read_list;
while (wsi) {
wsi_next = wsi->pending_read_list_next;
- context->fds[wsi->position_in_fds_table].revents |=
- context->fds[wsi->position_in_fds_table].events & POLLIN;
- if (context->fds[wsi->position_in_fds_table].revents & POLLIN)
+ pt->fds[wsi->position_in_fds_table].revents |=
+ pt->fds[wsi->position_in_fds_table].events & POLLIN;
+ if (pt->fds[wsi->position_in_fds_table].revents & POLLIN)
/*
* he's going to get serviced now, take him off the
* list of guys with buffered SSL. If he still has some
/* any socket with events to service? */
- for (n = 0; n < context->fds_count; n++) {
- if (!context->fds[n].revents)
+ for (n = 0; n < pt->fds_count; n++) {
+ if (!pt->fds[n].revents)
continue;
- if (context->fds[n].fd == context->dummy_pipe_fds[0]) {
- if (read(context->fds[n].fd, &buf, 1) != 1)
+ if (pt->fds[n].fd == pt->dummy_pipe_fds[0]) {
+ if (read(pt->fds[n].fd, &buf, 1) != 1)
lwsl_err("Cannot read from dummy pipe.");
continue;
}
- m = lws_service_fd(context, &context->fds[n]);
+ m = lws_service_fd_tsi(context, &pt->fds[n], tsi);
if (m < 0)
return -1;
/* if something closed, retry this slot */
}
LWS_VISIBLE int
+lws_plat_service(struct lws_context *context, int timeout_ms)
+{
+ return lws_plat_service_tsi(context, timeout_ms, 0);
+}
+
+LWS_VISIBLE int
lws_plat_set_socket_options(struct lws_context *context, int fd)
{
int optval = 1;
LWS_VISIBLE void
lws_plat_context_late_destroy(struct lws_context *context)
{
+ struct lws_context_per_thread *pt = &context->pt[0];
+ int m = context->count_threads;
+
if (context->lws_lookup)
lws_free(context->lws_lookup);
- close(context->dummy_pipe_fds[0]);
- close(context->dummy_pipe_fds[1]);
+ while (m--) {
+ close(pt->dummy_pipe_fds[0]);
+ close(pt->dummy_pipe_fds[1]);
+ pt++;
+ }
close(context->fd_random);
}
}
LWS_VISIBLE void
-lws_plat_insert_socket_into_fds(struct lws_context *context,
- struct lws *wsi)
+lws_plat_insert_socket_into_fds(struct lws_context *context, struct lws *wsi)
{
+ struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
+
lws_libev_io(wsi, LWS_EV_START | LWS_EV_READ);
- context->fds[context->fds_count++].revents = 0;
+ pt->fds[pt->fds_count++].revents = 0;
}
LWS_VISIBLE void
lws_plat_init(struct lws_context *context,
struct lws_context_creation_info *info)
{
- context->lws_lookup = lws_zalloc(sizeof(struct lws *) * context->max_fds);
+ struct lws_context_per_thread *pt = &context->pt[0];
+ int n = context->count_threads, fd;
+
+ /* master context has the global fd lookup array */
+ context->lws_lookup = lws_zalloc(sizeof(struct lws *) *
+ context->max_fds);
if (context->lws_lookup == NULL) {
- lwsl_err(
- "Unable to allocate lws_lookup array for %d connections\n",
- context->max_fds);
+ lwsl_err("OOM on lws_lookup array for %d connections\n",
+ context->max_fds);
return 1;
}
lwsl_notice(" mem: platform fd map: %5u bytes\n",
sizeof(struct lws *) * context->max_fds);
+ fd = open(SYSTEM_RANDOM_FILEPATH, O_RDONLY);
- context->fd_random = open(SYSTEM_RANDOM_FILEPATH, O_RDONLY);
+ context->fd_random = fd;
if (context->fd_random < 0) {
lwsl_err("Unable to open random device %s %d\n",
- SYSTEM_RANDOM_FILEPATH, context->fd_random);
+ SYSTEM_RANDOM_FILEPATH, context->fd_random);
return 1;
}
if (!lws_libev_init_fd_table(context)) {
/* otherwise libev handled it instead */
- if (pipe(context->dummy_pipe_fds)) {
- lwsl_err("Unable to create pipe\n");
- return 1;
+ while (n--) {
+ if (pipe(pt->dummy_pipe_fds)) {
+ lwsl_err("Unable to create pipe\n");
+ return 1;
+ }
+
+ /* use the read end of pipe as first item */
+ pt->fds[0].fd = pt->dummy_pipe_fds[0];
+ pt->fds[0].events = LWS_POLLIN;
+ pt->fds[0].revents = 0;
+ pt->fds_count = 1;
+ pt++;
}
}
- /* use the read end of pipe as first item */
- context->fds[0].fd = context->dummy_pipe_fds[0];
- context->fds[0].events = LWS_POLLIN;
- context->fds[0].revents = 0;
- context->fds_count = 1;
-
context->fops.open = _lws_plat_file_open;
context->fops.close = _lws_plat_file_close;
context->fops.seek_cur = _lws_plat_file_seek_cur;
+#define _WINSOCK_DEPRECATED_NO_WARNINGS
#include "private-libwebsockets.h"
unsigned long long
LWS_VISIBLE void
lws_cancel_service(struct lws_context *context)
{
- WSASetEvent(context->events[0]);
+ struct lws_context_per_thread *pt = &context->pt[0];
+ int n = context->count_threads;
+
+ while (n--) {
+ WSASetEvent(pt->events[0]);
+ pt++;
+ }
+}
+
+LWS_VISIBLE void
+lws_cancel_service_pt(struct lws *wsi)
+{
+ struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi];
+ WSASetEvent(pt->events[0]);
}
LWS_VISIBLE void lwsl_emit_syslog(int level, const char *line)
}
LWS_VISIBLE int
-lws_plat_service(struct lws_context *context, int timeout_ms)
+lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi)
{
int n;
int i;
WSANETWORKEVENTS networkevents;
struct lws_pollfd *pfd;
struct lws *wsi;
+ struct lws_context_per_thread *pt = &context->pt[tsi];
/* stay dead once we are dead */
}
context->service_tid = context->service_tid_detected;
- for (i = 0; i < context->fds_count; ++i) {
- pfd = &context->fds[i];
+ for (i = 0; i < pt->fds_count; ++i) {
+ pfd = &pt->fds[i];
if (pfd->fd == context->lserv_fd)
continue;
}
}
- ev = WSAWaitForMultipleEvents(context->fds_count + 1, context->events,
+ ev = WSAWaitForMultipleEvents(pt->fds_count + 1, pt->events,
FALSE, timeout_ms, FALSE);
context->service_tid = 0;
}
if (ev == WSA_WAIT_EVENT_0) {
- WSAResetEvent(context->events[0]);
+ WSAResetEvent(pt->events[0]);
return 0;
}
- if (ev < WSA_WAIT_EVENT_0 || ev > WSA_WAIT_EVENT_0 + context->fds_count)
+ if (ev < WSA_WAIT_EVENT_0 || ev > WSA_WAIT_EVENT_0 + pt->fds_count)
return -1;
- pfd = &context->fds[ev - WSA_WAIT_EVENT_0 - 1];
+ pfd = &pt->fds[ev - WSA_WAIT_EVENT_0 - 1];
if (WSAEnumNetworkEvents(pfd->fd,
- context->events[ev - WSA_WAIT_EVENT_0],
+ pt->events[ev - WSA_WAIT_EVENT_0],
&networkevents) == SOCKET_ERROR) {
lwsl_err("WSAEnumNetworkEvents() failed with error %d\n",
LWS_ERRNO);
}
LWS_VISIBLE int
+lws_plat_service(struct lws_context *context, int timeout_ms)
+{
+ return lws_plat_service_tsi(context, timeout_ms, 0);
+}
+
+LWS_VISIBLE int
lws_plat_set_socket_options(struct lws_context *context, lws_sockfd_type fd)
{
int optval = 1;
LWS_VISIBLE void
lws_plat_context_early_destroy(struct lws_context *context)
{
- if (context->events) {
- WSACloseEvent(context->events[0]);
- lws_free(context->events);
+ struct lws_context_per_thread *pt = &context->pt[0];
+ int n = context->count_threads;
+
+ while (n--) {
+ if (pt->events) {
+ WSACloseEvent(pt->events[0]);
+ lws_free(pt->events);
+ }
+ pt++;
}
}
}
LWS_VISIBLE void
-lws_plat_insert_socket_into_fds(struct lws_context *context,
- struct lws *wsi)
+lws_plat_insert_socket_into_fds(struct lws_context *context, struct lws *wsi)
{
- context->fds[context->fds_count++].revents = 0;
- context->events[context->fds_count] = WSACreateEvent();
- WSAEventSelect(wsi->sock, context->events[context->fds_count], LWS_POLLIN);
+ struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
+
+ pt->fds[pt->fds_count++].revents = 0;
+ pt->events[pt->fds_count] = WSACreateEvent();
+ WSAEventSelect(wsi->sock, pt->events[pt->fds_count], LWS_POLLIN);
}
LWS_VISIBLE void
lws_plat_delete_socket_from_fds(struct lws_context *context,
struct lws *wsi, int m)
{
- WSACloseEvent(context->events[m + 1]);
- context->events[m + 1] = context->events[context->fds_count + 1];
+ struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
+
+ WSACloseEvent(pt->events[m + 1]);
+ pt->events[m + 1] = pt->events[pt->fds_count + 1];
}
LWS_VISIBLE void
lws_plat_change_pollfd(struct lws_context *context,
struct lws *wsi, struct lws_pollfd *pfd)
{
+ struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
long networkevents = LWS_POLLHUP;
if ((pfd->events & LWS_POLLIN))
networkevents |= LWS_POLLOUT;
if (WSAEventSelect(wsi->sock,
- context->events[wsi->position_in_fds_table + 1],
+ pt->events[wsi->position_in_fds_table + 1],
networkevents) != SOCKET_ERROR)
return 0;
lws_plat_init(struct lws_context *context,
struct lws_context_creation_info *info)
{
- int i;
+ struct lws_context_per_thread *pt = &context->pt[0];
+ int i, n = context->count_threads;
for (i = 0; i < FD_HASHTABLE_MODULUS; i++) {
context->fd_hashtable[i].wsi =
return -1;
}
- context->events = lws_malloc(sizeof(WSAEVENT) * (context->max_fds + 1));
- if (context->events == NULL) {
- lwsl_err("Unable to allocate events array for %d connections\n",
- context->max_fds);
- return 1;
- }
+ while (n--) {
+ pt->events = lws_malloc(sizeof(WSAEVENT) *
+ (context->fd_limit_per_thread + 1));
+ if (pt->events == NULL) {
+			lwsl_err("Unable to allocate events array for %u connections\n",
+				 context->fd_limit_per_thread + 1);
+ return 1;
+ }
- context->fds_count = 0;
- context->events[0] = WSACreateEvent();
+ pt->fds_count = 0;
+ pt->events[0] = WSACreateEvent();
+
+ pt++;
+ }
context->fd_random = 0;
LWS_VISIBLE int lws_write(struct lws *wsi, unsigned char *buf, size_t len,
enum lws_write_protocol wp)
{
+ struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi];
int masked7 = (wsi->mode == LWSCM_WS_CLIENT);
unsigned char is_masked_bit = 0;
unsigned char *dropmask = NULL;
if (wsi->state == LWSS_ESTABLISHED && wsi->u.ws.tx_draining_ext) {
/* remove us from the list */
- struct lws **w = &wsi->context->tx_draining_ext_list;
+ struct lws **w = &pt->tx_draining_ext_list;
lwsl_debug("%s: TX EXT DRAINING: Remove from list\n", __func__);
wsi->u.ws.tx_draining_ext = 0;
/* remove us from context draining ext list */
if (n && eff_buf.token_len) {
/* extension requires further draining */
wsi->u.ws.tx_draining_ext = 1;
- wsi->u.ws.tx_draining_ext_list =
- wsi->context->tx_draining_ext_list;
- wsi->context->tx_draining_ext_list = wsi;
+ wsi->u.ws.tx_draining_ext_list = pt->tx_draining_ext_list;
+ pt->tx_draining_ext_list = wsi;
/* we must come back to do more */
lws_callback_on_writable(wsi);
/*
LWS_VISIBLE int lws_serve_http_file_fragment(struct lws *wsi)
{
struct lws_context *context = wsi->context;
+ struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
unsigned long amount;
int n, m;
goto all_sent;
if (lws_plat_file_read(wsi, wsi->u.http.fd, &amount,
- context->serv_buf,
- sizeof(context->serv_buf)) < 0)
+ pt->serv_buf,
+ LWS_MAX_SOCKET_IO_BUF) < 0)
return -1; /* caller will close */
n = (int)amount;
lws_set_timeout(wsi, PENDING_TIMEOUT_HTTP_CONTENT,
AWAITING_TIMEOUT);
wsi->u.http.filepos += n;
- m = lws_write(wsi, context->serv_buf, n,
+ m = lws_write(wsi, pt->serv_buf, n,
wsi->u.http.filepos == wsi->u.http.filelen ?
LWS_WRITE_HTTP_FINAL : LWS_WRITE_HTTP);
if (m < 0)
int
lws_rx_sm(struct lws *wsi, unsigned char c)
{
+ struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi];
struct lws_tokens eff_buf;
int ret = 0, n, rx_draining_ext = 0;
int callback_action = LWS_CALLBACK_RECEIVE;
switch (wsi->lws_rx_parse_state) {
case LWS_RXPS_NEW:
if (wsi->u.ws.rx_draining_ext) {
- struct lws **w = &wsi->context->rx_draining_ext_list;
+ struct lws **w = &pt->rx_draining_ext_list;
eff_buf.token = NULL;
eff_buf.token_len = 0;
assert(wsi->u.ws.rx_ubuf);
- if (wsi->u.ws.rx_ubuf_head + LWS_PRE + 4 >= wsi->u.ws.rx_ubuf_alloc) {
- lwsl_err("Attempted overflow\n");
- return -1;
- }
+ if (wsi->u.ws.rx_ubuf_head + LWS_PRE >=
+ wsi->u.ws.rx_ubuf_alloc) {
+			lwsl_err("Attempted overflow\n");
+ return -1;
+ }
if (wsi->u.ws.all_zero_nonce)
wsi->u.ws.rx_ubuf[LWS_PRE +
- (wsi->u.ws.rx_ubuf_head++)] = c;
+ (wsi->u.ws.rx_ubuf_head++)] = c;
else
wsi->u.ws.rx_ubuf[LWS_PRE +
(wsi->u.ws.rx_ubuf_head++)] =
wsi->protocol->callback, wsi,
LWS_CALLBACK_WS_PEER_INITIATED_CLOSE,
wsi->user_space,
- &wsi->u.ws.rx_ubuf[
- LWS_PRE],
+ &wsi->u.ws.rx_ubuf[LWS_PRE],
wsi->u.ws.rx_ubuf_head))
return -1;
default:
lwsl_parser("passing opc %x up to exts\n",
- wsi->u.ws.opcode);
+ wsi->u.ws.opcode);
/*
* It's something special we can't understand here.
* Pass the payload up to the extension's parsing
if (n && eff_buf.token_len) {
/* extension had more... main loop will come back */
wsi->u.ws.rx_draining_ext = 1;
- wsi->u.ws.rx_draining_ext_list = wsi->context->rx_draining_ext_list;
- wsi->context->rx_draining_ext_list = wsi;
+ wsi->u.ws.rx_draining_ext_list = pt->rx_draining_ext_list;
+ pt->rx_draining_ext_list = wsi;
}
if (eff_buf.token_len > 0 ||
insert_wsi_socket_into_fds(struct lws_context *context, struct lws *wsi)
{
struct lws_pollargs pa = { wsi->sock, LWS_POLLIN, 0 };
+ struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
- if (context->fds_count >= context->max_fds) {
+ if ((unsigned int)pt->fds_count >= context->fd_limit_per_thread) {
lwsl_err("Too many fds (%d)\n", context->max_fds);
return 1;
}
return -1;
insert_wsi(context, wsi);
- wsi->position_in_fds_table = context->fds_count;
- context->fds[context->fds_count].fd = wsi->sock;
- context->fds[context->fds_count].events = LWS_POLLIN;
+ wsi->position_in_fds_table = pt->fds_count;
+ pt->fds[pt->fds_count].fd = wsi->sock;
+ pt->fds[pt->fds_count].events = LWS_POLLIN;
lws_plat_insert_socket_into_fds(context, wsi);
struct lws *end_wsi;
struct lws_pollargs pa = { wsi->sock, 0, 0 };
struct lws_context *context = wsi->context;
+ struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
lws_libev_io(wsi, LWS_EV_STOP | LWS_EV_READ | LWS_EV_WRITE);
- --context->fds_count;
+ --pt->fds_count;
#if !defined(_WIN32) && !defined(MBED_OPERATORS)
if (wsi->sock > context->max_fds) {
m = wsi->position_in_fds_table; /* replace the contents for this */
/* have the last guy take up the vacant slot */
- context->fds[m] = context->fds[context->fds_count];
+ pt->fds[m] = pt->fds[pt->fds_count];
lws_plat_delete_socket_from_fds(context, wsi, m);
* (still same fd pointing to same wsi)
*/
/* end guy's "position in fds table" changed */
- end_wsi = wsi_from_fd(context, context->fds[context->fds_count].fd);
+ end_wsi = wsi_from_fd(context, pt->fds[pt->fds_count].fd);
end_wsi->position_in_fds_table = m;
/* deletion guy's lws_lookup entry needs nuking */
delete_from_fd(context, wsi->sock);
lws_change_pollfd(struct lws *wsi, int _and, int _or)
{
struct lws_context *context;
+ struct lws_context_per_thread *pt;
int tid;
int sampled_tid;
struct lws_pollfd *pfd;
if (!context)
return 1;
- pfd = &context->fds[wsi->position_in_fds_table];
+ pt = &context->pt[(int)wsi->tsi];
+
+ pfd = &pt->fds[wsi->position_in_fds_table];
pa.fd = wsi->sock;
if (context->protocols[0].callback(wsi, LWS_CALLBACK_LOCK_POLL,
if (tid == -1)
return -1;
if (tid != sampled_tid)
- lws_cancel_service(context);
+ lws_cancel_service_pt(wsi);
}
}
network_sock:
#endif
- if (lws_ext_cb_active(wsi,
- LWS_EXT_CB_REQUEST_ON_WRITEABLE, NULL, 0))
+ if (lws_ext_cb_active(wsi, LWS_EXT_CB_REQUEST_ON_WRITEABLE, NULL, 0))
return 1;
if (wsi->position_in_fds_table < 0) {
lws_callback_on_writable_all_protocol(const struct lws_context *context,
const struct lws_protocols *protocol)
{
+ const struct lws_context_per_thread *pt = &context->pt[0];
struct lws *wsi;
- int n;
-
- for (n = 0; n < context->fds_count; n++) {
- wsi = wsi_from_fd(context,context->fds[n].fd);
- if (!wsi)
- continue;
- if (wsi->protocol == protocol)
- lws_callback_on_writable(wsi);
+ int n, m = context->count_threads;
+
+ while (m--) {
+ for (n = 0; n < pt->fds_count; n++) {
+ wsi = wsi_from_fd(context, pt->fds[n].fd);
+ if (!wsi)
+ continue;
+ if (wsi->protocol == protocol)
+ lws_callback_on_writable(wsi);
+ }
+ pt++;
}
return 0;
#ifndef SYSTEM_RANDOM_FILEPATH
#define SYSTEM_RANDOM_FILEPATH "/dev/urandom"
#endif
-#ifndef LWS_MAX_ZLIB_CONN_BUFFER
-#define LWS_MAX_ZLIB_CONN_BUFFER (64 * 1024)
-#endif
/*
* if not in a connection storm, check for incoming
unsigned char nfrag;
};
-struct lws_context {
- time_t last_timeout_check_s;
- struct lws_plat_file_ops fops;
+/*
+ * so we can have n connections being serviced simultaneously,
+ * these things need to be isolated per-thread.
+ */
+
+struct lws_context_per_thread {
+ struct lws_pollfd *fds;
+ struct lws *rx_draining_ext_list;
+ struct lws *tx_draining_ext_list;
+#ifdef LWS_OPENSSL_SUPPORT
+ struct lws *pending_read_list; /* linked list */
+#endif
+ /*
+ * usable by anything in the service code, but only if the scope
+ * does not last longer than the service action (since next service
+ * of any socket can likewise use it and overwrite)
+ */
+ unsigned char *serv_buf;
#ifdef _WIN32
WSAEVENT *events;
+#else
+ int dummy_pipe_fds[2];
#endif
- struct lws_pollfd *fds;
+ int fds_count;
+};
+
+/*
+ * the rest is managed per-context, that includes
+ *
+ * - processwide single fd -> wsi lookup
+ * - contextwide headers pool
+ * - contextwide ssl context
+ * - contextwide proxy
+ */
+
+struct lws_context {
+ time_t last_timeout_check_s;
+ struct lws_plat_file_ops fops;
+ struct lws_context_per_thread pt[LWS_MAX_SMP];
#ifdef _WIN32
/* different implementation between unix and windows */
struct lws_fd_hashtable fd_hashtable[FD_HASHTABLE_MODULUS];
const struct lws_protocols *protocols;
void *http_header_data;
struct allocated_headers *ah_pool;
- struct lws *rx_draining_ext_list;
- struct lws *tx_draining_ext_list;
+
#ifdef LWS_OPENSSL_SUPPORT
SSL_CTX *ssl_ctx;
SSL_CTX *ssl_client_ctx;
- struct lws *pending_read_list; /* linked list */
#endif
#ifndef LWS_NO_EXTENSIONS
const struct lws_extension *extensions;
#endif
- /*
- * usable by anything in the service code, but only if the scope
- * does not last longer than the service action (since next service
- * of any socket can likewise use it and overwrite)
- */
- unsigned char serv_buf[LWS_MAX_SOCKET_IO_BUF];
char http_proxy_address[128];
char proxy_basic_auth_token[128];
char canonical_hostname[128];
lws_sockfd_type lserv_fd;
- int fds_count;
int max_fds;
int listen_port;
#ifdef LWS_USE_LIBEV
int lserv_seen;
unsigned int http_proxy_port;
unsigned int options;
+ unsigned int fd_limit_per_thread;
/*
* set to the Thread ID that's doing the service loop just before entry
*/
volatile int service_tid;
int service_tid_detected;
-#ifndef _WIN32
- int dummy_pipe_fds[2];
-#endif
int count_protocols;
int ka_time;
int use_ssl;
int allow_non_ssl_on_ssl_port;
unsigned int user_supplied_ssl_ctx:1;
-#define lws_ssl_anybody_has_buffered_read(ctx) \
- (ctx->use_ssl && ctx->pending_read_list)
+#define lws_ssl_anybody_has_buffered_read(w) \
+ (w->context->use_ssl && \
+ w->context->pt[(int)w->tsi].pending_read_list)
+#define lws_ssl_anybody_has_buffered_read_tsi(c, t) \
+ (c->use_ssl && \
+ c->pt[(int)t].pending_read_list)
#else
#define lws_ssl_anybody_has_buffered_read(ctx) (0)
+#define lws_ssl_anybody_has_buffered_read_tsi(ctx, t) (0)
#endif
short max_http_header_data;
short max_http_header_pool;
short ah_count_in_use;
+ short count_threads;
unsigned int being_destroyed:1;
};
char rx_frame_type; /* enum lws_write_protocol */
char pending_timeout; /* enum pending_timeout */
char pps; /* enum lws_pending_protocol_send */
+ char tsi; /* thread service index we belong to */
};
LWS_EXTERN int log_level;
lws_poll_listen_fd(struct lws_pollfd *fd);
LWS_EXTERN int
lws_plat_service(struct lws_context *context, int timeout_ms);
+LWS_EXTERN LWS_VISIBLE int
+lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi);
LWS_EXTERN int
lws_plat_init(struct lws_context *context,
struct lws_context_creation_info *info);
lws_extension_server_handshake(struct lws *wsi, char **p)
{
struct lws_context *context = wsi->context;
+ struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
const struct lws_extension *ext;
char ext_name[128];
int ext_count = 0;
* and go through them
*/
- if (lws_hdr_copy(wsi, (char *)context->serv_buf,
- sizeof(context->serv_buf), WSI_TOKEN_EXTENSIONS) < 0)
+ if (lws_hdr_copy(wsi, (char *)pt->serv_buf, LWS_MAX_SOCKET_IO_BUF,
+ WSI_TOKEN_EXTENSIONS) < 0)
return 1;
- c = (char *)context->serv_buf;
+ c = (char *)pt->serv_buf;
lwsl_parser("WSI_TOKEN_EXTENSIONS = '%s'\n", c);
wsi->count_act_ext = 0;
n = 0;
int
handshake_0405(struct lws_context *context, struct lws *wsi)
{
+ struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
unsigned char hash[20];
int n;
char *response;
* since key length is restricted above (currently 128), cannot
* overflow
*/
- n = sprintf((char *)context->serv_buf,
+ n = sprintf((char *)pt->serv_buf,
"%s258EAFA5-E914-47DA-95CA-C5AB0DC85B11",
lws_hdr_simple_ptr(wsi, WSI_TOKEN_KEY));
- lws_SHA1(context->serv_buf, n, hash);
+ lws_SHA1(pt->serv_buf, n, hash);
accept_len = lws_b64_encode_string((char *)hash, 20,
- (char *)context->serv_buf,
- sizeof(context->serv_buf));
+ (char *)pt->serv_buf, LWS_MAX_SOCKET_IO_BUF);
if (accept_len < 0) {
lwsl_warn("Base64 encoded hash too long\n");
goto bail;
/* make a buffer big enough for everything */
- response = (char *)context->serv_buf + MAX_WEBSOCKET_04_KEY_LEN + LWS_PRE;
+ response = (char *)pt->serv_buf + MAX_WEBSOCKET_04_KEY_LEN + LWS_PRE;
p = response;
LWS_CPYAPP(p, "HTTP/1.1 101 Switching Protocols\x0d\x0a"
"Upgrade: WebSocket\x0d\x0a"
"Connection: Upgrade\x0d\x0a"
"Sec-WebSocket-Accept: ");
- strcpy(p, (char *)context->serv_buf);
+ strcpy(p, (char *)pt->serv_buf);
p += accept_len;
if (lws_hdr_total_length(wsi, WSI_TOKEN_PROTOCOL)) {
return 0;
}
+static int
+lws_get_idlest_tsi(struct lws_context *context)
+{
+ unsigned int lowest = ~0;
+ int n, hit = 0;
+
+ for (n = 0; n < context->count_threads; n++)
+ if ((unsigned int)context->pt[n].fds_count < lowest) {
+ lowest = context->pt[n].fds_count;
+ hit = n;
+ }
+
+ return hit;
+}
+
LWS_VISIBLE
int lws_server_socket_service(struct lws_context *context,
struct lws *wsi, struct lws_pollfd *pollfd)
{
lws_sockfd_type accept_fd = LWS_SOCK_INVALID;
+ struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
#if LWS_POSIX
struct sockaddr_in cli_addr;
socklen_t clilen;
if (!(pollfd->revents & pollfd->events && LWS_POLLIN))
goto try_pollout;
- len = lws_ssl_capable_read(wsi, context->serv_buf,
- sizeof(context->serv_buf));
+ len = lws_ssl_capable_read(wsi, pt->serv_buf,
+ LWS_MAX_SOCKET_IO_BUF);
lwsl_debug("%s: read %d\r\n", __func__, len);
switch (len) {
case 0:
* hm this may want to send
* (via HTTP callback for example)
*/
- n = lws_read(wsi, context->serv_buf, len);
+ n = lws_read(wsi, pt->serv_buf, len);
if (n < 0) /* we closed wsi */
return 1;
}
new_wsi->sock = accept_fd;
+ new_wsi->tsi = lws_get_idlest_tsi(context);
+ lwsl_info("Accepted to tsi %d\n", new_wsi->tsi);
/* the transport is accepted... give him time to negotiate */
lws_set_timeout(new_wsi, PENDING_TIMEOUT_ESTABLISH_WITH_SERVER,
int other_headers_len)
{
struct lws_context *context = lws_get_context(wsi);
- unsigned char *response = context->serv_buf + LWS_PRE;
+ struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
+ unsigned char *response = pt->serv_buf + LWS_PRE;
unsigned char *p = response;
- unsigned char *end = p + sizeof(context->serv_buf) - LWS_PRE;
+ unsigned char *end = p + LWS_MAX_SOCKET_IO_BUF - LWS_PRE;
int ret = 0;
wsi->u.http.fd = lws_plat_file_open(wsi, file, &wsi->u.http.filelen,
*/
LWS_VISIBLE int
-lws_service_fd(struct lws_context *context, struct lws_pollfd *pollfd)
+lws_service_fd_tsi(struct lws_context *context, struct lws_pollfd *pollfd, int tsi)
{
#if LWS_POSIX
int idx = 0;
#endif
+ struct lws_context_per_thread *pt = &context->pt[tsi];
lws_sockfd_type our_fd = 0;
struct lws_tokens eff_buf;
unsigned int pending = 0;
* pending connection here, it causes us to check again next time.
*/
- if (context->lserv_fd && pollfd != &context->fds[idx]) {
+ if (context->lserv_fd && pollfd != &pt->fds[idx]) {
context->lserv_count++;
if (context->lserv_seen ||
context->lserv_count == context->lserv_mod) {
* even with extpoll, we prepared this
* internal fds for listen
*/
- n = lws_poll_listen_fd(&context->fds[idx]);
+ n = lws_poll_listen_fd(&pt->fds[idx]);
if (n <= 0) {
if (context->lserv_seen)
context->lserv_seen--;
break;
}
/* there's a conn waiting for us */
- lws_service_fd(context, &context->fds[idx]);
+ lws_service_fd(context, &pt->fds[idx]);
context->lserv_seen++;
}
}
if (!(pollfd->revents & pollfd->events & LWS_POLLIN))
break;
read:
- eff_buf.token_len = lws_ssl_capable_read(wsi, context->serv_buf,
- pending ? pending :
- sizeof(context->serv_buf));
+ eff_buf.token_len = lws_ssl_capable_read(wsi, pt->serv_buf,
+ pending ? pending : LWS_MAX_SOCKET_IO_BUF);
switch (eff_buf.token_len) {
case 0:
lwsl_info("service_fd: closing due to 0 length read\n");
* used then so it is efficient.
*/
- eff_buf.token = (char *)context->serv_buf;
+ eff_buf.token = (char *)pt->serv_buf;
drain:
do {
pending = lws_ssl_pending(wsi);
if (pending) {
handle_pending:
- pending = pending > sizeof(context->serv_buf) ?
- sizeof(context->serv_buf) : pending;
+ pending = pending > LWS_MAX_SOCKET_IO_BUF ?
+ LWS_MAX_SOCKET_IO_BUF : pending;
goto read;
}
return n;
}
+LWS_VISIBLE int
+lws_service_fd(struct lws_context *context, struct lws_pollfd *pollfd)
+{
+ return lws_service_fd_tsi(context, pollfd, 0);
+}
+
/**
* lws_service() - Service any pending websocket activity
* @context: Websocket context
return lws_plat_service(context, timeout_ms);
}
+LWS_VISIBLE int
+lws_service_tsi(struct lws_context *context, int timeout_ms, int tsi)
+{
+ return lws_plat_service_tsi(context, timeout_ms, tsi);
+}
+
error = ERR_get_error();
lwsl_err("problem creating ssl method %lu: %s\n",
error, ERR_error_string(error,
- (char *)context->serv_buf));
+ (char *)context->pt[0].serv_buf));
return 1;
}
context->ssl_ctx = SSL_CTX_new(method); /* create context */
error = ERR_get_error();
lwsl_err("problem creating ssl context %lu: %s\n",
error, ERR_error_string(error,
- (char *)context->serv_buf));
+ (char *)context->pt[0].serv_buf));
return 1;
}
info->ssl_cert_filepath,
error,
ERR_error_string(error,
- (char *)context->serv_buf));
+ (char *)context->pt[0].serv_buf));
return 1;
}
lws_ssl_bind_passphrase(context->ssl_ctx, info);
lwsl_err("ssl problem getting key '%s' %lu: %s\n",
info->ssl_private_key_filepath, error,
ERR_error_string(error,
- (char *)context->serv_buf));
+ (char *)context->pt[0].serv_buf));
return 1;
}
} else
}
#ifdef LWS_SSL_SERVER_WITH_ECDH_CERT
- if (context->options & LWS_SERVER_OPTION_SSL_ECDH) {
+ if (context->options & LWS_SERVER_OPTION_SSL_ECDH) {
lwsl_notice(" Using ECDH certificate support\n");
/* Get X509 certificate from ssl context */
error = ERR_get_error();
lwsl_err("problem creating ssl method %lu: %s\n",
error, ERR_error_string(error,
- (char *)context->serv_buf));
+ (char *)context->pt[0].serv_buf));
return 1;
}
/* create context */
error = ERR_get_error();
lwsl_err("problem creating ssl context %lu: %s\n",
error, ERR_error_string(error,
- (char *)context->serv_buf));
+ (char *)context->pt[0].serv_buf));
return 1;
}
info->ssl_cert_filepath,
ERR_get_error(),
ERR_error_string(ERR_get_error(),
- (char *)context->serv_buf));
+ (char *)context->pt[0].serv_buf));
return 1;
}
}
info->ssl_private_key_filepath,
ERR_get_error(),
ERR_error_string(ERR_get_error(),
- (char *)context->serv_buf));
+ (char *)context->pt[0].serv_buf));
return 1;
}
lws_ssl_remove_wsi_from_buffered_list(struct lws *wsi)
{
struct lws_context *context = wsi->context;
+ struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
if (!wsi->pending_read_list_prev &&
!wsi->pending_read_list_next &&
- context->pending_read_list != wsi)
+ pt->pending_read_list != wsi)
/* we are not on the list */
return;
/* point previous guy's next to our next */
if (!wsi->pending_read_list_prev)
- context->pending_read_list = wsi->pending_read_list_next;
+ pt->pending_read_list = wsi->pending_read_list_next;
else
wsi->pending_read_list_prev->pending_read_list_next =
wsi->pending_read_list_next;
lws_ssl_capable_read(struct lws *wsi, unsigned char *buf, int len)
{
struct lws_context *context = wsi->context;
+ struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
int n;
if (!wsi->ssl)
return n;
if (wsi->pending_read_list_prev)
return n;
- if (context->pending_read_list == wsi)
+ if (pt->pending_read_list == wsi)
return n;
/* add us to the linked list of guys with pending ssl */
- if (context->pending_read_list)
- context->pending_read_list->pending_read_list_prev = wsi;
+ if (pt->pending_read_list)
+ pt->pending_read_list->pending_read_list_prev = wsi;
- wsi->pending_read_list_next = context->pending_read_list;
+ wsi->pending_read_list_next = pt->pending_read_list;
wsi->pending_read_list_prev = NULL;
- context->pending_read_list = wsi;
+ pt->pending_read_list = wsi;
return n;
bail:
{
struct lws *wsi = *pwsi;
struct lws_context *context = wsi->context;
+ struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
int n, m;
#ifndef USE_WOLFSSL
BIO *bio;
lws_latency_pre(context, wsi);
- n = recv(wsi->sock, (char *)context->serv_buf,
- sizeof(context->serv_buf), MSG_PEEK);
+ n = recv(wsi->sock, (char *)pt->serv_buf, LWS_MAX_SOCKET_IO_BUF,
+ MSG_PEEK);
/*
* optionally allow non-SSL connect on SSL listening socket
*/
if (context->allow_non_ssl_on_ssl_port) {
- if (n >= 1 && context->serv_buf[0] >= ' ') {
+ if (n >= 1 && pt->serv_buf[0] >= ' ') {
/*
* TLS content-type for Handshake is 0x16, and
* for ChangeCipherSpec Record, it's 0x14
/* SSL server using ECDH certificate */
#cmakedefine LWS_SSL_SERVER_WITH_ECDH_CERT
+/* Maximum supported service threads */
+#define LWS_MAX_SMP ${LWS_MAX_SMP}
+
${LWS_SIZEOFPTR_CODE}
/*
* libwebsockets-test-server - libwebsockets test implementation
*
- * Copyright (C) 2010-2015 Andy Green <andy@warmcat.com>
+ * Copyright (C) 2010-2016 Andy Green <andy@warmcat.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
pthread_exit(NULL);
}
+void *thread_service(void *threadid)
+{
+ while (lws_service_tsi(context, 50, (int)(long)threadid) >= 0 && !force_exit)
+ ;
+
+ pthread_exit(NULL);
+}
+
void sighandler(int sig)
{
force_exit = 1;
lws_cancel_service(context);
}
+static const struct lws_extension exts[] = {
+ {
+ "permessage-deflate",
+ lws_extension_callback_pm_deflate,
+ "permessage-deflate"
+ },
+ {
+ "deflate-frame",
+ lws_extension_callback_pm_deflate,
+ "deflate_frame"
+ },
+ { NULL, NULL, NULL /* terminator */ }
+};
+
static struct option options[] = {
{ "help", no_argument, NULL, 'h' },
{ "debug", required_argument, NULL, 'd' },
{ "interface", required_argument, NULL, 'i' },
{ "closetest", no_argument, NULL, 'c' },
{ "libev", no_argument, NULL, 'e' },
+ { "threads", required_argument, NULL, 'j' },
#ifndef LWS_NO_DAEMONIZE
{ "daemonize", no_argument, NULL, 'D' },
#endif
struct lws_context_creation_info info;
char interface_name[128] = "";
const char *iface = NULL;
- pthread_t pthread_dumb;
+ pthread_t pthread_dumb, pthread_service[32];
char cert_path[1024];
char key_path[1024];
int debug_level = 7;
+ int threads = 1;
int use_ssl = 0;
void *retval;
int opts = 0;
pthread_mutex_init(&lock_established_conns, NULL);
while (n >= 0) {
- n = getopt_long(argc, argv, "eci:hsap:d:Dr:", options, NULL);
+ n = getopt_long(argc, argv, "eci:hsap:d:Dr:j:", options, NULL);
if (n < 0)
continue;
switch (n) {
+		case 'j':
+			threads = atoi(optarg);
+			if (threads > (int)ARRAY_SIZE(pthread_service)) {
+				lwsl_err("Max threads %d\n",
+					 (int)ARRAY_SIZE(pthread_service));
+				return 1;
+			}
+			break;
case 'e':
opts |= LWS_SERVER_OPTION_LIBEV;
break;
lws_set_log_level(debug_level, lwsl_emit_syslog);
lwsl_notice("libwebsockets test server - "
- "(C) Copyright 2010-2015 Andy Green <andy@warmcat.com> - "
+ "(C) Copyright 2010-2016 Andy Green <andy@warmcat.com> - "
"licensed under LGPL2.1\n");
printf("Using resource path \"%s\"\n", resource_path);
info.gid = -1;
info.uid = -1;
info.options = opts;
+ info.count_threads = threads;
+ info.extensions = exts;
context = lws_create_context(&info);
if (context == NULL) {
goto done;
}
- /* this is our service thread */
+ /*
+ * notice the actual number of threads may be capped by the library,
+ * so use lws_get_count_threads() to get the actual amount of threads
+ * initialized.
+ */
+
+ for (n = 0; n < lws_get_count_threads(context); n++)
+ if (pthread_create(&pthread_service[n], NULL, thread_service,
+ (void *)(long)n))
+ lwsl_err("Failed to start service thread\n");
- n = 0;
- while (n >= 0 && !force_exit) {
- n = lws_service(context, 50);
- }
+ /* wait for all the service threads to exit */
+
+ for (n = 0; n < lws_get_count_threads(context); n++)
+ pthread_join(pthread_service[n], &retval);
/* wait for pthread_dumb to exit */
pthread_join(pthread_dumb, &retval);