callback each active extension on packet tx pre send
authorAndy Green <andy@warmcat.com>
Sun, 6 Mar 2011 13:14:42 +0000 (13:14 +0000)
committerAndy Green <andy@warmcat.com>
Sun, 6 Mar 2011 13:14:42 +0000 (13:14 +0000)
Signed-off-by: Andy Green <andy@warmcat.com>
lib/libwebsockets.c
lib/libwebsockets.h
lib/parsers.c
lib/private-libwebsockets.h

index a8f2cf1..ec71d79 100644 (file)
@@ -413,6 +413,111 @@ int lws_send_pipe_choked(struct libwebsocket *wsi)
        return 0;
 }
 
+static int
+lws_handle_POLLOUT_event(struct libwebsocket_context *context,
+                               struct libwebsocket *wsi, struct pollfd *pollfd)
+{
+       struct lws_tokens eff_buf;
+       int n;
+       int ret;
+       int m;
+
+       if (!wsi->extension_data_pending)
+               goto user_service;
+
+       /*
+        * check in on the active extensions, see if they
+        * had pending stuff to spill... they need to get the
+        * first look-in otherwise sequence will be disordered
+        *
+        * NULL, zero-length eff_buf means just spill pending
+        */
+
+       ret = 1;
+       while (ret == 1) {
+
+               /* default to nobody has more to spill */
+
+               ret = 0;
+               eff_buf.token = NULL;
+               eff_buf.token_len = 0;
+
+               /* give every extension a chance to spill */
+
+               for (n = 0; n < wsi->count_active_extensions; n++) {
+                       m = wsi->active_extensions[n]->callback(
+                               wsi->protocol->owning_server, wsi,
+                                       LWS_EXT_CALLBACK_PACKET_TX_PRESEND,
+                                  wsi->active_extensions_user[n], &eff_buf, 0);
+                       if (m < 0) {
+                               fprintf(stderr, "extension reports fatal error\n");
+                               return -1;
+                       }
+                       if (m)
+                               /*
+                                * at least one extension told us he has more
+                                * to spill, so we will go around again after
+                                */
+                               ret = 1;
+               }
+
+               /* assuming they gave us something to send, send it */
+
+               if (eff_buf.token_len) {
+                       if (lws_issue_raw(wsi, (unsigned char *)eff_buf.token,
+                                                            eff_buf.token_len))
+                               return -1;
+               } else
+                       continue;
+
+               /* no extension has more to spill */
+
+               if (!ret)
+                       continue;
+
+               /*
+                * There's more to spill from an extension, but we just sent
+                * something... did that leave the pipe choked?
+                */
+
+               if (!lws_send_pipe_choked(wsi))
+                       /* no we could add more */
+                       continue;
+
+               fprintf(stderr, "choked in POLLOUT service\n");
+
+               /*
+                * Yes, he's choked.  Leave the POLLOUT masked on so we will
+                * come back here when he is unchoked.  Don't call the user
+                * callback to enforce ordering of spilling, he'll get called
+                * when we come back here and there's nothing more to spill.
+                */
+
+               return 0;
+       }
+
+       wsi->extension_data_pending = 0;
+
+user_service:
+       /* one shot */
+
+       pollfd->events &= ~POLLOUT;
+
+       /* external POLL support via protocol 0 */
+       context->protocols[0].callback(context, wsi,
+               LWS_CALLBACK_CLEAR_MODE_POLL_FD,
+               (void *)(long)wsi->sock, NULL, POLLOUT);
+
+       wsi->protocol->callback(context, wsi,
+               LWS_CALLBACK_CLIENT_WRITEABLE,
+               wsi->user_space,
+               NULL, 0);
+
+       return 0;
+}
+
+
+
 /**
  * libwebsocket_service_fd() - Service polled socket with something waiting
  * @context:   Websocket context
@@ -428,7 +533,7 @@ int
 libwebsocket_service_fd(struct libwebsocket_context *context,
                                                          struct pollfd *pollfd)
 {
-       unsigned char buf[LWS_SEND_BUFFER_PRE_PADDING + MAX_BROADCAST_PAYLOAD +
+       unsigned char buf[LWS_SEND_BUFFER_PRE_PADDING + 1 + MAX_BROADCAST_PAYLOAD +
                                                  LWS_SEND_BUFFER_POST_PADDING];
        struct libwebsocket *wsi;
        struct libwebsocket *new_wsi;
@@ -712,24 +817,17 @@ libwebsocket_service_fd(struct libwebsocket_context *context,
                        return 1;
                }
 
-               /* the guy requested a callback when it was OK to write */
-
-               if (pollfd->revents & POLLOUT) {
-
-                       /* one shot */
-
-                       pollfd->events &= ~POLLOUT;
-
-                       /* external POLL support via protocol 0 */
-                       context->protocols[0].callback(context, wsi,
-                               LWS_CALLBACK_CLEAR_MODE_POLL_FD,
-                               (void *)(long)wsi->sock, NULL, POLLOUT);
+               /*
+                * either extension code with stuff to spill, or the user code,
+                * requested a callback when it was OK to write
+                */
 
-                       wsi->protocol->callback(context, wsi,
-                               LWS_CALLBACK_CLIENT_WRITEABLE,
-                               wsi->user_space,
-                               NULL, 0);
-               }
+               if (pollfd->revents & POLLOUT)
+                       if (lws_handle_POLLOUT_event(context, wsi, pollfd) < 0) {
+                               libwebsocket_close_and_free_session(context, wsi,
+                                                      LWS_CLOSE_STATUS_NORMAL);
+                               return 1;
+                       }
 
                /* any incoming data ready? */
 
@@ -1397,20 +1495,13 @@ bail2:
 
                /* the guy requested a callback when it was OK to write */
 
-               if (pollfd->revents & POLLOUT) {
-
-                       pollfd->events &= ~POLLOUT;
-
-                       /* external POLL support via protocol 0 */
-                       context->protocols[0].callback(context, wsi,
-                               LWS_CALLBACK_CLEAR_MODE_POLL_FD,
-                               (void *)(long)wsi->sock, NULL, POLLOUT);
+               if (pollfd->revents & POLLOUT)
+                       if (lws_handle_POLLOUT_event(context, wsi, pollfd) < 0) {
+                               libwebsocket_close_and_free_session(context, wsi,
+                                                      LWS_CLOSE_STATUS_NORMAL);
+                               return 1;
+                       }
 
-                       wsi->protocol->callback(context, wsi,
-                               LWS_CALLBACK_CLIENT_WRITEABLE,
-                               wsi->user_space,
-                               NULL, 0);
-               }
 
                /* any incoming data ready? */
 
index a9627ad..b759590 100644 (file)
@@ -75,6 +75,7 @@ enum libwebsocket_extension_callback_reasons {
        LWS_EXT_CALLBACK_CONSTRUCT,
        LWS_EXT_CALLBACK_DESTROY,
        LWS_EXT_CALLBACK_PACKET_RX_PREPARSE,
+       LWS_EXT_CALLBACK_PACKET_TX_PRESEND,
 };
 
 enum libwebsocket_write_protocol {
index 37a4ae6..e4f17a4 100644 (file)
@@ -1109,9 +1109,12 @@ int libwebsocket_write(struct libwebsocket *wsi, unsigned char *buf,
                          size_t len, enum libwebsocket_write_protocol protocol)
 {
        int n;
+       int m;
        int pre = 0;
        int post = 0;
        unsigned int shift = 7;
+       struct lws_tokens eff_buf;
+       int ret;
 
        if (len == 0 && protocol != LWS_WRITE_CLOSE) {
                fprintf(stderr, "zero length libwebsocket_write attempt\n");
@@ -1326,10 +1329,95 @@ int libwebsocket_write(struct libwebsocket *wsi, unsigned char *buf,
 
 send_raw:
 
-       if (lws_issue_raw(wsi, buf - pre, len + pre + post))
-               return -1;
+       if (protocol == LWS_WRITE_HTTP) {
+               if (lws_issue_raw(wsi, (unsigned char *)buf - pre,
+                                                             len + pre + post))
+                       return -1;
+
+               return 0;
+       }
+
+       /*
+        * give any active extensions a chance to munge the buffer
+        * before send.  We pass in a pointer to an lws_tokens struct
+        * prepared with the default buffer and content length that's in
+        * there.  Rather than rewrite the default buffer, extensions
+        * that expect to grow the buffer can adapt .token to
+        * point to their own per-connection buffer in the extension
+        * user allocation.  By default with no extensions or no
+        * extension callback handling, just the normal input buffer is
+        * used then so it is efficient.
+        *
+        * callback returns 1 in case it wants to spill more buffers
+        */
+
+       eff_buf.token = (char *)buf - pre;
+       eff_buf.token_len = len + pre + post;
+
+       /*
+        * while we have original buf to spill ourselves, or extensions report
+        * more in their pipeline
+        */
+
+       ret = 1;
+       while (ret == 1) {
+
+               /* default to nobody has more to spill */
+
+               ret = 0;
+
+               /* show every extension the new incoming data */
+
+               for (n = 0; n < wsi->count_active_extensions; n++) {
+                       m = wsi->active_extensions[n]->callback(
+                               wsi->protocol->owning_server, wsi,
+                                       LWS_EXT_CALLBACK_PACKET_TX_PRESEND,
+                                  wsi->active_extensions_user[n], &eff_buf, 0);
+                       if (m < 0) {
+                               fprintf(stderr, "Extension reports fatal error\n");
+                               return -1;
+                       }
+                       if (m)
+                               /*
+                                * at least one extension told us he has more
+                                * to spill, so we will go around again after
+                                */
+                               ret = 1;
+               }
+
+               /* assuming they left us something to send, send it */
+
+               if (eff_buf.token_len)
+                       if (lws_issue_raw(wsi, (unsigned char *)eff_buf.token,
+                                                            eff_buf.token_len))
+                               return -1;
+
+               /* we used up what we had */
+
+               eff_buf.token = NULL;
+               eff_buf.token_len = 0;
+
+               /*
+                * Did that leave the pipe choked?
+                */
+
+               if (!lws_send_pipe_choked(wsi))
+                       /* no we could add more */
+                       continue;
+
+               fprintf(stderr, "choked\n");
+
+               /*
+                * Yes, he's choked.  Don't spill the rest now get a callback
+                * when he is ready to send and take care of it there
+                */
+               libwebsocket_callback_on_writable(
+                                            wsi->protocol->owning_server, wsi);
+               wsi->extension_data_pending = 1;
+               ret = 0;
+       }
 
-       debug("written %d bytes to client\n", (int)len);
+       debug("written %d bytes to client\n", eff_buf.token_len);
 
        return 0;
 }
index 9866871..fa61f49 100644 (file)
@@ -248,6 +248,7 @@ struct libwebsocket {
        int sock;
 
        enum lws_rx_parse_state lws_rx_parse_state;
+       char extension_data_pending;
 
        /* 04 protocol specific */