typedef struct {
unsigned char *buf;
size_t len;
+ size_t accumulating;
+ bool last; // true, single message or last fragment of a message.
} websocket_data_s;
typedef struct {
int port, const char *iface_name, int max_conn, vine_dp_ssl ssl);
static void _request_write(websocket_s *ws);
-static int _save_data(websocket_s *ws, void *buf, size_t len);
+static int _save_data(websocket_s *ws, void *buf, size_t len, int first, int last);
static int _write_data(websocket_s *ws);
static void _debug_func(int level, const char *line)
g_callbacks.pollfd_cb(VINE_DATA_PATH_POLLFD_MOD, pollfd->fd, pollfd->events);
}
-// For debug
-static void __print_received_data(void *in, size_t len)
-{
- VINE_LOGD("len[%zd] received [%s]", len, (char *)in);
-}
-
static int _websocket_protocol_cb(struct lws *wsi,
enum lws_callback_reasons reason, void *user, void *in, size_t len)
{
}
case LWS_CALLBACK_RECEIVE:
+ {
VINE_LOGI("%d bytes is received from client.", len);
- __print_received_data(in, len);
- if (ws && g_callbacks.received_cb) {
- _save_data(ws, in, len);
- g_callbacks.received_cb(len, ws->user);
- }
+ if (!ws)
+ return 0;
+
+ size_t total_len = _save_data(ws, in, len,
+ lws_is_first_fragment(wsi),
+ lws_is_final_fragment(wsi));
+ if (lws_is_final_fragment(wsi) && g_callbacks.received_cb)
+ g_callbacks.received_cb(total_len, ws->user);
break;
+ }
case LWS_CALLBACK_SERVER_WRITEABLE:
if (!ws) {
break;
case LWS_CALLBACK_CLIENT_RECEIVE:
+ {
VINE_LOGI("%d bytes is received from server.", len);
- __print_received_data(in, len);
- if (ws && g_callbacks.received_cb) {
- _save_data(ws, in, len);
- g_callbacks.received_cb(len, ws->user);
- }
+ if (!ws)
+ return 0;
+
+ size_t total_len = _save_data(ws, in, len,
+ lws_is_first_fragment(wsi),
+ lws_is_final_fragment(wsi));
+ if (lws_is_final_fragment(wsi) && g_callbacks.received_cb)
+ g_callbacks.received_cb(total_len, ws->user);
break;
+ }
case LWS_CALLBACK_CLIENT_WRITEABLE:
if (!ws) {
{
RET_IF(wd == NULL, "data is NULL.");
free(wd->buf);
+ wd->buf = NULL;
free(wd);
}
-static int _save_data(websocket_s *ws, void *buf, size_t len)
+static int _save_data(websocket_s *ws, void *buf, size_t len, int first, int last)
{
websocket_data_s *wd = (websocket_data_s *)calloc(1, sizeof(websocket_data_s));
RET_VAL_IF(!wd, -1, "Out of memory.");
memcpy(data, buf, len);
wd->buf = data;
wd->len = len;
- ws->recv_buffer->push(wd);
- VINE_LOGD("websocket_data[%p] is pushed to recv_buffer.", wd);
- return len;
+ size_t accumulating = 0;
+ if (!first && !ws->recv_buffer->empty()) {
+ websocket_data_s *prior = ws->recv_buffer->back();
+ if (prior)
+ accumulating = prior->accumulating;
+ }
+ wd->accumulating = accumulating + len;
+ wd->last = last > 0;
+ ws->recv_buffer->push(wd);
+ VINE_LOGD("websocket_data[%p] is pushed to recv_buffer. total %zd bytes.",
+ wd, wd->accumulating);
+ return wd->accumulating;
}
static int _write_data(websocket_s *ws)
if (!ws->recv_buffer)
return bytes;
- wd = ws->recv_buffer->front();
- RET_VAL_IF(wd == NULL, 0, "There is no data.");
+ unsigned char *ptr = buf;
+ size_t read_len = 0;
+ while (!ws->recv_buffer->empty() && len > 0) {
+ wd = ws->recv_buffer->front();
+ if (wd == nullptr) {
+ ws->recv_buffer->erase();
+ return bytes;
+ }
- if (wd->len <= len) {
- memcpy(buf, wd->buf, wd->len);
- bytes = wd->len;
- wd = ws->recv_buffer->pop();
- __destroy_websocket_data(wd);
- wd = NULL;
- } else {
- memcpy(buf, wd->buf, len);
- bytes = len;
-
- // Requeue a remained data ahead of the items in recv_buffer.
- // It might be read later.
- size_t remained_len = wd->len - len;
- unsigned char *remained_data = (unsigned char *)calloc(remained_len, sizeof(unsigned char));
- memcpy(remained_data, wd->buf + len, remained_len);
- free(wd->buf);
- wd->buf = remained_data;
- wd->len = remained_len;
+ ptr += read_len;
+ read_len = wd->len > len ? len : wd->len;
+ memcpy(ptr, wd->buf, read_len);
+ len -= read_len;
+ bytes += read_len;
+
+ if (read_len >= wd->len) {
+ ws->recv_buffer->erase();
+ bool last = wd->last;
+ __destroy_websocket_data(wd);
+ wd = NULL;
+ if (last)
+ break;
+ } else if (read_len < wd->len) { // requeue
+ size_t remained_len = wd->len - read_len;
+ unsigned char *remained_data = (unsigned char *)calloc(remained_len,
+ sizeof(unsigned char));
+ memcpy(remained_data, wd->buf + read_len, remained_len);
+ free(wd->buf);
+ wd->buf = remained_data;
+ wd->len = remained_len;
+ return bytes;
+ }
}
return bytes;