Fix tunnel to buffer incoming messages 72/308072/4
authorKrzysztof Malysa <k.malysa@samsung.com>
Fri, 15 Mar 2024 15:40:10 +0000 (16:40 +0100)
committerKrzysztof Jackiewicz <k.jackiewicz@samsung.com>
Tue, 19 Mar 2024 16:54:40 +0000 (16:54 +0000)
Previously there was an assumption that there will be no incoming
message unless we read from the tunnel. Now this is relaxed so that
incoming messages e.g. during writting are buffered and returned later
on invoking ReadBinary().

Change-Id: I78e31bb6f477747694575fe170873c2efffee74e

srcs/tunnel.cpp
srcs/tunnel.h
tests/tunnel/auto_tests.cpp

index 3b2c44d52f8ace5087931f903e6943ad1572b0d0..0c28a4a403e7ca2878c5fb4b9dcc40715e3bc540 100644 (file)
@@ -167,15 +167,13 @@ std::vector<uint8_t> Tunnel::ReadBinary()
     if (m_state != State::CONNECTED)
         THROW_INVALID_STATE("Not connected");
 
-    m_in.reserve(RX_BUFFER_SIZE);
-    m_in.clear();
-
-    m_state = State::READING;
-    while (!m_ws->Service(m_context) && m_state == State::READING) {
+    while (m_recvMsgs.empty() && !m_ws->Service(m_context) && m_state == State::CONNECTED) {
     }
-
     DisconnectOnError();
-    return std::move(m_in);
+    auto msg = std::move(m_recvMsgs.front());
+    m_recvMsgs.pop_front();
+    LogDebug("Read " << msg.size() << "B");
+    return msg;
 }
 
 void Tunnel::Disconnect()
@@ -261,8 +259,7 @@ bool Tunnel::HandleEvent(Lws *wsi, enum lws_callback_reasons reason, void *in, s
             LogDebug("Received " << len << "B, binary:" << binary << " first:" << first
                                  << " last:" << last);
 
-            // I assume that LWS_CALLBACK_CLIENT_RECEIVE won't arrive during WriteBinary servicing
-            if (m_state != State::READING) {
+            if (m_state != State::CONNECTED && m_state != State::WRITING) {
                 LogDebug("Unexpected data");
                 break;
             }
@@ -272,8 +269,11 @@ bool Tunnel::HandleEvent(Lws *wsi, enum lws_callback_reasons reason, void *in, s
                 break;
             }
 
-            if (first)
+            if (first) {
+                // Clear buffer before receiving data.
                 m_in.clear();
+                m_in.reserve(RX_BUFFER_SIZE);
+            }
 
             if (len > 0) {
                 if (!data) {
@@ -287,7 +287,7 @@ bool Tunnel::HandleEvent(Lws *wsi, enum lws_callback_reasons reason, void *in, s
             }
 
             if (last)
-                m_state = State::CONNECTED;
+                m_recvMsgs.emplace_back(std::move(m_in));
 
             break;
         }
index 21335334b03ebb4e95d331518ea9a4652006d0ce..a709ebb38c313cb6e4f5df79ed8815bdb09e1ea6 100644 (file)
@@ -19,6 +19,7 @@
 #include "websockets.h"
 
 #include <cstdint>
+#include <deque>
 #include <libwebsockets.h>
 #include <memory>
 #include <mutex>
@@ -63,7 +64,6 @@ public:
         FAILED,
         CONNECTED,
         WRITING,
-        READING,
     };
 
     bool
@@ -76,7 +76,9 @@ protected:
     std::shared_ptr<IWebsockets> m_ws;
     LwsContext *m_context;
     Lws *m_connection;
-    std::vector<uint8_t> m_in, m_out;
+    std::vector<uint8_t> m_in;
+    std::vector<uint8_t> m_out;
+    std::deque<std::vector<uint8_t>> m_recvMsgs;
     size_t m_writtenBytesNum;
     State m_state;
     bool m_cancelled;
index 21cd8749d627d24033d0e0da21aabd91145d63d8..2b383fbedd870d5d876d95632599cdac51e2c59d 100644 (file)
@@ -368,7 +368,7 @@ TEST(TunnelMockedTests, Cancelling)
             in = tunnel->ReadBinary();
             EXPECT_EQ(in, out);
             EXPECT_GE(eventNo,
-                      14); // we shouldn't get this far if cancel was called before 14th event
+                      13); // we shouldn't get this far if cancel was called before 13th event
 
             EXPECT_NO_THROW(tunnel->Disconnect());
         } catch (const Exception::Cancelled &) {
@@ -502,7 +502,7 @@ TEST(TunnelMockedTests, InjectedEvents)
                           std::vector<uint8_t> data = std::vector<uint8_t>()) override
         {
             auto &expectedEvent = EXPECTED_ORDER[m_eventNo];
-            ASSERT_EQ(reason, expectedEvent.reason);
+            ASSERT_EQ(reason, expectedEvent.reason) << "m_eventNo: " << m_eventNo;
 
             while (m_injectionsMapNextEventIdx < m_injectionsMap->size() &&
                    (*m_injectionsMap)[m_injectionsMapNextEventIdx].id == expectedEvent.id) {