apply callback count to TCP publish table
authorYoungjae Shin <yj99.shin@samsung.com>
Mon, 8 May 2023 08:07:37 +0000 (17:07 +0900)
committerYoungjae Shin <yj99.shin@samsung.com>
Wed, 14 Jun 2023 11:53:18 +0000 (20:53 +0900)
- remove unnecessary comments

modules/tcp/Module.cc
modules/tcp/Module.h
modules/tcp/TCP.cc
modules/tcp/TCP.h
modules/tcp/samples/tcp_test.cc

index f6bc488526dd03e5ddf98e9a85388e3d44518380..10e0acebfdbbe23ba8331b6bd7b4d2ecac6d7d31 100644 (file)
@@ -67,18 +67,6 @@ void Module::PublishFull(const AittMsg &msg, const void *data, const int datalen
 {
     RET_IF(datalen < 0);
 
-    // NOTE:
-    // Iterate discovered service table
-    // PublishMap
-    // map {
-    //    "/customTopic/faceRecog": map {
-    //       "$clientId": map {
-    //          11234: $handle,
-    //          ...
-    //          21234: nullptr,
-    //       },
-    //    },
-    // }
     std::lock_guard<std::mutex> auto_lock_publish(publishTableLock);
     for (PublishMap::iterator it = publishTable.begin(); it != publishTable.end(); ++it) {
         // NOTE: Find entries that have matched with the given topic
@@ -86,55 +74,43 @@ void Module::PublishFull(const AittMsg &msg, const void *data, const int datalen
             continue;
 
         for (HostMap::iterator hostIt = it->second.begin(); hostIt != it->second.end(); ++hostIt) {
-            // Iterate all ports,
-            // the current implementation only be able to have the ZERO or a SINGLE entry
-            for (PortMap::iterator portIt = hostIt->second.begin(); portIt != hostIt->second.end();
-                  ++portIt) {
-                if (!portIt->second) {
-                    std::string host;
-                    {
-                        ClientMap::iterator clientIt;
-                        std::lock_guard<std::mutex> auto_lock_client(clientTableLock);
-
-                        clientIt = clientTable.find(hostIt->first);
-                        if (clientIt != clientTable.end())
-                            host = clientIt->second;
-
-                        // NOTE:
-                        // otherwise, it is a critical error
-                        // The broken clientTable or subscribeTable
-                    }
-
-                    std::unique_ptr<TCP> client(new TCP(host, portIt->first));
-
-                    // TODO:
-                    // If the client gets disconnected,
-                    // This channel entry must be cleared
-                    // In order to do that,
-                    // There should be an observer to monitor
-                    // each connections and manipulate
-                    // the discovered service table
-                    portIt->second = std::move(client);
+            PortInfo &port_info = hostIt->second;
+            if (!port_info.second) {
+                std::string host;
+                {
+                    ClientMap::iterator clientIt;
+                    std::lock_guard<std::mutex> auto_lock_client(clientTableLock);
+
+                    clientIt = clientTable.find(hostIt->first);
+                    if (clientIt != clientTable.end())
+                        host = clientIt->second;
+
+                    // NOTE:
+                    // otherwise, it is a critical error
+                    // The broken clientTable or subscribeTable
                 }
 
-                if (!portIt->second) {
-                    ERR("Failed to create a new client instance");
-                    continue;
-                }
+                std::unique_ptr<TCP> client(new TCP(host, port_info.first));
+                port_info.second = std::move(client);
+            }
+
+            if (!port_info.second) {
+                ERR("Failed to create a new client instance");
+                continue;
+            }
 
-                try {
-                    flexbuffers::Builder fbb;
-                    PackMsgInfo(fbb, msg, is_reply);
-                    auto buffer = fbb.GetBuffer();
-                    portIt->second->SendSizedData(buffer.data(), buffer.size());
+            try {
+                flexbuffers::Builder fbb;
+                PackMsgInfo(fbb, msg, is_reply);
+                auto buffer = fbb.GetBuffer();
+                port_info.second->SendSizedData(buffer.data(), buffer.size());
 
-                    portIt->second->SendSizedData(data, datalen);
-                } catch (std::exception &e) {
-                    ERR("An exception(%s) occurs during Send().", e.what());
-                }
+                port_info.second->SendSizedData(data, datalen);
+            } catch (std::exception &e) {
+                ERR("An exception(%s) occurs during Send().", e.what());
             }
-        }  // connectionEntries
-    }      // publishTable
+        }
+    }  // publishTable
 }
 
 void Module::PackMsgInfo(flexbuffers::Builder &fbb, const AittMsg &msg, bool is_reply)
@@ -269,21 +245,21 @@ void Module::SendReply(AittMsg *msg, const void *data, const int datalen, AittQo
     PublishFull(*msg, data, datalen, qos, retain, true);
 }
 
+// Discovery Message (flexbuffers)
+// map {
+//   "host": "192.168.1.11",
+//   "$topic": {port, cb_list_size, key, iv}
+// }
 void Module::DiscoveryMessageCallback(const std::string &clientId, const std::string &status,
       const void *msg, const int szmsg)
 {
-    // NOTE: Iterate discovered service table
-    // PublishMap
-    // map { topic : map { clientId : map { port : pair { "protocol": 1, "handle": nullptr } } } }
     if (!status.compare(AittDiscovery::WILL_LEAVE_NETWORK)) {
         {
             std::lock_guard<std::mutex> autoLock(clientTableLock);
-            // Delete from the { clientId : Host } mapping table
             clientTable.erase(clientId);
         }
 
         {
-            // NOTE: Iterate all topics in the publishTable holds discovered client information
             std::lock_guard<std::mutex> autoLock(publishTableLock);
             for (auto it = publishTable.begin(); it != publishTable.end(); ++it)
                 it->second.erase(clientId);
@@ -291,15 +267,9 @@ void Module::DiscoveryMessageCallback(const std::string &clientId, const std::st
         return;
     }
 
-    // serviceMessage (flexbuffers)
-    // map {
-    //   "host": "192.168.1.11",
-    //   "$topic": {port, key, iv}
-    // }
     auto map = flexbuffers::GetRoot(static_cast<const uint8_t *>(msg), szmsg).AsMap();
     std::string host = map["host"].AsString().c_str();
 
-    // NOTE: Update the clientTable
     {
         std::lock_guard<std::mutex> autoLock(clientTableLock);
         auto clientIt = clientTable.find(clientId);
@@ -320,19 +290,20 @@ void Module::DiscoveryMessageCallback(const std::string &clientId, const std::st
         auto connectInfo = map[topic].AsVector();
         size_t vec_size = connectInfo.size();
         info.port = connectInfo[0].AsUInt16();
+        info.num_of_cb = connectInfo[1].AsUInt16();
         if (secure) {
-            if (vec_size != 3) {
+            if (vec_size != 4) {
                 ERR("Unknown Message");
                 return;
             }
             info.secure = true;
-            auto key_blob = connectInfo[1].AsBlob();
+            auto key_blob = connectInfo[2].AsBlob();
             if (key_blob.size() == sizeof(info.key))
                 memcpy(info.key, key_blob.data(), key_blob.size());
             else
                 ERR("Invalid key blob(%zu) != %zu", key_blob.size(), sizeof(info.key));
 
-            auto iv_blob = connectInfo[2].AsBlob();
+            auto iv_blob = connectInfo[3].AsBlob();
             if (iv_blob.size() == sizeof(info.iv))
                 memcpy(info.iv, iv_blob.data(), iv_blob.size());
             else
@@ -345,10 +316,6 @@ void Module::DiscoveryMessageCallback(const std::string &clientId, const std::st
     }
 }
 
-// map {
-//   "host": "192.168.1.11",
-//   "$topic": {port, key, iv}
-// }
 void Module::UpdateDiscoveryMsg()
 {
     flexbuffers::Builder fbb;
@@ -359,6 +326,7 @@ void Module::UpdateDiscoveryMsg()
             if (it->second) {
                 fbb.Vector(it->first->topic.c_str(), [&]() {
                     fbb.UInt(it->second->GetPort());
+                    fbb.UInt(it->first->cb_list.size());
                     if (secure) {
                         fbb.Blob(it->second->GetCryptoKey(), AITT_TCP_ENCRYPTOR_KEY_LEN);
                         fbb.Blob(it->second->GetCryptoIv(), AITT_TCP_ENCRYPTOR_IV_LEN);
@@ -517,37 +485,23 @@ void Module::UpdatePublishTable(const std::string &topic, const std::string &cli
 {
     auto topicIt = publishTable.find(topic);
     if (topicIt == publishTable.end()) {
-        PortMap portMap;
-        portMap.insert(PortMap::value_type(info, nullptr));
         HostMap hostMap;
-        hostMap.insert(HostMap::value_type(clientId, std::move(portMap)));
+        hostMap.insert(HostMap::value_type(clientId, std::make_pair(info, nullptr)));
         publishTable.insert(PublishMap::value_type(topic, std::move(hostMap)));
         return;
     }
 
     auto hostIt = topicIt->second.find(clientId);
     if (hostIt == topicIt->second.end()) {
-        PortMap portMap;
-        portMap.insert(PortMap::value_type(info, nullptr));
-        topicIt->second.insert(HostMap::value_type(clientId, std::move(portMap)));
-        return;
-    }
-
-    if (!hostIt->second.empty()) {
-        ERR("there is the previous connection(The current implementation only has a single port "
-            "entry)");
-        auto portIt = hostIt->second.begin();
-
-        if (portIt->first.port == info.port) {
-            DBG("nothing changed. keep the current handle");
-            return;
+        topicIt->second.insert(HostMap::value_type(clientId, std::make_pair(info, nullptr)));
+    } else {
+        PortInfo &port_info = hostIt->second;
+        if (port_info.first.port == info.port) {
+            port_info.first.num_of_cb = info.num_of_cb;
+        } else {
+            port_info = std::make_pair(info, nullptr);
         }
-
-        DBG("delete the connection handle to make a new connection with the new port");
-        hostIt->second.clear();
     }
-
-    hostIt->second.insert(PortMap::value_type(info, nullptr));
 }
 
 }  // namespace AittTCPNamespace
index fccdf25c7a88066fd100717a477760af60bcff14..9cb07814955afffff42b920d9f64800c572654ee 100644 (file)
@@ -81,29 +81,16 @@ class Module : public AittTransport {
     // }
     using ClientMap = std::map<std::string /* id */, std::string /* host */>;
 
-    // NOTE:
-    // There could be multiple clientIds for the single host
-    // If several applications are run on the same device, each applicaion will get unique client
-    // Ids therefore we have to keep in mind that the clientId is not 1:1 matched for the IPAddress.
-
     // PublishTable
     // map {
     //    "/customTopic/faceRecog": map {
-    //       $clientId: map {
-    //          11234: $clientHandle,
-    //          ...
-    //          21234: $clientHandle,
+    //       $clientId: pair { 11234: $clientHandle } //one topic has one port for each client.
+    //       ...
     //       },
     //    },
     // }
-    //
-    // NOTE:
-    // TCP handle should be the unique_ptr, so if we delete the entry from the map,
-    // the handle must be released automatically
-    // in order to make the handle "unique_ptr", it should be a class object not the "void *"
-    using PortMap =
-          std::map<TCP::ConnectInfo /* port */, std::unique_ptr<TCP>, TCP::ConnectInfo::Compare>;
-    using HostMap = std::map<std::string /* clientId */, PortMap>;
+    using PortInfo = std::pair<TCP::ConnectInfo /* port */, std::unique_ptr<TCP>>;
+    using HostMap = std::map<std::string /* clientId */, PortInfo>;
     using PublishMap = std::map<std::string /* topic */, HostMap>;
 
     static int AcceptConnection(MainLoopHandler::MainLoopResult result, int handle,
index 5c24d606a37214fc92c0992a7704a97c7d745a48..0c3e038a0daaed37d9949751bf4d19e5d51768c5 100644 (file)
@@ -75,10 +75,10 @@ TCP::TCP(const std::string &host, const ConnectInfo &connect_info)
     } while (0);
 
     if (ret <= 0)
-        ret = errno;
+        ERR_CODE(errno, "TCP::TCP(%d) Fail", ret);
 
     free(addr);
-    if (handle >= 0 && close(handle) < 0)
+    if (0 <= handle && close(handle) < 0)
         ERR_CODE(errno, "close");
     throw std::runtime_error("TCP::TCP() Fail");
 }
@@ -315,7 +315,7 @@ int32_t TCP::RecvSizedDataSecure(void **data)
     return result;
 }
 
-TCP::ConnectInfo::ConnectInfo() : port(0), secure(false), key(), iv()
+TCP::ConnectInfo::ConnectInfo() : port(0), num_of_cb(0), secure(false), key(), iv()
 {
 }
 
index c29d8deb667f03feedb50465beeafa670f46151a..b9f6dd1719b14a0f7ccbcec24576c4ae86582a1b 100644 (file)
@@ -28,15 +28,10 @@ class TCP {
   public:
     class Server;
     struct ConnectInfo {
-        struct Compare {
-            bool operator()(const ConnectInfo &lhs, const ConnectInfo &rhs) const
-            {
-                return lhs.port < rhs.port;
-            }
-        };
-
         ConnectInfo();
+
         unsigned short port;
+        int num_of_cb;
         bool secure;
         unsigned char key[AITT_TCP_ENCRYPTOR_KEY_LEN];
         unsigned char iv[AITT_TCP_ENCRYPTOR_IV_LEN];
index 4ffe1e1acb3aea64282805b71203e3efe559eb95..0b8bf6b3a2111cdb30c2ee80ec7df963a00da12c 100644 (file)
@@ -25,7 +25,7 @@
 
 #include "../TCPServer.h"
 
-//#define _LOG_WITH_TIMESTAMP
+// #define _LOG_WITH_TIMESTAMP
 #include "aitt_internal.h"
 #ifdef _LOG_WITH_TIMESTAMP
 __thread __aitt__tls__ __aitt;