dist_libwebsockets_la_SOURCES += md5.c sha-1.c
endif
-libwebsockets_la_CFLAGS=-Wall -std=gnu99 -pedantic
+libwebsockets_la_CFLAGS=-Wall -std=gnu99 -pedantic -g
libwebsockets_la_LDFLAGS=
if MINGW
* screwed.. close the connection... we will get a
* destroy callback to take care of closing nicely
*/
- fprintf(stderr, "zlib error inflate %d: %s",
+ lwsl_err("zlib error inflate %d: %s\n",
n, conn->zs_in.msg);
return -1;
}
if (wsi->c_address)
free(wsi->c_address);
+ if (wsi->rxflow_buffer)
+ free(wsi->rxflow_buffer);
+
/* lwsl_info("closing fd=%d\n", wsi->sock); */
#ifdef LWS_OPENSSL_SUPPORT
else
n = LWS_CALLBACK_SERVER_WRITEABLE;
- wsi->protocol->callback(context, wsi, (enum libwebsocket_callback_reasons) n, wsi->user_space, NULL, 0);
+ user_callback_handle_rxflow(wsi->protocol->callback, context,
+ wsi, (enum libwebsocket_callback_reasons) n, wsi->user_space, NULL, 0);
return 0;
}
LWS_CLOSE_STATUS_NOSTATUS);
else
if (wsi->state == WSI_STATE_HTTP && wsi->protocol->callback)
- if (wsi->protocol->callback(context, wsi, LWS_CALLBACK_HTTP_FILE_COMPLETION, wsi->user_space,
+ if (user_callback_handle_rxflow(wsi->protocol->callback, context, wsi, LWS_CALLBACK_HTTP_FILE_COMPLETION, wsi->user_space,
wsi->filepath, wsi->filepos))
libwebsocket_close_and_free_session(context, wsi, LWS_CLOSE_STATUS_NOSTATUS);
break;
/* broadcast it to this connection */
- new_wsi->protocol->callback(context, new_wsi,
+ user_callback_handle_rxflow(new_wsi->protocol->callback, context, new_wsi,
LWS_CALLBACK_BROADCAST,
new_wsi->user_space,
buf + LWS_SEND_BUFFER_PRE_PADDING, len);
return wsi->sock;
}
-/**
- * libwebsocket_rx_flow_control() - Enable and disable socket servicing for
- * receieved packets.
- *
- * If the output side of a server process becomes choked, this allows flow
- * control for the input side.
- *
- * @wsi: Websocket connection instance to get callback for
- * @enable: 0 = disable read servicing for this connection, 1 = enable
- */
int
-libwebsocket_rx_flow_control(struct libwebsocket *wsi, int enable)
+_libwebsocket_rx_flow_control(struct libwebsocket *wsi)
{
struct libwebsocket_context *context = wsi->protocol->owning_server;
int n;
- for (n = 0; n < context->fds_count; n++)
- if (context->fds[n].fd == wsi->sock) {
- if (enable)
- context->fds[n].events |= POLLIN;
- else
- context->fds[n].events &= ~POLLIN;
+ if (!(wsi->rxflow_change_to & 2))
+ return 0;
- return 0;
+ wsi->rxflow_change_to &= ~2;
+
+ lwsl_info("rxflow: wsi %p change_to %d\n", wsi, wsi->rxflow_change_to);
+
+ /* if we're letting it come again, did we interrupt anything? */
+ if ((wsi->rxflow_change_to & 1) && wsi->rxflow_buffer) {
+ n = libwebsocket_interpret_incoming_packet(wsi, NULL, 0);
+ if (n < 0) {
+ libwebsocket_close_and_free_session(context, wsi, LWS_CLOSE_STATUS_NOSTATUS);
+ return -1;
}
+ if (n)
+ /* oh he stuck again, do nothing */
+ return 0;
+ }
- if (enable)
+ if (wsi->rxflow_change_to & 1)
+ context->fds[wsi->position_in_fds_table].events |= POLLIN;
+ else
+ context->fds[wsi->position_in_fds_table].events &= ~POLLIN;
+
+ if (wsi->rxflow_change_to & 1)
/* external POLL support via protocol 0 */
context->protocols[0].callback(context, wsi,
LWS_CALLBACK_SET_MODE_POLL_FD,
LWS_CALLBACK_CLEAR_MODE_POLL_FD,
(void *)(long)wsi->sock, NULL, POLLIN);
-#if 0
- lwsl_err("libwebsocket_rx_flow_control unable to find socket\n");
-#endif
return 1;
}
/**
+ * libwebsocket_rx_flow_control() - Enable and disable socket servicing for
+ * receieved packets.
+ *
+ * If the output side of a server process becomes choked, this allows flow
+ * control for the input side.
+ *
+ * @wsi: Websocket connection instance to get callback for
+ * @enable: 0 = disable read servicing for this connection, 1 = enable
+ */
+
+int
+libwebsocket_rx_flow_control(struct libwebsocket *wsi, int enable)
+{
+ wsi->rxflow_change_to = 2 | !!enable;
+
+ return 0;
+}
+
+
+/**
* libwebsocket_canonical_hostname() - returns this host's hostname
*
* This is typically used by client code to fill in the host parameter
}
#endif
+int user_callback_handle_rxflow(callback_function callback_function,
+ struct libwebsocket_context * context,
+ struct libwebsocket *wsi,
+ enum libwebsocket_callback_reasons reason, void *user,
+ void *in, size_t len)
+{
+ int n;
+
+ n = callback_function(context, wsi, reason, user, in, len);
+ if (n < 0)
+ return n;
+
+ _libwebsocket_rx_flow_control(wsi);
+
+ return 0;
+}
+
/**
* libwebsocket_create_context() - Create the websocket handler
if (wsi->protocol != protocol)
continue;
- wsi->protocol->callback(context, wsi,
+ user_callback_handle_rxflow(wsi->protocol->callback,
+ context, wsi,
LWS_CALLBACK_BROADCAST,
wsi->user_space,
buf, len);
break;
case LWS_RXPS_EAT_UNTIL_76_FF:
+
if (c == 0xff) {
wsi->lws_rx_parse_state = LWS_RXPS_NEW;
goto issue;
break;
issue:
if (wsi->protocol->callback)
- wsi->protocol->callback(wsi->protocol->owning_server,
+ user_callback_handle_rxflow(wsi->protocol->callback,
+ wsi->protocol->owning_server,
wsi, LWS_CALLBACK_RECEIVE,
wsi->user_space,
&wsi->rx_user_buffer[LWS_SEND_BUFFER_PRE_PADDING],
eff_buf.token[eff_buf.token_len] = '\0';
if (wsi->protocol->callback)
- wsi->protocol->callback(wsi->protocol->owning_server,
+ user_callback_handle_rxflow(wsi->protocol->callback,
+ wsi->protocol->owning_server,
wsi, LWS_CALLBACK_RECEIVE,
wsi->user_space,
eff_buf.token,
unsigned char *buf, size_t len)
{
size_t n;
+ int m;
+ int clear_rxflow = !!wsi->rxflow_buffer;
+ struct libwebsocket_context *context = wsi->protocol->owning_server;
#ifdef DEBUG
lwsl_parser("received %d byte packet\n", (int)len);
lwsl_hexdump(buf, len);
#endif
+ if (buf && wsi->rxflow_buffer)
+ lwsl_err("!!!! libwebsocket_interpret_incoming_packet: was pending rxflow, data loss\n");
+
/* let the rx protocol state machine have as much as it needs */
n = 0;
- while (n < len)
- if (libwebsocket_rx_sm(wsi, buf[n++]) < 0)
+ if (!buf) {
+ lwsl_info("dumping stored rxflow buffer len %d pos=%d\n", wsi->rxflow_len, wsi->rxflow_pos);
+ buf = wsi->rxflow_buffer;
+ n = wsi->rxflow_pos;
+ len = wsi->rxflow_len;
+ /* let's pretend he's already allowing input */
+ context->fds[wsi->position_in_fds_table].events |= POLLIN;
+ }
+
+ while (n < len) {
+ if (!(context->fds[wsi->position_in_fds_table].events & POLLIN)) {
+ /* his RX is flowcontrolled */
+ if (!wsi->rxflow_buffer) { /* a new rxflow in effect, buffer it and warn caller */
+ lwsl_info("new rxflow input buffer len %d\n", len - n);
+ wsi->rxflow_buffer = (unsigned char *)malloc(len - n);
+ wsi->rxflow_len = len - n;
+ wsi->rxflow_pos = 0;
+ memcpy(wsi->rxflow_buffer, buf + n, len - n);
+ } else {
+ lwsl_info("re-using rxflow input buffer\n");
+ /* rxflow while we were spilling previous rxflow buffer */
+ wsi->rxflow_pos = n;
+ }
+ return 1;
+ }
+ m = libwebsocket_rx_sm(wsi, buf[n]);
+ if (m < 0)
return -1;
+ n++;
+ }
+
+ if (clear_rxflow) {
+ lwsl_info("flow: clearing it\n");
+ free(wsi->rxflow_buffer);
+ wsi->rxflow_buffer = NULL;
+ context->fds[wsi->position_in_fds_table].events &= ~POLLIN;
+ }
return 0;
}
int sock;
int position_in_fds_table;
+ unsigned char *rxflow_buffer;
+ int rxflow_len;
+ int rxflow_pos;
+ int rxflow_change_to;
enum lws_rx_parse_state lws_rx_parse_state;
char extension_data_pending;
lws_issue_raw_ext_access(struct libwebsocket *wsi,
unsigned char *buf, size_t len);
+extern int
+_libwebsocket_rx_flow_control(struct libwebsocket *wsi);
+
+extern int
+user_callback_handle_rxflow(callback_function, struct libwebsocket_context * context,
+ struct libwebsocket *wsi,
+ enum libwebsocket_callback_reasons reason, void *user,
+ void *in, size_t len);
+
#ifndef LWS_OPENSSL_SUPPORT
unsigned char *
static struct a_message ringbuffer[MAX_MESSAGE_QUEUE];
static int ringbuffer_head;
+static struct libwebsocket *wsi_choked[20];
+static int num_wsi_choked;
static int
callback_lws_mirror(struct libwebsocket_context *context,
LWS_WRITE_TEXT);
if (n < 0) {
fprintf(stderr, "ERROR %d writing to socket\n", n);
- exit(1);
+ return 1;
}
if (pss->ringbuffer_tail == (MAX_MESSAGE_QUEUE - 1))
else
pss->ringbuffer_tail++;
- if (((ringbuffer_head - pss->ringbuffer_tail) %
- MAX_MESSAGE_QUEUE) < (MAX_MESSAGE_QUEUE - 15))
- libwebsocket_rx_flow_control(wsi, 1);
+ if (((ringbuffer_head - pss->ringbuffer_tail) &
+ (MAX_MESSAGE_QUEUE - 1)) < (MAX_MESSAGE_QUEUE - 15)) {
+ for (n = 0; n < num_wsi_choked; n++)
+ libwebsocket_rx_flow_control(wsi_choked[n], 1);
+ num_wsi_choked = 0;
+ }
+
+// fprintf(stderr, "tx fifo %d\n", (ringbuffer_head - pss->ringbuffer_tail) & (MAX_MESSAGE_QUEUE - 1));
libwebsocket_callback_on_writable(context, wsi);
case LWS_CALLBACK_RECEIVE:
+ if (((ringbuffer_head - pss->ringbuffer_tail) &
+ (MAX_MESSAGE_QUEUE - 1)) == (MAX_MESSAGE_QUEUE - 1)) {
+ fprintf(stderr, "dropping!\n");
+ goto choke;
+ }
+
if (ringbuffer[ringbuffer_head].payload)
free(ringbuffer[ringbuffer_head].payload);
else
ringbuffer_head++;
- if (((ringbuffer_head - pss->ringbuffer_tail) %
- MAX_MESSAGE_QUEUE) > (MAX_MESSAGE_QUEUE - 10))
+ if (((ringbuffer_head - pss->ringbuffer_tail) &
+ (MAX_MESSAGE_QUEUE - 1)) < (MAX_MESSAGE_QUEUE - 10))
+ goto done;
+
+choke:
+ if (num_wsi_choked < sizeof wsi_choked / sizeof wsi_choked[0]) {
libwebsocket_rx_flow_control(wsi, 0);
+ wsi_choked[num_wsi_choked++] = wsi;
+ }
+// fprintf(stderr, "rx fifo %d\n", (ringbuffer_head - pss->ringbuffer_tail) & (MAX_MESSAGE_QUEUE - 1));
+done:
libwebsocket_callback_on_writable_all_protocol(
libwebsockets_get_protocol(wsi));
break;
+
/*
* this just demonstrates how to use the protocol filter. If you won't
* study and reject connections based on header content, you don't need