decouple-service-from-fd-array-index.patch
authorAndy Green <andy@warmcat.com>
Sat, 12 Feb 2011 11:57:43 +0000 (11:57 +0000)
committerAndy Green <andy.green@linaro.org>
Sat, 12 Feb 2011 11:57:43 +0000 (11:57 +0000)
This patch removes the relationship between position in the
pollfd[] array and any meaning about the type of socket.

It also refactors the service loop so there is a per-fd
function that detects the mode of the connection and services
it accordingly.

The context wsi * array is removed and a hashtable introduced
allowing fast wsi lookup from just the fd that it is
associated with

Signed-off-by: Andy Green <andy@warmcat.com>
lib/client-handshake.c
lib/libwebsockets.c
lib/parsers.c
lib/private-libwebsockets.h
libwebsockets-api-doc.html

index 70b6c81..e3a6efc 100644 (file)
@@ -50,11 +50,10 @@ libwebsocket_client_close(struct libwebsocket *wsi)
                clients = wsi->protocol->owning_server;
                if (clients)
                        for (n = 0; n < clients->fds_count; n++) {
-                               if (clients->wsi[n] != wsi)
+                               if (clients->fds[n].fd != wsi->sock)
                                        continue;
                                while (n < clients->fds_count - 1) {
                                        clients->fds[n] = clients->fds[n + 1];
-                                       clients->wsi[n] = clients->wsi[n + 1];
                                        n++;
                                }
                                /* we only have to deal with one */
@@ -150,8 +149,6 @@ libwebsocket_client_connect(struct libwebsocket_context *this,
                return NULL;
        }
 
-       this->wsi[this->fds_count] = wsi;
-
        /* -1 means just use latest supported */
 
        if (ietf_version_or_minus_one == -1)
@@ -225,6 +222,7 @@ libwebsocket_client_connect(struct libwebsocket_context *this,
                goto bail1;
        }
 
+       insert_wsi(this, wsi);
 
        server_addr.sin_family = AF_INET;
        server_addr.sin_port = htons(port);
index d4bfe75..586d603 100644 (file)
 
 #include "private-libwebsockets.h"
 
+/* file descriptor hash management */
+
+struct libwebsocket *
+wsi_from_fd(struct libwebsocket_context *this, int fd)
+{
+       int h = LWS_FD_HASH(fd);
+       int n = 0;
+
+       for (n = 0; n < this->fd_hashtable[h].length; n++)
+               if (this->fd_hashtable[h].wsi[n]->sock == fd)
+                       return this->fd_hashtable[h].wsi[n];
+
+       return NULL;
+}
+
+int
+insert_wsi(struct libwebsocket_context *this, struct libwebsocket *wsi)
+{
+       int h = LWS_FD_HASH(wsi->sock);
+
+       if (this->fd_hashtable[h].length == MAX_CLIENTS - 1) {
+               fprintf(stderr, "hash table overflow\n");
+               return 1;
+       }
+
+       this->fd_hashtable[h].wsi[this->fd_hashtable[h].length++] = wsi;
+
+       return 0;
+}
+
+int
+delete_from_fd(struct libwebsocket_context *this, int fd)
+{
+       int h = LWS_FD_HASH(fd);
+       int n = 0;
+
+       for (n = 0; n < this->fd_hashtable[h].length; n++)
+               if (this->fd_hashtable[h].wsi[n]->sock == fd) {
+                       while (n < this->fd_hashtable[h].length) {
+                               this->fd_hashtable[h].wsi[n] =
+                                              this->fd_hashtable[h].wsi[n + 1];
+                               n++;
+                       }
+                       this->fd_hashtable[h].length--;
+
+                       return 0;
+               }
+
+       fprintf(stderr, "Failed to find fd %d requested for "
+                                                  "delete in hashtable\n", fd);
+       return 1;
+}
+
+
 void
 libwebsocket_close_and_free_session(struct libwebsocket *wsi)
 {
@@ -80,73 +134,239 @@ libwebsocket_close_and_free_session(struct libwebsocket *wsi)
 }
 
 static int
-libwebsocket_poll_connections(struct libwebsocket_context *this)
+libwebsocket_service_fd(struct libwebsocket_context *this,
+                                                         struct pollfd *pollfd)
 {
        unsigned char buf[LWS_SEND_BUFFER_PRE_PADDING + MAX_BROADCAST_PAYLOAD +
                                                  LWS_SEND_BUFFER_POST_PADDING];
-       int client = this->count_protocols + 1;
-       struct libwebsocket *wsi;
+       struct libwebsocket *wsi = wsi_from_fd(this, pollfd->fd);
+       struct libwebsocket *new_wsi;
        int n;
+       int m;
        size_t len;
+       int accept_fd;
+       unsigned int clilen;
+       struct sockaddr_in cli_addr;
+
+       if (wsi == NULL)
+               return 1;
+
+       switch (wsi->mode) {
+       case LWS_CONNMODE_SERVER_LISTENER:
+
+               /* pollin means a client has connected to us then */
+
+               if (!pollfd->revents & POLLIN)
+                       break;
+
+               /* listen socket got an unencrypted connection... */
+
+               clilen = sizeof(cli_addr);
+               accept_fd  = accept(pollfd->fd, (struct sockaddr *)&cli_addr,
+                                                                      &clilen);
+               if (accept_fd < 0) {
+                       fprintf(stderr, "ERROR on accept");
+                       break;
+               }
+
+               if (this->fds_count >= MAX_CLIENTS) {
+                       fprintf(stderr, "too busy");
+                       close(accept_fd);
+                       break;
+               }
+
+               /* accepting connection to main listener */
+
+               new_wsi = malloc(sizeof(struct libwebsocket));
+               if (new_wsi == NULL) {
+                       fprintf(stderr, "Out of memory for new connection\n");
+                       break;
+               }
+
+               memset(new_wsi, 0, sizeof (struct libwebsocket));
+               new_wsi->sock = accept_fd;
+
+#ifdef LWS_OPENSSL_SUPPORT
+               new_wsi->ssl = NULL;
+               this->ssl_ctx = NULL;
+
+               if (this->use_ssl) {
+
+                       new_wsi->ssl = SSL_new(this->ssl_ctx);
+                       if (new_wsi->ssl == NULL) {
+                               fprintf(stderr, "SSL_new failed: %s\n",
+                                   ERR_error_string(SSL_get_error(
+                                   new_wsi->ssl, 0), NULL));
+                               free(new_wsi);
+                               break;
+                       }
+
+                       SSL_set_fd(new_wsi->ssl, accept_fd);
+
+                       n = SSL_accept(new_wsi->ssl);
+                       if (n != 1) {
+                               /*
+                                * browsers seem to probe with various
+                                * ssl params which fail then retry
+                                * and succeed
+                                */
+                               debug("SSL_accept failed skt %u: %s\n",
+                                     pollfd->fd,
+                                     ERR_error_string(SSL_get_error(
+                                     new_wsi->ssl, n), NULL));
+                               SSL_free(
+                                      new_wsi->ssl);
+                               free(new_wsi);
+                               break;
+                       }
+                       debug("accepted new SSL conn  "
+                             "port %u on fd=%d SSL ver %s\n",
+                               ntohs(cli_addr.sin_port), accept_fd,
+                                 SSL_get_version(new_wsi->ssl));
+
+               } else
+#endif
+                       debug("accepted new conn  port %u on fd=%d\n",
+                                         ntohs(cli_addr.sin_port), accept_fd);
+
+               /* intialize the instance struct */
+
+               new_wsi->state = WSI_STATE_HTTP;
+               new_wsi->name_buffer_pos = 0;
+               new_wsi->mode = LWS_CONNMODE_WS_SERVING;
+
+               for (n = 0; n < WSI_TOKEN_COUNT; n++) {
+                       new_wsi->utf8_token[n].token = NULL;
+                       new_wsi->utf8_token[n].token_len = 0;
+               }
+
+               /*
+                * these can only be set once the protocol is known
+                * we set an unestablished connection's protocol pointer
+                * to the start of the supported list, so it can look
+                * for matching ones during the handshake
+                */
+               new_wsi->protocol = this->protocols;
+               new_wsi->user_space = NULL;
+
+               /*
+                * Default protocol is 76 / 00
+                * After 76, there's a header specified to inform which
+                * draft the client wants, when that's seen we modify
+                * the individual connection's spec revision accordingly
+                */
+               new_wsi->ietf_spec_revision = 0;
+
+               insert_wsi(this, new_wsi);
+
+
+               /*
+                * make sure NO events are seen yet on this new socket
+                * (otherwise we inherit old fds[client].revents from
+                * previous socket there and die mysteriously! )
+                */
+               this->fds[this->fds_count].revents = 0;
+
+               this->fds[this->fds_count].events = POLLIN;
+               this->fds[this->fds_count++].fd = accept_fd;
+
+               break;
 
-       /* check for activity on client sockets */
+       case LWS_CONNMODE_BROADCAST_PROXY_LISTENER:
 
-       for (; client < this->fds_count; client++) {
+               /* as we are listening, POLLIN means accept() is needed */
+       
+               if (!pollfd->revents & POLLIN)
+                       break;
+
+               /* listen socket got an unencrypted connection... */
+
+               clilen = sizeof(cli_addr);
+               accept_fd  = accept(pollfd->fd, (struct sockaddr *)&cli_addr,
+                                                                      &clilen);
+               if (accept_fd < 0) {
+                       fprintf(stderr, "ERROR on accept");
+                       break;
+               }
+
+               if (this->fds_count >= MAX_CLIENTS) {
+                       fprintf(stderr, "too busy");
+                       close(accept_fd);
+                       break;
+               }
+
+               /* create a dummy wsi for the connection and add it */
+
+               new_wsi = malloc(sizeof(struct libwebsocket));
+               memset(new_wsi, 0, sizeof (struct libwebsocket));
+               new_wsi->sock = accept_fd;
+               new_wsi->mode = LWS_CONNMODE_BROADCAST_PROXY;
+               new_wsi->state = WSI_STATE_ESTABLISHED;
+               /* note which protocol we are proxying */
+               new_wsi->protocol_index_for_broadcast_proxy =
+                                       wsi->protocol_index_for_broadcast_proxy;
+               insert_wsi(this, new_wsi);
+
+               /* add connected socket to internal poll array */
+
+               this->fds[this->fds_count].revents = 0;
+               this->fds[this->fds_count].events = POLLIN;
+               this->fds[this->fds_count++].fd = accept_fd;
+
+               break;
+
+       case LWS_CONNMODE_BROADCAST_PROXY:
 
                /* handle session socket closed */
 
-               if (this->fds[client].revents & (POLLERR | POLLHUP)) {
+               if (pollfd->revents & (POLLERR | POLLHUP)) {
 
-                       debug("Session Socket %d %p (fd=%d) dead\n",
-                               client, (void *)this->wsi[client],
-                                                         this->fds[client].fd);
+                       debug("Session Socket %p (fd=%d) dead\n",
+                               (void *)wsi, accept_fd);
 
-                       libwebsocket_close_and_free_session(this->wsi[client]);
+                       libwebsocket_close_and_free_session(wsi);
                        goto nuke_this;
                }
 
                /* the guy requested a callback when it was OK to write */
 
-               if ((unsigned long)this->wsi[client] > LWS_MAX_PROTOCOLS &&
-                                         this->fds[client].revents & POLLOUT) {
+               if (pollfd->revents & POLLOUT) {
+
+                       /* one shot */
 
-                       this->fds[client].events &= ~POLLOUT;
+                       pollfd->events &= ~POLLOUT;
 
-                       this->wsi[client]->protocol->callback(this->wsi[client],
+                       wsi->protocol->callback(wsi,
                                LWS_CALLBACK_CLIENT_WRITEABLE,
-                               this->wsi[client]->user_space,
+                               wsi->user_space,
                                NULL, 0);
                }
 
                /* any incoming data ready? */
 
-               if (!(this->fds[client].revents & POLLIN))
-                       continue;
+               if (!(pollfd->revents & POLLIN))
+                       break;
 
-               /* broadcast? */
+               /* get the issued broadcast payload from the socket */
 
-               if ((unsigned long)this->wsi[client] < LWS_MAX_PROTOCOLS) {
+               len = read(pollfd->fd, buf + LWS_SEND_BUFFER_PRE_PADDING,
+                                                        MAX_BROADCAST_PAYLOAD);
+               if (len < 0) {
+                       fprintf(stderr, "Error reading broadcast payload\n");
+                       break;;
+               }
 
-                       /* get the issued broadcast payload from the socket */
+               /* broadcast it to all guys with this protocol index */
 
-                       len = read(this->fds[client].fd,
-                                  buf + LWS_SEND_BUFFER_PRE_PADDING,
-                                  MAX_BROADCAST_PAYLOAD);
+               for (n = 0; n < FD_HASHTABLE_MODULUS; n++) {
 
-                       if (len < 0) {
-                               fprintf(stderr,
-                                          "Error reading broadcast payload\n");
-                               continue;
-                       }
+                       for (m = 0; m < this->fd_hashtable[n].length; m++) {
 
-                       /* broadcast it to all guys with this protocol index */
+                               new_wsi = this->fd_hashtable[n].wsi[m];
 
-                       for (n = this->count_protocols + 1;
-                                                    n < this->fds_count; n++) {
+                               /* only to clients we are serving to */
 
-                               wsi = this->wsi[n];
-
-                               if ((unsigned long)wsi < LWS_MAX_PROTOCOLS)
+                               if (new_wsi->mode != LWS_CONNMODE_WS_SERVING)
                                        continue;
 
                                /*
@@ -154,12 +374,7 @@ libwebsocket_poll_connections(struct libwebsocket_context *this)
                                 * connection
                                 */
 
-                               if (wsi->state != WSI_STATE_ESTABLISHED)
-                                       continue;
-
-                               /* only to clients connected to us */
-
-                               if (wsi->mode != LWS_CONNMODE_WS_SERVING)
+                               if (new_wsi->state != WSI_STATE_ESTABLISHED)
                                        continue;
 
                                /*
@@ -167,42 +382,72 @@ libwebsocket_poll_connections(struct libwebsocket_context *this)
                                 * the requested protocol
                                 */
 
-                               if (wsi->protocol->protocol_index !=
-                                         (int)(unsigned long)this->wsi[client])
+                               if (new_wsi->protocol->protocol_index !=
+                                       wsi->protocol_index_for_broadcast_proxy)
                                        continue;
 
                                /* broadcast it to this connection */
 
-                               wsi->protocol->callback(wsi,
+                               new_wsi->protocol->callback(new_wsi,
                                        LWS_CALLBACK_BROADCAST,
-                                       wsi->user_space,
+                                       new_wsi->user_space,
                                        buf + LWS_SEND_BUFFER_PRE_PADDING, len);
                        }
+               }
+               break;
+
+       case LWS_CONNMODE_WS_SERVING:
+       case LWS_CONNMODE_WS_CLIENT:
+
+               /* handle session socket closed */
+
+               if (pollfd->revents & (POLLERR | POLLHUP)) {
+
+                       debug("Session Socket %p (fd=%d) dead\n",
+                               (void *)wsi, pollfd->fd);
+
+                       libwebsocket_close_and_free_session(wsi);
+                       goto nuke_this;
+               }
+
+               /* the guy requested a callback when it was OK to write */
+
+               if (pollfd->revents & POLLOUT) {
 
-                       continue;
+                       pollfd->events &= ~POLLOUT;
+
+                       wsi->protocol->callback(wsi,
+                               LWS_CALLBACK_CLIENT_WRITEABLE,
+                               wsi->user_space,
+                               NULL, 0);
                }
 
+               /* any incoming data ready? */
+
+               if (!(pollfd->revents & POLLIN))
+                       break;
+
 #ifdef LWS_OPENSSL_SUPPORT
-               if (this->wsi[client]->ssl)
-                       n = SSL_read(this->wsi[client]->ssl, buf, sizeof buf);
+               if (wsi->ssl)
+                       n = SSL_read(wsi->ssl, buf, sizeof buf);
                else
 #endif
-                       n = recv(this->fds[client].fd, buf, sizeof buf, 0);
+                       n = recv(pollfd->fd, buf, sizeof buf, 0);
 
                if (n < 0) {
                        fprintf(stderr, "Socket read returned %d\n", n);
-                       continue;
+                       break;;
                }
                if (!n) {
-                       libwebsocket_close_and_free_session(this->wsi[client]);
+                       libwebsocket_close_and_free_session(wsi);
                        goto nuke_this;
                }
 
                /* service incoming data */
 
-               n = libwebsocket_read(this->wsi[client], buf, n);
+               n = libwebsocket_read(wsi, buf, n);
                if (n >= 0)
-                       continue;
+                       break;;
                /*
                 * it closed and nuked wsi[client], so remove the
                 * socket handle and wsi from our service list
@@ -210,21 +455,27 @@ libwebsocket_poll_connections(struct libwebsocket_context *this)
 nuke_this:
 
                debug("nuking wsi %p, fsd_count = %d\n",
-                               (void *)this->wsi[client], this->fds_count - 1);
+                               (void *)wsi, this->fds_count - 1);
 
-               this->fds_count--;
-               for (n = client; n < this->fds_count; n++) {
-                       this->fds[n] = this->fds[n + 1];
-                       this->wsi[n] = this->wsi[n + 1];
-               }
+               delete_from_fd(this, pollfd->fd);
 
-               return 0;
+               this->fds_count--;
+               for (n = 0; n < this->fds_count; n++)
+                       if (this->fds[n].fd == pollfd->fd) {
+                               while (n < this->fds_count) {
+                                       this->fds[n] = this->fds[n + 1];
+                                       n++;
+                               }
+                               n = this->fds_count;
+                       }
 
+               break;
        }
 
        return 0;
 }
 
+
 /**
  * libwebsocket_context_destroy() - Destroy the websocket context
  * @this:      Websocket context
@@ -236,18 +487,28 @@ nuke_this:
 void
 libwebsocket_context_destroy(struct libwebsocket_context *this)
 {
-       int client;
+       int n;
+       int m;
+       struct libwebsocket *wsi;
 
-       /* close listening skt and per-protocol broadcast sockets */
-       for (client = this->count_protocols + 1; client < this->fds_count; client++)
-               switch (this->wsi[client]->mode) {
-               case LWS_CONNMODE_WS_SERVING:
-                       libwebsocket_close_and_free_session(this->wsi[client]);
-                       break;
-               case LWS_CONNMODE_WS_CLIENT:
-                       libwebsocket_client_close(this->wsi[client]);
-                       break;
+       for (n = 0; n < FD_HASHTABLE_MODULUS; n++) {
+
+               for (m = 0; m < this->fd_hashtable[n].length; m++) {
+
+                       wsi = this->fd_hashtable[n].wsi[m];
+
+                       switch (wsi->mode) {
+                       case LWS_CONNMODE_WS_SERVING:
+                               libwebsocket_close_and_free_session(wsi);
+                               break;
+                       case LWS_CONNMODE_WS_CLIENT:
+                               libwebsocket_client_close(wsi);
+                               break;
+                       default:
+                               break;
+                       }
                }
+       }
 
        close(this->fd_random);
 
@@ -301,182 +562,32 @@ int
 libwebsocket_service(struct libwebsocket_context *this, int timeout_ms)
 {
        int n;
-       int client;
-       unsigned int clilen;
-       struct sockaddr_in cli_addr;
-       int fd;
 
        /* stay dead once we are dead */
 
        if (this == NULL)
                return 1;
 
-       /* don't check listen socket if we are not listening */
-
-       if (this->listen_port)
-               n = poll(this->fds, this->fds_count, timeout_ms);
-       else
-               n = poll(&this->fds[1], this->fds_count - 1, timeout_ms);
+       /* wait for something to need service */
 
+       n = poll(this->fds, this->fds_count, timeout_ms);
 
        if (n < 0 || this->fds[0].revents & (POLLERR | POLLHUP)) {
                /*
                fprintf(stderr, "Listen Socket dead\n");
                */
-               goto fatal;
+               return 1;
        }
        if (n == 0) /* poll timeout */
                return 0;
 
        /* handle accept on listening socket? */
 
-       for (client = 0; client < this->count_protocols + 1; client++) {
-
-               if (!this->fds[client].revents & POLLIN)
-                       continue;
-
-               /* listen socket got an unencrypted connection... */
-
-               clilen = sizeof(cli_addr);
-               fd  = accept(this->fds[client].fd,
-                            (struct sockaddr *)&cli_addr, &clilen);
-               if (fd < 0) {
-                       fprintf(stderr, "ERROR on accept");
-                       continue;
-               }
-
-               if (this->fds_count >= MAX_CLIENTS) {
-                       fprintf(stderr, "too busy");
-                       close(fd);
-                       continue;
-               }
-
-               if (client) {
-                       /*
-                        * accepting a connection to broadcast socket
-                        * set wsi to be protocol index not pointer
-                        */
-
-                       this->wsi[this->fds_count] =
-                             (struct libwebsocket *)(long)(client - 1);
-
-                       goto fill_in_fds;
-               }
-
-               /* accepting connection to main listener */
-
-               this->wsi[this->fds_count] =
-                                   malloc(sizeof(struct libwebsocket));
-               if (!this->wsi[this->fds_count]) {
-                       fprintf(stderr, "Out of memory for new connection\n");
-                       continue;
-               }
-
-#ifdef LWS_OPENSSL_SUPPORT
-               this->wsi[this->fds_count]->ssl = NULL;
-               this->ssl_ctx = NULL;
-
-               if (this->use_ssl) {
-
-                       this->wsi[this->fds_count]->ssl =
-                                                        SSL_new(this->ssl_ctx);
-                       if (this->wsi[this->fds_count]->ssl == NULL) {
-                               fprintf(stderr, "SSL_new failed: %s\n",
-                                   ERR_error_string(SSL_get_error(
-                                   this->wsi[this->fds_count]->ssl, 0),
-                                                                NULL));
-                               free(this->wsi[this->fds_count]);
-                               continue;
-                       }
-
-                       SSL_set_fd(this->wsi[this->fds_count]->ssl, fd);
-
-                       n = SSL_accept(this->wsi[this->fds_count]->ssl);
-                       if (n != 1) {
-                               /*
-                                * browsers seem to probe with various
-                                * ssl params which fail then retry
-                                * and succeed
-                                */
-                               debug("SSL_accept failed skt %u: %s\n",
-                                     fd,
-                                     ERR_error_string(SSL_get_error(
-                                     this->wsi[this->fds_count]->ssl,
-                                                            n), NULL));
-                               SSL_free(
-                                      this->wsi[this->fds_count]->ssl);
-                               free(this->wsi[this->fds_count]);
-                               continue;
-                       }
-                       debug("accepted new SSL conn  "
-                             "port %u on fd=%d SSL ver %s\n",
-                               ntohs(cli_addr.sin_port), fd,
-                                 SSL_get_version(this->wsi[
-                                               this->fds_count]->ssl));
-
-               } else
-#endif
-                       debug("accepted new conn  port %u on fd=%d\n",
-                                         ntohs(cli_addr.sin_port), fd);
-
-               /* intialize the instance struct */
-
-               this->wsi[this->fds_count]->sock = fd;
-               this->wsi[this->fds_count]->state = WSI_STATE_HTTP;
-               this->wsi[this->fds_count]->name_buffer_pos = 0;
-               this->wsi[this->fds_count]->mode = LWS_CONNMODE_WS_SERVING;
-
-               for (n = 0; n < WSI_TOKEN_COUNT; n++) {
-                       this->wsi[this->fds_count]->
-                                            utf8_token[n].token = NULL;
-                       this->wsi[this->fds_count]->
-                                           utf8_token[n].token_len = 0;
-               }
-
-               /*
-                * these can only be set once the protocol is known
-                * we set an unestablished connection's protocol pointer
-                * to the start of the supported list, so it can look
-                * for matching ones during the handshake
-                */
-               this->wsi[this->fds_count]->protocol = this->protocols;
-               this->wsi[this->fds_count]->user_space = NULL;
-
-               /*
-                * Default protocol is 76 / 00
-                * After 76, there's a header specified to inform which
-                * draft the client wants, when that's seen we modify
-                * the individual connection's spec revision accordingly
-                */
-               this->wsi[this->fds_count]->ietf_spec_revision = 0;
-
-fill_in_fds:
-
-               /*
-                * make sure NO events are seen yet on this new socket
-                * (otherwise we inherit old fds[client].revents from
-                * previous socket there and die mysteriously! )
-                */
-               this->fds[this->fds_count].revents = 0;
-
-               this->fds[this->fds_count].events = POLLIN;
-               this->fds[this->fds_count++].fd = fd;
-
-       }
-
-       /* service anything incoming on websocket connection */
-
-       libwebsocket_poll_connections(this);
-
-       /* this round is done */
+       for (n = 0; n < this->fds_count; n++)
+               if (this->fds[n].revents)
+                       libwebsocket_service_fd(this, &this->fds[n]);
 
        return 0;
-
-fatal:
-
-       /* inform caller we are dead */
-
-       return 1;
 }
 
 /**
@@ -484,6 +595,10 @@ fatal:
  *                                      becomes able to be written to without
  *                                      blocking
  *
+ * This only works for internal poll() management, (ie, calling the libwebsocket
+ * service loop, you will have to make your own arrangements if your poll()
+ * loop is managed externally.
+ *
  * @wsi:       Websocket connection instance to get callback for
  */
 
@@ -493,8 +608,8 @@ libwebsocket_callback_on_writable(struct libwebsocket *wsi)
        struct libwebsocket_context *this = wsi->protocol->owning_server;
        int n;
 
-       for (n = this->count_protocols + 1; n < this->fds_count; n++)
-               if (this->wsi[n] == wsi) {
+       for (n = 0; n < this->fds_count; n++)
+               if (this->fds[n].fd == wsi->sock) {
                        this->fds[n].events |= POLLOUT;
                        return 0;
                }
@@ -510,6 +625,10 @@ libwebsocket_callback_on_writable(struct libwebsocket *wsi)
  *                     becomes possible to write to each socket without
  *                     blocking in turn.
  *
+ * This only works for internal poll() management, (ie, calling the libwebsocket
+ * service loop, you will have to make your own arrangements if your poll()
+ * loop is managed externally.
+ *
  * @protocol:  Protocol whose connections will get callbacks
  */
 
@@ -519,11 +638,19 @@ libwebsocket_callback_on_writable_all_protocol(
 {
        struct libwebsocket_context *this = protocol->owning_server;
        int n;
+       int m;
+       struct libwebsocket *wsi;
 
-       for (n = this->count_protocols + 1; n < this->fds_count; n++)
-               if ((unsigned long)this->wsi[n] > LWS_MAX_PROTOCOLS)
-                       if (this->wsi[n]->protocol == protocol)
-                               this->fds[n].events |= POLLOUT;
+       for (n = 0; n < FD_HASHTABLE_MODULUS; n++) {
+
+               for (m = 0; m < this->fd_hashtable[n].length; m++) {
+
+                       wsi = this->fd_hashtable[n].wsi[m];
+
+                       if (wsi->protocol == protocol)
+                               libwebsocket_callback_on_writable(wsi);
+               }
+       }
 
        return 0;
 }
@@ -550,6 +677,10 @@ libwebsocket_get_socket_fd(struct libwebsocket *wsi)
  * If the output side of a server process becomes choked, this allows flow
  * control for the input side.
  *
+ * This only works for internal poll() management, (ie, calling the libwebsocket
+ * service loop, you will have to make your own arrangements if your poll()
+ * loop is managed externally.
+ *
  * @wsi:       Websocket connection instance to get callback for
  * @enable:    0 = disable read servicing for this connection, 1 = enable
  */
@@ -560,8 +691,8 @@ libwebsocket_rx_flow_control(struct libwebsocket *wsi, int enable)
        struct libwebsocket_context *this = wsi->protocol->owning_server;
        int n;
 
-       for (n = this->count_protocols + 1; n < this->fds_count; n++)
-               if (this->wsi[n] == wsi) {
+       for (n = 0; n < this->fds_count; n++)
+               if (this->fds[n].fd == wsi->sock) {
                        if (enable)
                                this->fds[n].events |= POLLIN;
                        else
@@ -598,6 +729,7 @@ static void sigpipe_handler(int x)
 }
 
 
+
 /**
  * libwebsocket_create_context() - Create the websocket handler
  * @port:      Port to listen on... you can use 0 to suppress listening on
@@ -659,6 +791,7 @@ libwebsocket_create_context(int port,
        char *p;
        char hostname[1024];
        struct hostent *he;
+       struct libwebsocket *wsi;
 
 #ifdef LWS_OPENSSL_SUPPORT
        SSL_METHOD *method;
@@ -675,6 +808,7 @@ libwebsocket_create_context(int port,
        this->http_proxy_port = 0;
        this->http_proxy_address[0] = '\0';
        this->options = options;
+       this->fds_count = 0;
 
        this->fd_random = open(SYSTEM_RANDOM_FILEPATH, O_RDONLY);
        if (this->fd_random < 0) {
@@ -834,6 +968,11 @@ libwebsocket_create_context(int port,
        if (lws_b64_selftest())
                return NULL;
 
+       /* fd hashtable init */
+
+       for (n = 0; n < FD_HASHTABLE_MODULUS; n++)
+               this->fd_hashtable[n].length = 0;
+
        /* set up our external listening socket we serve on */
 
        if (port) {
@@ -859,6 +998,20 @@ libwebsocket_create_context(int port,
                                                                port, n, errno);
                        return NULL;
                }
+
+               wsi = malloc(sizeof(struct libwebsocket));
+               memset(wsi, 0, sizeof (struct libwebsocket));
+               wsi->sock = sockfd;
+               wsi->mode = LWS_CONNMODE_SERVER_LISTENER;
+               insert_wsi(this, wsi);
+
+               listen(sockfd, 5);
+               fprintf(stderr, " Listening on port %d\n", port);
+
+               /* list in the internal poll array */
+               
+               this->fds[this->fds_count].fd = sockfd;
+               this->fds[this->fds_count++].events = POLLIN;
        }
 
        /* drop any root privs for this process */
@@ -870,27 +1023,11 @@ libwebsocket_create_context(int port,
                if (setuid(uid))
                        fprintf(stderr, "setuid: %s\n", strerror(errno));
 
-       /*
-        * prepare the poll() fd array... it's like this
-        *
-        * [0] = external listening socket
-        * [1 .. this->count_protocols] = per-protocol broadcast sockets
-        * [this->count_protocols + 1 ... this->fds_count-1] = connection skts
-        */
-
-       this->fds_count = 1;
-       this->fds[0].fd = sockfd;
-       this->fds[0].events = POLLIN;
-       this->count_protocols = 0;
-
-       if (port) {
-               listen(sockfd, 5);
-               fprintf(stderr, " Listening on port %d\n", port);
-       }
 
        /* set up our internal broadcast trigger sockets per-protocol */
 
-       for (; protocols[this->count_protocols].callback;
+       for (this->count_protocols = 0;
+                       protocols[this->count_protocols].callback;
                                                      this->count_protocols++) {
                protocols[this->count_protocols].owning_server = this;
                protocols[this->count_protocols].protocol_index =
@@ -931,10 +1068,20 @@ libwebsocket_create_context(int port,
                                protocols[this->count_protocols].name,
                                                      ntohs(cli_addr.sin_port));
 
+               /* dummy wsi per broadcast proxy socket */
+
+               wsi = malloc(sizeof(struct libwebsocket));
+               memset(wsi, 0, sizeof (struct libwebsocket));
+               wsi->sock = fd;
+               wsi->mode = LWS_CONNMODE_BROADCAST_PROXY_LISTENER;
+               /* note which protocol we are proxying */
+               wsi->protocol_index_for_broadcast_proxy = this->count_protocols;
+               insert_wsi(this, wsi);
+
+               /* list in internal poll array */
+
                this->fds[this->fds_count].fd = fd;
                this->fds[this->fds_count].events = POLLIN;
-               /* wsi only exists for connections, not broadcast listener */
-               this->wsi[this->fds_count] = NULL;
                this->fds_count++;
        }
 
@@ -1055,6 +1202,8 @@ libwebsockets_broadcast(const struct libwebsocket_protocols *protocol,
 {
        struct libwebsocket_context *this = protocol->owning_server;
        int n;
+       int m;
+       struct libwebsocket * wsi;
 
        if (!protocol->broadcast_socket_user_fd) {
                /*
@@ -1069,23 +1218,33 @@ libwebsockets_broadcast(const struct libwebsocket_protocols *protocol,
                 * called in the poll thread context and are serialized.
                 */
 
-               for (n = this->count_protocols + 1; n < this->fds_count; n++) {
+               for (n = 0; n < FD_HASHTABLE_MODULUS; n++) {
 
-                       if ((unsigned long)this->wsi[n] < LWS_MAX_PROTOCOLS)
-                               continue;
+                       for (m = 0; m < this->fd_hashtable[n].length; m++) {
 
-                       /* never broadcast to non-established connection */
-                       if (this->wsi[n]->state != WSI_STATE_ESTABLISHED)
-                               continue;
+                               wsi = this->fd_hashtable[n].wsi[m];
 
-                       /* only broadcast to guys using requested protocol */
-                       if (this->wsi[n]->protocol != protocol)
-                               continue;
+                               if (wsi->mode != LWS_CONNMODE_WS_SERVING)
+                                       continue;
 
-                       this->wsi[n]->protocol->callback(this->wsi[n],
+                               /*
+                                * never broadcast to
+                                * non-established connections
+                                */
+                               if (wsi->state != WSI_STATE_ESTABLISHED)
+                                       continue;
+
+                               /* only broadcast to guys using
+                                * requested protocol
+                                */
+                               if (wsi->protocol != protocol)
+                                       continue;
+
+                               wsi->protocol->callback(wsi,
                                         LWS_CALLBACK_BROADCAST,
-                                        this->wsi[n]->user_space,
+                                        wsi->user_space,
                                         buf, len);
+                       }
                }
 
                return 0;
index 6f06bcf..a228794 100644 (file)
@@ -1143,6 +1143,8 @@ int libwebsocket_write(struct libwebsocket *wsi, unsigned char *buf,
                                        */
                                        buf[0] = 'C';
                                        break;
+                               default:
+                                       break;
                                }
                        }
                        break;
index 300cfea..bf8d528 100644 (file)
@@ -68,7 +68,7 @@ static inline void debug(const char *format, ...)
 }
 #endif
 
-
+#define FD_HASHTABLE_MODULUS 32
 #define MAX_CLIENTS 100
 #define LWS_MAX_HEADER_NAME_LENGTH 64
 #define LWS_MAX_HEADER_LEN 4096
@@ -166,11 +166,29 @@ struct lws_tokens {
        int token_len;
 };
 
+enum connection_mode {
+       LWS_CONNMODE_WS_SERVING,
+       LWS_CONNMODE_WS_CLIENT,
+
+       /* special internal types */
+       LWS_CONNMODE_SERVER_LISTENER,
+       LWS_CONNMODE_BROADCAST_PROXY_LISTENER,
+       LWS_CONNMODE_BROADCAST_PROXY
+};
+
+
+#define LWS_FD_HASH(fd) ((fd ^ (fd >> 8) ^ (fd >> 16)) % FD_HASHTABLE_MODULUS)
+
+struct libwebsocket_fd_hashtable {
+       struct libwebsocket *wsi[MAX_CLIENTS + 1];
+       int length;
+};
+
 struct libwebsocket_protocols;
 
 struct libwebsocket_context {
-       struct libwebsocket *wsi[MAX_CLIENTS + 1];
-       struct pollfd fds[MAX_CLIENTS + 1];
+       struct libwebsocket_fd_hashtable fd_hashtable[FD_HASHTABLE_MODULUS];
+       struct pollfd fds[MAX_CLIENTS * FD_HASHTABLE_MODULUS + 1];
        int fds_count;
        int listen_port;
        char http_proxy_address[256];
@@ -189,10 +207,6 @@ struct libwebsocket_context {
        int count_protocols;
 };
 
-enum connection_mode {
-       LWS_CONNMODE_WS_SERVING,
-       LWS_CONNMODE_WS_CLIENT,
-};
 
 
 /*
@@ -215,6 +229,7 @@ struct libwebsocket {
        char rx_user_buffer[LWS_SEND_BUFFER_PRE_PADDING + MAX_USER_RX_BUFFER +
                                                  LWS_SEND_BUFFER_POST_PADDING];
        int rx_user_buffer_head;
+       int protocol_index_for_broadcast_proxy;
 
        int sock;
 
@@ -279,3 +294,12 @@ xor_mask_04(struct libwebsocket *wsi, unsigned char c);
 
 extern unsigned char
 xor_mask_05(struct libwebsocket *wsi, unsigned char c);
+
+extern struct libwebsocket *
+wsi_from_fd(struct libwebsocket_context *this, int fd);
+
+extern int
+insert_wsi(struct libwebsocket_context *this, struct libwebsocket *wsi);
+
+extern int
+delete_from_fd(struct libwebsocket_context *this, int fd);
index 422ed7c..2184f16 100644 (file)
@@ -67,6 +67,13 @@ nothing is pending, or as soon as it services whatever was pending.
 <dt><b>wsi</b>
 <dd>Websocket connection instance to get callback for
 </dl>
+<h3>Description</h3>
+<blockquote>
+<p>
+This only works for internal <b>poll</b> management, (ie, calling the libwebsocket
+service loop, you will have to make your own arrangements if your <b>poll</b>
+loop is managed externally.
+</blockquote>
 <hr>
 <h2>libwebsocket_callback_on_writable_all_protocol - Request a callback for all connections using the given protocol when it becomes possible to write to each socket without blocking in turn.</h2>
 <i>int</i>
@@ -77,6 +84,13 @@ nothing is pending, or as soon as it services whatever was pending.
 <dt><b>protocol</b>
 <dd>Protocol whose connections will get callbacks
 </dl>
+<h3>Description</h3>
+<blockquote>
+<p>
+This only works for internal <b>poll</b> management, (ie, calling the libwebsocket
+service loop, you will have to make your own arrangements if your <b>poll</b>
+loop is managed externally.
+</blockquote>
 <hr>
 <h2>libwebsocket_get_socket_fd - returns the socket file descriptor</h2>
 <i>int</i>
@@ -110,6 +124,10 @@ You will not need this unless you are doing something special
 <p>
 If the output side of a server process becomes choked, this allows flow
 control for the input side.
+<p>
+This only works for internal <b>poll</b> management, (ie, calling the libwebsocket
+service loop, you will have to make your own arrangements if your <b>poll</b>
+loop is managed externally.
 </blockquote>
 <hr>
 <h2>libwebsocket_canonical_hostname - returns this host's hostname</h2>