Transfer exact sizes of topics through TCP protocol.
authorChanhee Lee <ch2102.lee@samsung.com>
Tue, 23 Aug 2022 04:47:09 +0000 (13:47 +0900)
committerYoungjae Shin <yj99.shin@samsung.com>
Thu, 15 Sep 2022 05:30:03 +0000 (14:30 +0900)
[Problem] Topic data are now sent or received only once without
          considerations to the sizes actually transferred.
[Solution] Iteratively send or receive the topic until the exact
           sizes are transferred.

modules/tcp/Module.cc
modules/tcp/Module.h

index b9b6774..929a2ee 100644 (file)
@@ -131,37 +131,43 @@ void Module::Publish(const std::string &topic, const void *data, const size_t da
 
 void Module::SendTopic(const std::string &topic, Module::PortMap::iterator &portIt)
 {
-    uint32_t topicLen = topic.length();
-    size_t szData = sizeof(topicLen);
-    portIt->second->Send(static_cast<void *>(&topicLen), szData);
-    szData = topicLen;
-    portIt->second->Send(static_cast<const void *>(topic.c_str()), szData);
+    size_t topic_length = topic.length();
+    SendExactSize(portIt, static_cast<const void *>(&topic_length), sizeof(topic_length));
+
+    SendExactSize(portIt, static_cast<const void *>(topic.c_str()), topic_length);
 }
 
 void Module::SendPayload(const size_t &datalen, Module::PortMap::iterator &portIt, const void *data)
 {
-    uint32_t sendsize = datalen;
-    size_t szsize = sizeof(sendsize);
+    size_t payload_size = datalen;
+    if (0 == datalen) {
+        // distinguish between connection problems and zero-size messages
+        INFO("Send a zero-size message.");
+        payload_size = UINT32_MAX;
+    }
 
     try {
-        if (0 == datalen) {
-            // distinguish between connection problems and zero-size messages
-            INFO("Send zero-size Message");
-            sendsize = UINT32_MAX;
-        }
-        portIt->second->Send(static_cast<void *>(&sendsize), szsize);
-
-        int msgSize = datalen;
-        while (0 < msgSize) {
-            size_t sentSize = msgSize;
-            char *dataIdx = (char *)data + (sendsize - msgSize);
-            portIt->second->Send(dataIdx, sentSize);
-            if (sentSize > 0) {
-                msgSize -= sentSize;
-            }
-        }
+        SendExactSize(portIt, static_cast<void *>(&payload_size), sizeof(payload_size));
+
+        SendExactSize(portIt, data, datalen);
     } catch (std::exception &e) {
-        ERR("An exception(%s) occurs during Send().", e.what());
+        ERR("An exception(%s) occurs during SendExactSize().", e.what());
+    }
+}
+
+void Module::SendExactSize(Module::PortMap::iterator &port_iterator, const void *data, size_t data_length)
+{
+    size_t remaining_size = data_length;
+    while (0 < remaining_size) {
+        char *data_index = (char *)data + (data_length - remaining_size);
+        size_t size_sent = remaining_size;
+        port_iterator->second->Send(data_index, size_sent);
+        if (size_sent > 0) {
+            remaining_size -= size_sent;
+        } else if (size_sent == 0) {
+            DBG("size_sent == 0");
+            remaining_size = 0;
+        }
     }
 }
 
@@ -355,11 +361,11 @@ void Module::ReceiveData(MainLoopHandler::MainLoopResult result, int handle,
     RET_IF(impl == nullptr);
 
     if (result == MainLoopHandler::HANGUP) {
-        ERR("Disconnected");
+        ERR("The main loop hung up. Disconnect the client.");
         return impl->HandleClientDisconnect(handle);
     }
 
-    uint32_t szmsg = 0;
+    size_t szmsg = 0;
     size_t szdata = sizeof(szmsg);
     char *msg = nullptr;
     std::string topic;
@@ -367,33 +373,28 @@ void Module::ReceiveData(MainLoopHandler::MainLoopResult result, int handle,
     try {
         topic = impl->GetTopicName(connect_info);
         if (topic.empty()) {
-            ERR("Unknown Topic");
+            ERR("A topic is empty.");
             return impl->HandleClientDisconnect(handle);
         }
 
-        connect_info->client->Recv(static_cast<void *>(&szmsg), szdata);
+        ReceiveExactSize(connect_info, static_cast<void *>(&szmsg), szdata);
         if (szmsg == 0) {
-            ERR("Disconnected");
+            ERR("Got a disconnection message.");
             return impl->HandleClientDisconnect(handle);
         }
 
         if (UINT32_MAX == szmsg) {
             // distinguish between connection problems and zero-size messages
-            INFO("Got zero-size Message");
+            INFO("Got a zero-size message.");
             szmsg = 0;
         }
 
         msg = static_cast<char *>(malloc(szmsg));
-        int msgSize = szmsg;
-        while (0 < msgSize) {
-            size_t receivedSize = msgSize;
-            connect_info->client->Recv(static_cast<void *>(msg + (szmsg - msgSize)), receivedSize);
-            if (receivedSize > 0) {
-                msgSize -= receivedSize;
-            }
-        }
+        ReceiveExactSize(connect_info, static_cast<void *>(msg), szmsg);
     } catch (std::exception &e) {
-        ERR("An exception(%s) occurs during Recv()", e.what());
+        ERR("An exception(%s) occurs", e.what());
+        free(msg);
+        return;
     }
 
     std::string correlation;
@@ -404,6 +405,22 @@ void Module::ReceiveData(MainLoopHandler::MainLoopResult result, int handle,
     free(msg);
 }
 
+void Module::ReceiveExactSize(Module::TCPData *connect_info, void *data, size_t data_length)
+{
+    size_t remaining_size = data_length;
+    while (0 < remaining_size) {
+        char *data_index = (char *)data + (data_length - remaining_size);
+        size_t size_received = remaining_size;
+        connect_info->client->Recv(data_index, size_received);
+        if (size_received > 0) {
+            remaining_size -= size_received;
+        } else if (size_received == 0) {
+            DBG("size_received == 0");
+            remaining_size = 0;
+        }
+    }
+}
+
 void Module::HandleClientDisconnect(int handle)
 {
     TCPData *connect_info = dynamic_cast<TCPData *>(main_loop.RemoveWatch(handle));
@@ -422,22 +439,20 @@ void Module::HandleClientDisconnect(int handle)
 
 std::string Module::GetTopicName(Module::TCPData *connect_info)
 {
-    uint32_t topic_len = 0;
-    size_t data_size = sizeof(topic_len);
-    connect_info->client->Recv(static_cast<void *>(&topic_len), data_size);
+    size_t topic_length = 0;
+    ReceiveExactSize(connect_info, static_cast<void *>(&topic_length), sizeof(topic_length));
 
-    if (AITT_TOPIC_NAME_MAX < topic_len) {
-        ERR("Invalid topic name length(%d)", topic_len);
+    if (AITT_TOPIC_NAME_MAX < topic_length) {
+        ERR("Invalid topic name length(%zu)", topic_length);
         return std::string();
     }
 
-    char data[topic_len];
-    data_size = topic_len;
-    connect_info->client->Recv(data, data_size);
-    if (data_size != topic_len)
-        ERR("Recv() Fail");
+    char topic_buffer[topic_length];
+    ReceiveExactSize(connect_info, topic_buffer, topic_length);
+    std::string topic = std::string(topic_buffer, topic_length);
+    INFO("Complete topic = [%s], topic_len = %zu", topic.c_str(), topic_length);
 
-    return std::string(data, data_size);
+    return topic;
 }
 
 void Module::AcceptConnection(MainLoopHandler::MainLoopResult result, int handle,
index 5cfc476..b886009 100644 (file)
@@ -116,10 +116,12 @@ class Module : public AittTransport {
           MainLoopHandler::MainLoopData *watchData);
     void HandleClientDisconnect(int handle);
     std::string GetTopicName(TCPData *connect_info);
+    static void ReceiveExactSize(
+          Module::TCPData *connect_info, void *data, size_t data_length);
     void ThreadMain(void);
-    void SendPayload(const size_t &datalen, Module::PortMap::iterator &portIt, const void *data);
     void SendTopic(const std::string &topic, Module::PortMap::iterator &portIt);
-
+    void SendPayload(const size_t &datalen, Module::PortMap::iterator &portIt, const void *data);
+    void SendExactSize(Module::PortMap::iterator &port_iterator, const void *data, size_t data_length);
     void UpdatePublishTable(const std::string &topic, const std::string &host, unsigned short port);
 
     MainLoopHandler main_loop;