handle EAGAIN during send
authorAndy Green <andy.green@linaro.org>
Mon, 9 Dec 2013 06:16:17 +0000 (14:16 +0800)
committerAndy Green <andy.green@linaro.org>
Mon, 9 Dec 2013 06:16:17 +0000 (14:16 +0800)
This patch deploys the truncated send work to buffer output in case
either send() or the SSL send return a temporary "unable to send"
condition even though they signalled as writeable.

I added a by-default #if 0 test jig which enforces only half of what
you want to send is sendable, this is working when enabled.

One subtle change is that the pipe reports choked if there is any
pending remaining truncated send.  Otherwise it should be transparent.

Hopefully...

Signed-off-by: Andy Green <andy.green@linaro.org>
lib/libwebsockets.c
lib/output.c
lib/private-libwebsockets.h
lib/server.c
test-server/test-server.c

index 31e1424..7342e9b 100644 (file)
@@ -376,10 +376,10 @@ just_kill_connection:
                        free(wsi->u.ws.rxflow_buffer);
                        wsi->u.ws.rxflow_buffer = NULL;
                }
-               if (wsi->u.ws.truncated_send_malloc) {
+               if (wsi->truncated_send_malloc) {
                        /* not going to be completed... nuke it */
-                       free(wsi->u.ws.truncated_send_malloc);
-                       wsi->u.ws.truncated_send_malloc = NULL;
+                       free(wsi->truncated_send_malloc);
+                       wsi->truncated_send_malloc = NULL;
                }
        }
 
@@ -642,6 +642,10 @@ LWS_VISIBLE int lws_send_pipe_choked(struct libwebsocket *wsi)
 {
        struct pollfd fds;
 
+       /* treat the fact we got a truncated send pending as if we're choked */
+       if (wsi->truncated_send_malloc)
+               return 1;
+
        fds.fd = wsi->sock;
        fds.events = POLLOUT;
        fds.revents = 0;
@@ -671,10 +675,10 @@ lws_handle_POLLOUT_event(struct libwebsocket_context *context,
 
        /* pending truncated sends have uber priority */
 
-       if (wsi->u.ws.truncated_send_malloc) {
-               lws_issue_raw(wsi, wsi->u.ws.truncated_send_malloc +
-                               wsi->u.ws.truncated_send_offset,
-                                               wsi->u.ws.truncated_send_len);
+       if (wsi->truncated_send_malloc) {
+               lws_issue_raw(wsi, wsi->truncated_send_malloc +
+                               wsi->truncated_send_offset,
+                                               wsi->truncated_send_len);
                /* leave POLLOUT active either way */
                return 0;
        }
index a375dea..c107401 100644 (file)
@@ -100,6 +100,14 @@ int lws_issue_raw(struct libwebsocket *wsi, unsigned char *buf, size_t len)
        int n;
 #ifndef LWS_NO_EXTENSIONS
        int m;
+       size_t real_len = len;
+
+       if (wsi->truncated_send_malloc &&
+               (buf < wsi->truncated_send_malloc ||
+                       buf > (wsi->truncated_send_malloc + wsi->truncated_send_len +  wsi->truncated_send_offset))) {
+               lwsl_err("****** %x Sending something else while pending truncated ...\n", wsi);
+               assert(0);
+       }
 
        /*
         * one of the extensions is carrying our data itself?  Like mux?
@@ -138,12 +146,23 @@ int lws_issue_raw(struct libwebsocket *wsi, unsigned char *buf, size_t len)
        lws_hexdump(buf, len);
 #endif
 
+#if 0
+       /* test partial send support by forcing multiple sends on everything */
+       len = len / 2;
+       if (!len)
+               len = 1;
+#endif
+
        lws_latency_pre(context, wsi);
 #ifdef LWS_OPENSSL_SUPPORT
        if (wsi->ssl) {
                n = SSL_write(wsi->ssl, buf, len);
                lws_latency(context, wsi, "SSL_write lws_issue_raw", n, n >= 0);
                if (n < 0) {
+                       if (errno == EAGAIN || errno == EINTR) {
+                               n = 0;
+                               goto handle_truncated_send;
+                       }
                        lwsl_debug("ERROR writing to socket\n");
                        return -1;
                }
@@ -152,6 +171,10 @@ int lws_issue_raw(struct libwebsocket *wsi, unsigned char *buf, size_t len)
                n = send(wsi->sock, buf, len, MSG_NOSIGNAL);
                lws_latency(context, wsi, "send lws_issue_raw", n, n == len);
                if (n < 0) {
+                       if (errno == EAGAIN || errno == EINTR) {
+                               n = 0;
+                               goto handle_truncated_send;
+                       }
                        lwsl_debug("ERROR writing len %d to skt %d\n", len, n);
                        return -1;
                }
@@ -166,16 +189,17 @@ handle_truncated_send:
        /*
         * already handling a truncated send?
         */
-       if (wsi->u.ws.truncated_send_malloc) {
-               lwsl_info("***** partial send moved on by %d (vs %d)\n", n, len);
-               wsi->u.ws.truncated_send_offset += n;
-               wsi->u.ws.truncated_send_len -= n;
+       if (wsi->truncated_send_malloc) {
+               lwsl_info("***** %x partial send moved on by %d (vs %d)\n", wsi, n, real_len);
+               wsi->truncated_send_offset += n;
+               wsi->truncated_send_len -= n;
 
-               if (!wsi->u.ws.truncated_send_len) {
-                       lwsl_info("***** partial send completed\n");
+               if (!wsi->truncated_send_len) {
+                       lwsl_info("***** %x partial send completed\n", wsi);
                        /* done with it */
-                       free(wsi->u.ws.truncated_send_malloc);
-                       wsi->u.ws.truncated_send_malloc = NULL;
+                       free(wsi->truncated_send_malloc);
+                       wsi->truncated_send_malloc = NULL;
+                       n = real_len;
                } else
                        libwebsocket_callback_on_writable(
                                             wsi->protocol->owning_server, wsi);
@@ -183,7 +207,7 @@ handle_truncated_send:
                return n;
        }
 
-       if (n < len) {
+       if (n < real_len) {
                if (wsi->u.ws.clean_buffer)
                        /*
                         * This buffer unaffected by extension rewriting.
@@ -198,23 +222,23 @@ handle_truncated_send:
                 * Newly truncated send.  Buffer the remainder (it will get
                 * first priority next time the socket is writable)
                 */
-               lwsl_info("***** new partial send %d sent %d accepted\n", len, n);
+               lwsl_info("***** %x new partial sent %d from %d total\n", wsi, n, real_len);
 
-               wsi->u.ws.truncated_send_malloc = malloc(len - n);
-               if (!wsi->u.ws.truncated_send_malloc) {
+               wsi->truncated_send_malloc = malloc(real_len - n);
+               if (!wsi->truncated_send_malloc) {
                        lwsl_err("truncated send: unable to malloc %d\n",
-                                                                      len - n);
+                                                                 real_len - n);
                        return -1;
                }
 
-               wsi->u.ws.truncated_send_offset = 0;
-               wsi->u.ws.truncated_send_len = len - n;
-               memcpy(wsi->u.ws.truncated_send_malloc, buf + n, len - n);
+               wsi->truncated_send_offset = 0;
+               wsi->truncated_send_len = real_len - n;
+               memcpy(wsi->truncated_send_malloc, buf + n, real_len - n);
 
                libwebsocket_callback_on_writable(
                                             wsi->protocol->owning_server, wsi);
 
-               return len;
+               return real_len;
        }
 
        return n;
@@ -304,7 +328,7 @@ lws_issue_raw_ext_access(struct libwebsocket *wsi,
                 */
 
                if (!lws_send_pipe_choked(wsi) &&
-                                       !wsi->u.ws.truncated_send_malloc)
+                                       !wsi->truncated_send_malloc)
                        /* no we could add more, lets's do that */
                        continue;
 
@@ -608,6 +632,17 @@ LWS_VISIBLE int libwebsockets_serve_http_file_fragment(
        int n, m;
 
        while (!lws_send_pipe_choked(wsi)) {
+
+               if (wsi->truncated_send_malloc) {
+                       lws_issue_raw(wsi, wsi->truncated_send_malloc +
+                                       wsi->truncated_send_offset,
+                                                       wsi->truncated_send_len);
+                       continue;
+               }
+
+               if (wsi->u.http.filepos == wsi->u.http.filelen)
+                       goto all_sent;
+
                n = read(wsi->u.http.fd, context->service_buffer,
                                               sizeof(context->service_buffer));
                if (n > 0) {
@@ -624,8 +659,9 @@ LWS_VISIBLE int libwebsockets_serve_http_file_fragment(
 
                if (n < 0)
                        return -1; /* caller will close */
-
-               if (wsi->u.http.filepos == wsi->u.http.filelen) {
+all_sent:
+               if (!wsi->truncated_send_malloc &&
+                               wsi->u.http.filepos == wsi->u.http.filelen) {
                        wsi->state = WSI_STATE_HTTP;
 
                        if (wsi->protocol->callback)
@@ -638,7 +674,7 @@ LWS_VISIBLE int libwebsockets_serve_http_file_fragment(
                }
        }
 
-       lwsl_notice("choked before able to send whole file (post)\n");
+       lwsl_info("choked before able to send whole file (post)\n");
        libwebsocket_callback_on_writable(context, wsi);
 
        return 0; /* indicates further processing must be done */
index 147253b..12b8d5a 100644 (file)
@@ -378,10 +378,6 @@ struct _lws_websocket_related {
        unsigned int this_frame_masked:1;
        unsigned int inside_frame:1; /* next write will be more of frame */
        unsigned int clean_buffer:1; /* buffer not rewritten by extension */
-       /* truncated send handling */
-       unsigned char *truncated_send_malloc; /* non-NULL means buffering in progress */
-       unsigned int truncated_send_offset; /* where we are in terms of spilling */
-       unsigned int truncated_send_len; /* how much is buffered */
 };
 
 struct libwebsocket {
@@ -415,6 +411,11 @@ struct libwebsocket {
        unsigned long latency_start;
 #endif
 
+       /* truncated send handling */
+       unsigned char *truncated_send_malloc; /* non-NULL means buffering in progress */
+       unsigned int truncated_send_offset; /* where we are in terms of spilling */
+       unsigned int truncated_send_len; /* how much is buffered */
+
        void *user_space;
 
        /* members with mutually exclusive lifetimes are unionized */
index 7c0421a..6c35c98 100644 (file)
@@ -145,6 +145,21 @@ int lws_server_socket_service(struct libwebsocket_context *context,
 
                /* handle http headers coming in */
 
+               /* pending truncated sends have uber priority */
+
+               if (wsi->truncated_send_malloc) {
+                       if (pollfd->revents & POLLOUT)
+                               lws_issue_raw(wsi, wsi->truncated_send_malloc +
+                                       wsi->truncated_send_offset,
+                                                       wsi->truncated_send_len);
+                       /*
+                        * we can't afford to allow input processing send
+                        * something new, so spin around he event loop until
+                        * he doesn't have any partials
+                        */
+                       break;
+               }
+
                /* any incoming data ready? */
 
                if (pollfd->revents & POLLIN) {
@@ -178,11 +193,17 @@ int lws_server_socket_service(struct libwebsocket_context *context,
                                return 0;
                        }
 
+                       /* hm this may want to send (via HTTP callback for example) */
+
                        n = libwebsocket_read(context, wsi,
                                                context->service_buffer, len);
                        if (n < 0)
                                /* we closed wsi */
                                return 0;
+
+                       /* hum he may have used up the writability above */
+
+                       break;
                }
 
                /* this handles POLLOUT for http serving fragments */
index 9ca5d21..2cdd67b 100644 (file)
@@ -377,7 +377,7 @@ static int callback_http(struct libwebsocket_context *context,
                                goto bail;
                        /* sent it all, close conn */
                        if (n == 0)
-                               goto bail;
+                               goto flush_bail;
                        /*
                         * because it's HTTP and not websocket, don't need to take
                         * care about pre and postamble
@@ -393,6 +393,12 @@ static int callback_http(struct libwebsocket_context *context,
                } while (!lws_send_pipe_choked(wsi));
                libwebsocket_callback_on_writable(context, wsi);
                break;
+flush_bail:
+               /* true if still partial pending */
+               if (lws_send_pipe_choked(wsi)) {
+                       libwebsocket_callback_on_writable(context, wsi);
+                       break;
+               }
 
 bail:
                close(pss->fd);