if (m_state != State::CONNECTED)
THROW_INVALID_STATE("Not connected");
- m_in.reserve(RX_BUFFER_SIZE);
- m_in.clear();
-
- m_state = State::READING;
- while (!m_ws->Service(m_context) && m_state == State::READING) {
+ while (m_recvMsgs.empty() && !m_ws->Service(m_context) && m_state == State::CONNECTED) {
}
-
DisconnectOnError();
- return std::move(m_in);
+ auto msg = std::move(m_recvMsgs.front());
+ m_recvMsgs.pop_front();
+ LogDebug("Read " << msg.size() << "B");
+ return msg;
}
void Tunnel::Disconnect()
LogDebug("Received " << len << "B, binary:" << binary << " first:" << first
<< " last:" << last);
- // I assume that LWS_CALLBACK_CLIENT_RECEIVE won't arrive during WriteBinary servicing
- if (m_state != State::READING) {
+ if (m_state != State::CONNECTED && m_state != State::WRITING) {
LogDebug("Unexpected data");
break;
}
break;
}
- if (first)
+ if (first) {
+ // Clear buffer before receiving data.
m_in.clear();
+ m_in.reserve(RX_BUFFER_SIZE);
+ }
if (len > 0) {
if (!data) {
}
if (last)
- m_state = State::CONNECTED;
+ m_recvMsgs.emplace_back(std::move(m_in));
break;
}
#include "websockets.h"
#include <cstdint>
+#include <deque>
#include <libwebsockets.h>
#include <memory>
#include <mutex>
FAILED,
CONNECTED,
WRITING,
- READING,
};
bool
std::shared_ptr<IWebsockets> m_ws;
LwsContext *m_context;
Lws *m_connection;
- std::vector<uint8_t> m_in, m_out;
+ std::vector<uint8_t> m_in;
+ std::vector<uint8_t> m_out;
+ std::deque<std::vector<uint8_t>> m_recvMsgs;
size_t m_writtenBytesNum;
State m_state;
bool m_cancelled;
in = tunnel->ReadBinary();
EXPECT_EQ(in, out);
EXPECT_GE(eventNo,
- 14); // we shouldn't get this far if cancel was called before 14th event
+ 13); // we shouldn't get this far if cancel was called before 13th event
EXPECT_NO_THROW(tunnel->Disconnect());
} catch (const Exception::Cancelled &) {
std::vector<uint8_t> data = std::vector<uint8_t>()) override
{
auto &expectedEvent = EXPECTED_ORDER[m_eventNo];
- ASSERT_EQ(reason, expectedEvent.reason);
+ ASSERT_EQ(reason, expectedEvent.reason) << "m_eventNo: " << m_eventNo;
while (m_injectionsMapNextEventIdx < m_injectionsMap->size() &&
(*m_injectionsMap)[m_injectionsMapNextEventIdx].id == expectedEvent.id) {