void Module::SendTopic(const std::string &topic, Module::PortMap::iterator &portIt)
{
- uint32_t topicLen = topic.length();
- size_t szData = sizeof(topicLen);
- portIt->second->Send(static_cast<void *>(&topicLen), szData);
- szData = topicLen;
- portIt->second->Send(static_cast<const void *>(topic.c_str()), szData);
+ size_t topic_length = topic.length();
+ SendExactSize(portIt, static_cast<const void *>(&topic_length), sizeof(topic_length));
+
+ SendExactSize(portIt, static_cast<const void *>(topic.c_str()), topic_length);
}
void Module::SendPayload(const size_t &datalen, Module::PortMap::iterator &portIt, const void *data)
{
- uint32_t sendsize = datalen;
- size_t szsize = sizeof(sendsize);
+ size_t payload_size = datalen;
+ if (0 == datalen) {
+ // distinguish between connection problems and zero-size messages
+ INFO("Send a zero-size message.");
+ payload_size = UINT32_MAX;
+ }
try {
- if (0 == datalen) {
- // distinguish between connection problems and zero-size messages
- INFO("Send zero-size Message");
- sendsize = UINT32_MAX;
- }
- portIt->second->Send(static_cast<void *>(&sendsize), szsize);
-
- int msgSize = datalen;
- while (0 < msgSize) {
- size_t sentSize = msgSize;
- char *dataIdx = (char *)data + (sendsize - msgSize);
- portIt->second->Send(dataIdx, sentSize);
- if (sentSize > 0) {
- msgSize -= sentSize;
- }
- }
+ SendExactSize(portIt, static_cast<void *>(&payload_size), sizeof(payload_size));
+
+ SendExactSize(portIt, data, datalen);
} catch (std::exception &e) {
- ERR("An exception(%s) occurs during Send().", e.what());
+ ERR("An exception(%s) occurs during SendExactSize().", e.what());
+ }
+}
+
+void Module::SendExactSize(Module::PortMap::iterator &port_iterator, const void *data, size_t data_length)
+{
+ size_t remaining_size = data_length;
+ while (0 < remaining_size) {
+ char *data_index = (char *)data + (data_length - remaining_size);
+ size_t size_sent = remaining_size;
+ port_iterator->second->Send(data_index, size_sent);
+ if (size_sent > 0) {
+ remaining_size -= size_sent;
+ } else if (size_sent == 0) {
+ DBG("size_sent == 0");
+ remaining_size = 0;
+ }
}
}
RET_IF(impl == nullptr);
if (result == MainLoopHandler::HANGUP) {
- ERR("Disconnected");
+ ERR("The main loop hung up. Disconnect the client.");
return impl->HandleClientDisconnect(handle);
}
- uint32_t szmsg = 0;
+ size_t szmsg = 0;
size_t szdata = sizeof(szmsg);
char *msg = nullptr;
std::string topic;
try {
topic = impl->GetTopicName(connect_info);
if (topic.empty()) {
- ERR("Unknown Topic");
+ ERR("A topic is empty.");
return impl->HandleClientDisconnect(handle);
}
- connect_info->client->Recv(static_cast<void *>(&szmsg), szdata);
+ ReceiveExactSize(connect_info, static_cast<void *>(&szmsg), szdata);
if (szmsg == 0) {
- ERR("Disconnected");
+ ERR("Got a disconnection message.");
return impl->HandleClientDisconnect(handle);
}
if (UINT32_MAX == szmsg) {
// distinguish between connection problems and zero-size messages
- INFO("Got zero-size Message");
+ INFO("Got a zero-size message.");
szmsg = 0;
}
msg = static_cast<char *>(malloc(szmsg));
- int msgSize = szmsg;
- while (0 < msgSize) {
- size_t receivedSize = msgSize;
- connect_info->client->Recv(static_cast<void *>(msg + (szmsg - msgSize)), receivedSize);
- if (receivedSize > 0) {
- msgSize -= receivedSize;
- }
- }
+ ReceiveExactSize(connect_info, static_cast<void *>(msg), szmsg);
} catch (std::exception &e) {
- ERR("An exception(%s) occurs during Recv()", e.what());
+ ERR("An exception(%s) occurs", e.what());
+ free(msg);
+ return;
}
std::string correlation;
free(msg);
}
+void Module::ReceiveExactSize(Module::TCPData *connect_info, void *data, size_t data_length)
+{
+ size_t remaining_size = data_length;
+ while (0 < remaining_size) {
+ char *data_index = (char *)data + (data_length - remaining_size);
+ size_t size_received = remaining_size;
+ connect_info->client->Recv(data_index, size_received);
+ if (size_received > 0) {
+ remaining_size -= size_received;
+ } else if (size_received == 0) {
+ DBG("size_received == 0");
+ remaining_size = 0;
+ }
+ }
+}
+
void Module::HandleClientDisconnect(int handle)
{
TCPData *connect_info = dynamic_cast<TCPData *>(main_loop.RemoveWatch(handle));
std::string Module::GetTopicName(Module::TCPData *connect_info)
{
- uint32_t topic_len = 0;
- size_t data_size = sizeof(topic_len);
- connect_info->client->Recv(static_cast<void *>(&topic_len), data_size);
+ size_t topic_length = 0;
+ ReceiveExactSize(connect_info, static_cast<void *>(&topic_length), sizeof(topic_length));
- if (AITT_TOPIC_NAME_MAX < topic_len) {
- ERR("Invalid topic name length(%d)", topic_len);
+ if (AITT_TOPIC_NAME_MAX < topic_length) {
+ ERR("Invalid topic name length(%zu)", topic_length);
return std::string();
}
- char data[topic_len];
- data_size = topic_len;
- connect_info->client->Recv(data, data_size);
- if (data_size != topic_len)
- ERR("Recv() Fail");
+ char topic_buffer[topic_length];
+ ReceiveExactSize(connect_info, topic_buffer, topic_length);
+ std::string topic = std::string(topic_buffer, topic_length);
+ INFO("Complete topic = [%s], topic_len = %zu", topic.c_str(), topic_length);
- return std::string(data, data_size);
+ return topic;
}
void Module::AcceptConnection(MainLoopHandler::MainLoopResult result, int handle,