{
RET_IF(datalen < 0);
- // NOTE:
- // Iterate discovered service table
- // PublishMap
- // map {
- // "/customTopic/faceRecog": map {
- // "$clientId": map {
- // 11234: $handle,
- // ...
- // 21234: nullptr,
- // },
- // },
- // }
std::lock_guard<std::mutex> auto_lock_publish(publishTableLock);
for (PublishMap::iterator it = publishTable.begin(); it != publishTable.end(); ++it) {
// NOTE: Find entries that have matched with the given topic
continue;
for (HostMap::iterator hostIt = it->second.begin(); hostIt != it->second.end(); ++hostIt) {
- // Iterate all ports,
- // the current implementation only be able to have the ZERO or a SINGLE entry
- for (PortMap::iterator portIt = hostIt->second.begin(); portIt != hostIt->second.end();
- ++portIt) {
- if (!portIt->second) {
- std::string host;
- {
- ClientMap::iterator clientIt;
- std::lock_guard<std::mutex> auto_lock_client(clientTableLock);
-
- clientIt = clientTable.find(hostIt->first);
- if (clientIt != clientTable.end())
- host = clientIt->second;
-
- // NOTE:
- // otherwise, it is a critical error
- // The broken clientTable or subscribeTable
- }
-
- std::unique_ptr<TCP> client(new TCP(host, portIt->first));
-
- // TODO:
- // If the client gets disconnected,
- // This channel entry must be cleared
- // In order to do that,
- // There should be an observer to monitor
- // each connections and manipulate
- // the discovered service table
- portIt->second = std::move(client);
+ PortInfo &port_info = hostIt->second;
+ if (!port_info.second) {
+ std::string host;
+ {
+ ClientMap::iterator clientIt;
+ std::lock_guard<std::mutex> auto_lock_client(clientTableLock);
+
+ clientIt = clientTable.find(hostIt->first);
+ if (clientIt != clientTable.end())
+ host = clientIt->second;
+
+ // NOTE:
+ // otherwise, it is a critical error
+ // The broken clientTable or subscribeTable
}
- if (!portIt->second) {
- ERR("Failed to create a new client instance");
- continue;
- }
+ std::unique_ptr<TCP> client(new TCP(host, port_info.first));
+ port_info.second = std::move(client);
+ }
+
+ if (!port_info.second) {
+ ERR("Failed to create a new client instance");
+ continue;
+ }
- try {
- flexbuffers::Builder fbb;
- PackMsgInfo(fbb, msg, is_reply);
- auto buffer = fbb.GetBuffer();
- portIt->second->SendSizedData(buffer.data(), buffer.size());
+ try {
+ flexbuffers::Builder fbb;
+ PackMsgInfo(fbb, msg, is_reply);
+ auto buffer = fbb.GetBuffer();
+ port_info.second->SendSizedData(buffer.data(), buffer.size());
- portIt->second->SendSizedData(data, datalen);
- } catch (std::exception &e) {
- ERR("An exception(%s) occurs during Send().", e.what());
- }
+ port_info.second->SendSizedData(data, datalen);
+ } catch (std::exception &e) {
+ ERR("An exception(%s) occurs during Send().", e.what());
}
- } // connectionEntries
- } // publishTable
+ }
+ } // publishTable
}
void Module::PackMsgInfo(flexbuffers::Builder &fbb, const AittMsg &msg, bool is_reply)
PublishFull(*msg, data, datalen, qos, retain, true);
}
+// Discovery Message (flexbuffers)
+// map {
+// "host": "192.168.1.11",
+// "$topic": {port, cb_list_size, key, iv}
+// }
void Module::DiscoveryMessageCallback(const std::string &clientId, const std::string &status,
const void *msg, const int szmsg)
{
- // NOTE: Iterate discovered service table
- // PublishMap
- // map { topic : map { clientId : map { port : pair { "protocol": 1, "handle": nullptr } } } }
if (!status.compare(AittDiscovery::WILL_LEAVE_NETWORK)) {
{
std::lock_guard<std::mutex> autoLock(clientTableLock);
- // Delete from the { clientId : Host } mapping table
clientTable.erase(clientId);
}
{
- // NOTE: Iterate all topics in the publishTable holds discovered client information
std::lock_guard<std::mutex> autoLock(publishTableLock);
for (auto it = publishTable.begin(); it != publishTable.end(); ++it)
it->second.erase(clientId);
return;
}
- // serviceMessage (flexbuffers)
- // map {
- // "host": "192.168.1.11",
- // "$topic": {port, key, iv}
- // }
auto map = flexbuffers::GetRoot(static_cast<const uint8_t *>(msg), szmsg).AsMap();
std::string host = map["host"].AsString().c_str();
- // NOTE: Update the clientTable
{
std::lock_guard<std::mutex> autoLock(clientTableLock);
auto clientIt = clientTable.find(clientId);
auto connectInfo = map[topic].AsVector();
size_t vec_size = connectInfo.size();
info.port = connectInfo[0].AsUInt16();
+ info.num_of_cb = connectInfo[1].AsUInt16();
if (secure) {
- if (vec_size != 3) {
+ if (vec_size != 4) {
ERR("Unknown Message");
return;
}
info.secure = true;
- auto key_blob = connectInfo[1].AsBlob();
+ auto key_blob = connectInfo[2].AsBlob();
if (key_blob.size() == sizeof(info.key))
memcpy(info.key, key_blob.data(), key_blob.size());
else
ERR("Invalid key blob(%zu) != %zu", key_blob.size(), sizeof(info.key));
- auto iv_blob = connectInfo[2].AsBlob();
+ auto iv_blob = connectInfo[3].AsBlob();
if (iv_blob.size() == sizeof(info.iv))
memcpy(info.iv, iv_blob.data(), iv_blob.size());
else
}
}
-// map {
-// "host": "192.168.1.11",
-// "$topic": {port, key, iv}
-// }
void Module::UpdateDiscoveryMsg()
{
flexbuffers::Builder fbb;
if (it->second) {
fbb.Vector(it->first->topic.c_str(), [&]() {
fbb.UInt(it->second->GetPort());
+ fbb.UInt(it->first->cb_list.size());
if (secure) {
fbb.Blob(it->second->GetCryptoKey(), AITT_TCP_ENCRYPTOR_KEY_LEN);
fbb.Blob(it->second->GetCryptoIv(), AITT_TCP_ENCRYPTOR_IV_LEN);
{
auto topicIt = publishTable.find(topic);
if (topicIt == publishTable.end()) {
- PortMap portMap;
- portMap.insert(PortMap::value_type(info, nullptr));
HostMap hostMap;
- hostMap.insert(HostMap::value_type(clientId, std::move(portMap)));
+ hostMap.insert(HostMap::value_type(clientId, std::make_pair(info, nullptr)));
publishTable.insert(PublishMap::value_type(topic, std::move(hostMap)));
return;
}
auto hostIt = topicIt->second.find(clientId);
if (hostIt == topicIt->second.end()) {
- PortMap portMap;
- portMap.insert(PortMap::value_type(info, nullptr));
- topicIt->second.insert(HostMap::value_type(clientId, std::move(portMap)));
- return;
- }
-
- if (!hostIt->second.empty()) {
- ERR("there is the previous connection(The current implementation only has a single port "
- "entry)");
- auto portIt = hostIt->second.begin();
-
- if (portIt->first.port == info.port) {
- DBG("nothing changed. keep the current handle");
- return;
+ topicIt->second.insert(HostMap::value_type(clientId, std::make_pair(info, nullptr)));
+ } else {
+ PortInfo &port_info = hostIt->second;
+ if (port_info.first.port == info.port) {
+ port_info.first.num_of_cb = info.num_of_cb;
+ } else {
+ port_info = std::make_pair(info, nullptr);
}
-
- DBG("delete the connection handle to make a new connection with the new port");
- hostIt->second.clear();
}
-
- hostIt->second.insert(PortMap::value_type(info, nullptr));
}
} // namespace AittTCPNamespace