Upgrade evcom - fix accepting too many connections issue
authorRyan <ry@tinyclouds.org>
Tue, 4 Aug 2009 12:44:40 +0000 (14:44 +0200)
committerRyan <ry@tinyclouds.org>
Tue, 4 Aug 2009 12:51:41 +0000 (14:51 +0200)
deps/evcom/evcom.c
deps/evcom/evcom.h
deps/evcom/test/test.c
src/net.cc

index 11a1796..2476908 100644 (file)
@@ -63,6 +63,35 @@ static int secure_half_goodbye (evcom_socket *socket);
 #define AGAIN 1
 #define ERROR 2 
 
+#define ATTACHED(s)         ((s)->flags & EVCOM_ATTACHED)
+#define LISTENING(s)        ((s)->flags & EVCOM_LISTENING)
+#define CONNECTED(s)        ((s)->flags & EVCOM_CONNECTED)
+#define SECURE(s)           ((s)->flags & EVCOM_SECURE)
+#define GOT_HALF_CLOSE(s)   ((s)->flags & EVCOM_GOT_HALF_CLOSE)
+#define GOT_FULL_CLOSE(s)   ((s)->flags & EVCOM_GOT_FULL_CLOSE)
+#define TOO_MANY_CONN(s)    ((s)->flags & EVCOM_TOO_MANY_CONN)
+
+
+static void
+accept_new_connections (evcom_server *server)
+{
+  if (TOO_MANY_CONN(server)) {
+#if EV_MULTIPLICITY
+    evcom_server_attach(server->loop, server);
+#else 
+    evcom_server_attach(server);
+#endif 
+    server->flags &= ~EVCOM_TOO_MANY_CONN;
+  }
+}
+
+static void
+dont_accept_new_connections (evcom_server *server)
+{
+  evcom_server_detach(server);
+  server->flags |= EVCOM_TOO_MANY_CONN;
+}
+
 EV_INLINE int
 set_nonblock (int fd)
 {
@@ -107,11 +136,6 @@ evcom_buf_new (const char *base, size_t len)
   return buf;
 }
 
-#define CLOSE_ASAP(socket) do {         \
-  (socket)->read_action = full_close;   \
-  (socket)->write_action = full_close;  \
-} while (0)
-
 static int 
 full_close (evcom_socket *socket)
 {
@@ -132,6 +156,13 @@ full_close (evcom_socket *socket)
   return OKAY;
 }
 
+static inline void
+close_asap (evcom_socket *socket)
+{
+  socket->read_action = full_close;
+  socket->write_action = full_close;
+}
+
 static int 
 half_close (evcom_socket *socket)
 {
@@ -168,15 +199,15 @@ change_state_for_empty_out_stream (evcom_socket *socket)
     return;
   }
 
-  if (socket->got_half_close == FALSE) {
-    if (socket->got_full_close == FALSE) {
+  if (!GOT_HALF_CLOSE(socket)) {
+    if (!GOT_FULL_CLOSE(socket)) {
       /* Normal situation. Didn't get any close signals. */
       ev_io_stop(SOCKET_LOOP_ &socket->write_watcher);
     } else {
       /* Got Full Close. */
       if (socket->read_action) {
 #if EVCOM_HAVE_GNUTLS
-        socket->read_action = socket->secure ? secure_full_goodbye : full_close;
+        socket->read_action = SECURE(socket) ? secure_full_goodbye : full_close;
 #else 
         socket->read_action = full_close;
 #endif
@@ -184,7 +215,7 @@ change_state_for_empty_out_stream (evcom_socket *socket)
 
       if (socket->write_action) {
 #if EVCOM_HAVE_GNUTLS
-        socket->write_action = socket->secure ? secure_full_goodbye : full_close;
+        socket->write_action = SECURE(socket) ? secure_full_goodbye : full_close;
 #else 
         socket->write_action = full_close;
 #endif
@@ -194,7 +225,7 @@ change_state_for_empty_out_stream (evcom_socket *socket)
     /* Got Half Close. */
     if (socket->write_action) {
 #if EVCOM_HAVE_GNUTLS
-      socket->write_action = socket->secure ? secure_half_goodbye : half_close;
+      socket->write_action = SECURE(socket) ? secure_half_goodbye : half_close;
 #else 
       socket->write_action = half_close;
 #endif
@@ -236,7 +267,7 @@ static ssize_t
 nosigpipe_push (gnutls_transport_ptr_t data, const void *buf, size_t len)
 {
   evcom_socket *socket = (evcom_socket*)data;
-  assert(socket->secure);
+  assert(SECURE(socket));
   int flags = 0;
 #ifdef MSG_NOSIGNAL
   flags |= MSG_NOSIGNAL;
@@ -256,7 +287,7 @@ nosigpipe_push (gnutls_transport_ptr_t data, const void *buf, size_t len)
 static int
 secure_handshake (evcom_socket *socket)
 {
-  assert(socket->secure);
+  assert(SECURE(socket));
 
   int r = gnutls_handshake(socket->session);
 
@@ -270,8 +301,8 @@ secure_handshake (evcom_socket *socket)
 
   evcom_socket_reset_timeout(socket);
 
-  if (!socket->connected) {
-    socket->connected = TRUE;
+  if (!CONNECTED(socket)) {
+    socket->flags |= EVCOM_CONNECTED;
     if (socket->on_connect) socket->on_connect(socket);
   }
 
@@ -299,7 +330,7 @@ secure_socket_send (evcom_socket *socket)
   evcom_queue *q = evcom_queue_last(&socket->out_stream);
   evcom_buf *to_write = evcom_queue_data(q, evcom_buf, queue);
 
-  assert(socket->secure);
+  assert(SECURE(socket));
 
   sent = gnutls_record_send(socket->session,
     to_write->base + to_write->written,
@@ -337,7 +368,7 @@ secure_socket_recv (evcom_socket *socket)
   size_t recv_buffer_size = socket->chunksize;
   ssize_t recved;
 
-  assert(socket->secure);
+  assert(SECURE(socket));
 
   recved = gnutls_record_recv(socket->session, recv_buffer, recv_buffer_size);
 
@@ -373,7 +404,7 @@ secure_socket_recv (evcom_socket *socket)
     /* Got EOF */
     if (recved == 0) {
       socket->read_action = NULL;
-      if (socket->write_action == NULL) CLOSE_ASAP(socket);
+      if (socket->write_action == NULL) close_asap(socket);
     }
 
     if (socket->write_action) {
@@ -394,7 +425,7 @@ secure_socket_recv (evcom_socket *socket)
 static int
 secure_full_goodbye (evcom_socket *socket)
 {
-  assert(socket->secure);
+  assert(SECURE(socket));
 
   int r = gnutls_bye(socket->session, GNUTLS_SHUT_RDWR);
 
@@ -405,7 +436,7 @@ secure_full_goodbye (evcom_socket *socket)
 
   if (r == GNUTLS_E_INTERRUPTED || r == GNUTLS_E_AGAIN) return AGAIN;
 
-  CLOSE_ASAP(socket);
+  close_asap(socket);
 
   return OKAY;
 }
@@ -413,7 +444,7 @@ secure_full_goodbye (evcom_socket *socket)
 static int
 secure_half_goodbye (evcom_socket *socket)
 {
-  assert(socket->secure);
+  assert(SECURE(socket));
 
   int r = gnutls_bye(socket->session, GNUTLS_SHUT_WR);
 
@@ -433,7 +464,7 @@ void
 evcom_socket_set_secure_session (evcom_socket *socket, gnutls_session_t session)
 {
   socket->session = session;
-  socket->secure = TRUE;
+  socket->flags |= EVCOM_SECURE;
 }
 #endif /* HAVE GNUTLS */
 
@@ -442,7 +473,7 @@ socket_send (evcom_socket *socket)
 {
   ssize_t sent;
 
-  assert(socket->secure == FALSE);
+  assert(!SECURE(socket));
 
   if (evcom_queue_empty(&socket->out_stream)) {
     ev_io_stop(SOCKET_LOOP_ &socket->write_watcher);
@@ -484,8 +515,8 @@ socket_send (evcom_socket *socket)
 
   evcom_socket_reset_timeout(socket);
 
-  if (!socket->connected) {
-    socket->connected = TRUE;
+  if (!CONNECTED(socket)) {
+    socket->flags |= EVCOM_CONNECTED;
     if (socket->on_connect) socket->on_connect(socket);
   }
 
@@ -501,10 +532,10 @@ socket_recv (evcom_socket *socket)
   size_t buf_size = TCP_MAXWIN;
   ssize_t recved;
 
-  assert(socket->secure == FALSE);
+  assert(!SECURE(socket));
 
-  if (!socket->connected) {
-    socket->connected = TRUE;
+  if (!CONNECTED(socket)) {
+    socket->flags |= EVCOM_CONNECTED;
     if (socket->on_connect) socket->on_connect(socket);
     //return OKAY;
   }
@@ -534,7 +565,7 @@ socket_recv (evcom_socket *socket)
   if (recved == 0) {
     evcom_socket_read_stop(socket);
     socket->read_action = NULL;
-    if (socket->write_action == NULL) CLOSE_ASAP(socket);
+    if (socket->write_action == NULL) close_asap(socket);
   }
 
   /* NOTE: EOF is signaled with recved == 0 on callback */
@@ -555,7 +586,7 @@ assign_file_descriptor (evcom_socket *socket, int fd)
   socket->write_action = socket_send;
 
 #if EVCOM_HAVE_GNUTLS
-  if (socket->secure) {
+  if (SECURE(socket)) {
     gnutls_transport_set_lowat(socket->session, 0); 
     gnutls_transport_set_push_function(socket->session, nosigpipe_push);
     gnutls_transport_set_ptr2(socket->session,
@@ -570,11 +601,11 @@ assign_file_descriptor (evcom_socket *socket, int fd)
 static void
 server_close_with_error (evcom_server *server, int errorno)
 {
-  if (server->listening) {
+  if (LISTENING(server)) {
     evcom_server_detach(server);
     close(server->fd); /* TODO do this on the loop? check return value? */
     server->fd = -1;
-    server->listening = FALSE;
+    server->flags &= ~EVCOM_LISTENING;
 
     if (server->on_close) {
       server->on_close(server, errorno);
@@ -595,12 +626,17 @@ accept_connection (evcom_server *server)
   
   int fd = accept(server->fd, (struct sockaddr*)&address, &addr_len);
   if (fd < 0) {
-#ifdef EWOULDBLOCK
-    if (errno == EWOULDBLOCK) return NULL;
-#else
-    if (errno == EAGAIN) return NULL;
-#endif 
-    goto error;
+    switch (errno) {
+      case EMFILE:
+        dont_accept_new_connections(server);
+        return NULL;
+
+      case EAGAIN:
+        return NULL;
+
+      default:
+        goto error;
+    }
   }
 
   evcom_socket *socket = NULL;
@@ -641,7 +677,7 @@ on_connection (EV_P_ ev_io *watcher, int revents)
 {
   evcom_server *server = watcher->data;
 
-  assert(server->listening);
+  assert(LISTENING(server));
 #if EV_MULTIPLICITY
   assert(server->loop == loop);
 #endif 
@@ -663,7 +699,7 @@ int
 evcom_server_listen (evcom_server *server, struct addrinfo *addrinfo, int backlog)
 {
   int fd = -1;
-  assert(server->listening == FALSE);
+  assert(!LISTENING(server));
 
   fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
       addrinfo->ai_protocol);
@@ -699,7 +735,7 @@ evcom_server_listen (evcom_server *server, struct addrinfo *addrinfo, int backlo
   }
   
   server->fd = fd;
-  server->listening = TRUE;
+  server->flags |= EVCOM_LISTENING;
   ev_io_set (&server->connection_watcher, server->fd, EV_READ);
   
   return 0;
@@ -722,7 +758,7 @@ evcom_server_attach (EV_P_ evcom_server *server)
 #if EV_MULTIPLICITY
   server->loop = loop;
 #endif
-  server->attached = TRUE;
+  server->flags |= EVCOM_ATTACHED;
 }
 
 void
@@ -732,14 +768,13 @@ evcom_server_detach (evcom_server *server)
 #if EV_MULTIPLICITY
   server->loop = NULL;
 #endif
-  server->attached = FALSE;
+  server->flags &= ~EVCOM_ATTACHED;
 }
 
 void 
 evcom_server_init (evcom_server *server)
 {
-  server->attached = FALSE;
-  server->listening = FALSE;
+  server->flags = 0;
   server->fd = -1;
   server->connection_watcher.data = server;
   ev_init (&server->connection_watcher, on_connection);
@@ -784,7 +819,7 @@ on_io_event(EV_P_ ev_io *watcher, int revents)
 
   if (revents & EV_ERROR) {
     socket->errorno = 1;
-    CLOSE_ASAP(socket);
+    close_asap(socket);
   }
 
   int r;
@@ -801,7 +836,7 @@ on_io_event(EV_P_ ev_io *watcher, int revents)
       if (r == AGAIN) {
         have_read_event = FALSE;
       } else { 
-        if (r == ERROR) CLOSE_ASAP(socket);
+        if (r == ERROR) close_asap(socket);
       }
     }
 
@@ -814,7 +849,7 @@ on_io_event(EV_P_ ev_io *watcher, int revents)
       if (r == AGAIN) {
         have_write_event = FALSE;
       } else {
-        if (r == ERROR) CLOSE_ASAP(socket);
+        if (r == ERROR) close_asap(socket);
       }
     }
   }
@@ -830,6 +865,10 @@ on_io_event(EV_P_ ev_io *watcher, int revents)
     evcom_socket_detach(socket);
     assert(socket->fd == -1);
 
+    if (socket->server) {
+      accept_new_connections(socket->server);
+    }
+
     if (socket->on_close) socket->on_close(socket);
     /* WARNING: user can free socket in on_close so no more 
      * access beyond this point. */
@@ -851,8 +890,7 @@ evcom_socket_init (evcom_socket *socket, float timeout)
 #if EV_MULTIPLICITY
   socket->loop = NULL;
 #endif
-  socket->attached = FALSE;
-  socket->connected = FALSE;
+  socket->flags = 0;
 
   evcom_queue_init(&socket->out_stream);
 
@@ -862,12 +900,8 @@ evcom_socket_init (evcom_socket *socket, float timeout)
   ev_init(&socket->read_watcher, on_io_event);
   socket->read_watcher.data = socket;
 
-  socket->got_full_close = FALSE;
-  socket->got_half_close = FALSE;
-
   socket->errorno = 0;
 
-  socket->secure = FALSE;
 #if EVCOM_HAVE_GNUTLS
   socket->gnutls_errorno = 0;
   socket->session = NULL;
@@ -890,7 +924,7 @@ evcom_socket_init (evcom_socket *socket, float timeout)
 void 
 evcom_socket_close (evcom_socket *socket)
 {
-  socket->got_half_close = TRUE;
+  socket->flags |= EVCOM_GOT_HALF_CLOSE;
   if (evcom_queue_empty(&socket->out_stream)) {
     change_state_for_empty_out_stream(socket);
   }
@@ -899,7 +933,7 @@ evcom_socket_close (evcom_socket *socket)
 void 
 evcom_socket_full_close (evcom_socket *socket)
 {
-  socket->got_full_close = TRUE;
+  socket->flags |= EVCOM_GOT_FULL_CLOSE;
   if (evcom_queue_empty(&socket->out_stream)) {
     change_state_for_empty_out_stream(socket);
   }
@@ -916,7 +950,13 @@ void evcom_socket_force_close (evcom_socket *socket)
   socket->write_action = socket->read_action = NULL;
   // socket->errorno = EVCOM_SOCKET_ERROR_FORCE_CLOSE
   
-  if (socket->fd > 0) close(socket->fd);
+  if (socket->fd > 0) {
+    close(socket->fd);
+    if (socket->server) {
+      accept_new_connections(socket->server);
+    }
+  }
+
   socket->fd = -1;
 
   evcom_socket_detach(socket);
@@ -929,7 +969,7 @@ evcom_socket_write (evcom_socket *socket, evcom_buf *buf)
     assert(0 && "Do not write to a closed socket"); 
     goto error;
   }
-  if (socket->got_full_close == TRUE || socket->got_half_close == TRUE) {
+  if (GOT_FULL_CLOSE(socket) || GOT_HALF_CLOSE(socket)) {
     assert(0 && "Do not write to a closing socket");
     goto error;
   }
@@ -937,7 +977,7 @@ evcom_socket_write (evcom_socket *socket, evcom_buf *buf)
   evcom_queue_insert_head(&socket->out_stream, &buf->queue);
   buf->written = 0;
 
-  if (socket->attached) {
+  if (ATTACHED(socket)) {
     ev_io_start(SOCKET_LOOP_ &socket->write_watcher);
   }
   return;
@@ -979,7 +1019,7 @@ evcom_socket_attach (EV_P_ evcom_socket *socket)
 #if EV_MULTIPLICITY
   socket->loop = loop;
 #endif 
-  socket->attached = TRUE;
+  socket->flags |= EVCOM_ATTACHED;
 
   ev_timer_again(EV_A_ &socket->timeout_watcher);
 
@@ -995,14 +1035,14 @@ evcom_socket_attach (EV_P_ evcom_socket *socket)
 void
 evcom_socket_detach (evcom_socket *socket)
 {
-  if (socket->attached) {
+  if (ATTACHED(socket)) {
     ev_io_stop(SOCKET_LOOP_ &socket->write_watcher);
     ev_io_stop(SOCKET_LOOP_ &socket->read_watcher);
     ev_timer_stop(SOCKET_LOOP_ &socket->timeout_watcher);
 #if EV_MULTIPLICITY
     socket->loop = NULL;
 #endif
-    socket->attached = FALSE;
+    socket->flags &= ~EVCOM_ATTACHED;
   }
 }
 
index cc2fc10..3161dce 100644 (file)
@@ -47,6 +47,15 @@ typedef struct evcom_buf     evcom_buf;
 typedef struct evcom_server  evcom_server;
 typedef struct evcom_socket  evcom_socket;
 
+/* flags for socket and server */
+#define EVCOM_ATTACHED          0x0001
+#define EVCOM_LISTENING         0x0002
+#define EVCOM_CONNECTED         0x0004
+#define EVCOM_SECURE            0x0008
+#define EVCOM_GOT_HALF_CLOSE    0x0010
+#define EVCOM_GOT_FULL_CLOSE    0x0020
+#define EVCOM_TOO_MANY_CONN     0x0040
+
 void evcom_server_init          (evcom_server *);
  int evcom_server_listen        (evcom_server *, struct addrinfo *addrinfo, int backlog);
 void evcom_server_attach        (EV_P_ evcom_server *);
@@ -65,7 +74,6 @@ void evcom_socket_read_stop     (evcom_socket *);
 void evcom_socket_reset_timeout (evcom_socket *);
 
 /* Writes a buffer to the socket. 
- * (Do not send a NULL evcom_buf or a buffer with evcom_buf->base == NULL.)
  */
 void evcom_socket_write         (evcom_socket *, evcom_buf *);
 
@@ -129,8 +137,7 @@ struct evcom_server {
 #if EV_MULTIPLICITY
   struct ev_loop *loop;
 #endif
-  unsigned attached:1;
-  unsigned listening:1;
+  unsigned flags;
 
   /* PRIVATE */
   ev_io connection_watcher;
@@ -158,11 +165,7 @@ struct evcom_socket {
   evcom_server *server;
   evcom_queue out_stream;
   size_t written;
-  unsigned attached:1;
-  unsigned connected:1;
-  unsigned secure:1;
-  unsigned got_full_close:1;
-  unsigned got_half_close:1;
+  unsigned flags;
 
   /* NULL = that end of the socket is closed. */
   int (*read_action)  (evcom_socket *);
index d47a9b4..7870176 100644 (file)
@@ -103,7 +103,7 @@ void anon_tls_client (evcom_socket *socket)
   gnutls_credentials_set(client_session, GNUTLS_CRD_ANON, client_credentials);
 
   evcom_socket_set_secure_session(socket, client_session);
-  assert(socket->secure);
+  assert(socket->flags & EVCOM_SECURE);
 }
 
 #endif // EVCOM_HAVE_GNUTLS
index 6a254ec..191a5df 100644 (file)
@@ -132,10 +132,10 @@ Connection::New (const Arguments& args)
 enum Connection::readyState
 Connection::ReadyState (void)
 {
-  if (socket_.got_full_close)
+  if (socket_.flags & EVCOM_GOT_FULL_CLOSE)
     return CLOSED;
 
-  if (socket_.got_half_close)
+  if (socket_.flags & EVCOM_GOT_HALF_CLOSE)
     return (socket_.read_action == NULL ? CLOSED : READ_ONLY);
 
   if (socket_.read_action && socket_.write_action)