Invoke recv_cb() when all fragments of a message is arrived 17/258317/5
authorSeonah Moon <seonah1.moon@samsung.com>
Thu, 13 May 2021 10:41:45 +0000 (19:41 +0900)
committerSeonah Moon <seonah1.moon@samsung.com>
Fri, 14 May 2021 07:49:25 +0000 (16:49 +0900)
Change-Id: Ic5b01ee73c771503315e478b60e6d45f8aab9de9

plugins/libwebsockets/libwebsockets-plugin.cpp
src/include/vine-queue.h

index 9ae9149fd7bb2a51d25973201bbf7c263914e1aa..a9918a46c2b3567569fb097179abb1b488091fdf 100755 (executable)
@@ -43,6 +43,8 @@ using namespace std;
 typedef struct {
        unsigned char *buf;
        size_t len;
+       size_t accumulating;
+       bool last; // true, single message or last fragment of a message.
 } websocket_data_s;
 
 typedef struct {
@@ -112,7 +114,7 @@ static void _open_server(websocket_s *ws, int addr_family,
                int port, const char *iface_name, int max_conn, vine_dp_ssl ssl);
 static void _request_write(websocket_s *ws);
 
-static int _save_data(websocket_s *ws, void *buf, size_t len);
+static int _save_data(websocket_s *ws, void *buf, size_t len, int first, int last);
 static int _write_data(websocket_s *ws);
 
 static void _debug_func(int level, const char *line)
@@ -327,12 +329,6 @@ static void _change_websocket_poll_fd(struct lws_pollargs *args)
                g_callbacks.pollfd_cb(VINE_DATA_PATH_POLLFD_MOD, pollfd->fd, pollfd->events);
 }
 
-// For debug
-static void __print_received_data(void *in, size_t len)
-{
-       VINE_LOGD("len[%zd] received [%s]", len, (char *)in);
-}
-
 static int _websocket_protocol_cb(struct lws *wsi,
                enum lws_callback_reasons reason, void *user, void *in, size_t len)
 {
@@ -437,13 +433,18 @@ static int _websocket_protocol_cb(struct lws *wsi,
        }
 
        case LWS_CALLBACK_RECEIVE:
+       {
                VINE_LOGI("%d bytes is received from client.", len);
-               __print_received_data(in, len);
-               if (ws && g_callbacks.received_cb) {
-                       _save_data(ws, in, len);
-                       g_callbacks.received_cb(len, ws->user);
-               }
+               if (!ws)
+                       return 0;
+
+               size_t total_len = _save_data(ws, in, len,
+                               lws_is_first_fragment(wsi),
+                               lws_is_final_fragment(wsi));
+               if (lws_is_final_fragment(wsi) && g_callbacks.received_cb)
+                       g_callbacks.received_cb(total_len, ws->user);
                break;
+       }
 
        case LWS_CALLBACK_SERVER_WRITEABLE:
                if (!ws) {
@@ -505,13 +506,18 @@ static int _websocket_protocol_cb(struct lws *wsi,
                break;
 
        case LWS_CALLBACK_CLIENT_RECEIVE:
+       {
                VINE_LOGI("%d bytes is received from server.", len);
-               __print_received_data(in, len);
-               if (ws && g_callbacks.received_cb) {
-                       _save_data(ws, in, len);
-                       g_callbacks.received_cb(len, ws->user);
-               }
+               if (!ws)
+                       return 0;
+
+               size_t total_len = _save_data(ws, in, len,
+                               lws_is_first_fragment(wsi),
+                               lws_is_final_fragment(wsi));
+               if (lws_is_final_fragment(wsi) && g_callbacks.received_cb)
+                       g_callbacks.received_cb(total_len, ws->user);
                break;
+       }
 
        case LWS_CALLBACK_CLIENT_WRITEABLE:
                if (!ws) {
@@ -720,10 +726,11 @@ static void __destroy_websocket_data(websocket_data_s *wd)
 {
        RET_IF(wd == NULL, "data is NULL.");
        free(wd->buf);
+       wd->buf = NULL;
        free(wd);
 }
 
-static int _save_data(websocket_s *ws, void *buf, size_t len)
+static int _save_data(websocket_s *ws, void *buf, size_t len, int first, int last)
 {
        websocket_data_s *wd = (websocket_data_s *)calloc(1, sizeof(websocket_data_s));
        RET_VAL_IF(!wd, -1, "Out of memory.");
@@ -737,10 +744,19 @@ static int _save_data(websocket_s *ws, void *buf, size_t len)
        memcpy(data, buf, len);
        wd->buf = data;
        wd->len = len;
-       ws->recv_buffer->push(wd);
-       VINE_LOGD("websocket_data[%p] is pushed to recv_buffer.", wd);
 
-       return len;
+       size_t accumulating = 0;
+       if (!first && !ws->recv_buffer->empty()) {
+               websocket_data_s *prior = ws->recv_buffer->back();
+               if (prior)
+                       accumulating = prior->accumulating;
+       }
+       wd->accumulating = accumulating + len;
+       wd->last = last > 0;
+       ws->recv_buffer->push(wd);
+       VINE_LOGD("websocket_data[%p] is pushed to recv_buffer. total %zd bytes.",
+                       wd, wd->accumulating);
+       return wd->accumulating;
 }
 
 static int _write_data(websocket_s *ws)
@@ -789,27 +805,38 @@ static int websocket_read(vine_dp_plugin_h handle, unsigned char *buf, size_t le
        if (!ws->recv_buffer)
                return bytes;
 
-       wd = ws->recv_buffer->front();
-       RET_VAL_IF(wd == NULL, 0, "There is no data.");
+       unsigned char *ptr = buf;
+       size_t read_len = 0;
+       while (!ws->recv_buffer->empty() && len > 0) {
+               wd = ws->recv_buffer->front();
+               if (wd == nullptr) {
+                       ws->recv_buffer->erase();
+                       return bytes;
+               }
 
-       if (wd->len <= len) {
-               memcpy(buf, wd->buf, wd->len);
-               bytes = wd->len;
-               wd = ws->recv_buffer->pop();
-               __destroy_websocket_data(wd);
-               wd = NULL;
-       } else {
-               memcpy(buf, wd->buf, len);
-               bytes = len;
-
-               // Requeue a remained data ahead of the items in recv_buffer.
-               // It might be read later.
-               size_t remained_len = wd->len - len;
-               unsigned char *remained_data = (unsigned char *)calloc(remained_len, sizeof(unsigned char));
-               memcpy(remained_data, wd->buf + len, remained_len);
-               free(wd->buf);
-               wd->buf = remained_data;
-               wd->len = remained_len;
+               ptr += read_len;
+               read_len = wd->len > len ? len : wd->len;
+               memcpy(ptr, wd->buf, read_len);
+               len -= read_len;
+               bytes += read_len;
+
+               if (read_len >= wd->len) {
+                       ws->recv_buffer->erase();
+                       bool last = wd->last;
+                       __destroy_websocket_data(wd);
+                       wd = NULL;
+                       if (last)
+                               break;
+               } else if (read_len < wd->len) { // requeue
+                       size_t remained_len = wd->len - read_len;
+                       unsigned char *remained_data = (unsigned char *)calloc(remained_len,
+                                       sizeof(unsigned char));
+                       memcpy(remained_data, wd->buf + read_len, remained_len);
+                       free(wd->buf);
+                       wd->buf = remained_data;
+                       wd->len = remained_len;
+                       return bytes;
+               }
        }
 
        return bytes;
index e16312763157eab66e40165183e45b7608d2dce3..49963943f8e56bbf3a88c602016b14bd4830da40 100755 (executable)
@@ -63,12 +63,24 @@ public:
                return _queue.front();
        }
 
+       T &back()
+       {
+               std::lock_guard<std::mutex> lock_guard(_q_mutex);
+               return _queue.back();
+       }
+
        size_t size()
        {
                std::lock_guard<std::mutex> lock_guard(_q_mutex);
                return _queue.size();
        }
 
+       bool empty()
+       {
+               std::lock_guard<std::mutex> lock_guard(_q_mutex);
+               return _queue.empty();
+       }
+
 private:
        std::queue<T> _queue;
        std::mutex _q_mutex;