[Problem] There's no way to send packets securely through TCP.
[Solution] Encrypt/Decrypt TCP packets with AES algorithm.
class AittTransport {
public:
- typedef void *(*ModuleEntry)(const char *ip, AittDiscovery &discovery);
+ typedef void *(*ModuleEntry)(AittProtocol protocol, const char *ip, AittDiscovery &discovery);
using SubscribeCallback = std::function<void(const std::string &topic, const void *msg,
const size_t szmsg, void *cbdata, const std::string &correlation)>;
virtual ~AittTransport(void) = default;
virtual void Publish(const std::string &topic, const void *data, const size_t datalen,
- const std::string &correlation, AittQoS qos = AITT_QOS_AT_MOST_ONCE,
- bool retain = false) = 0;
+ AittQoS qos = AITT_QOS_AT_MOST_ONCE, bool retain = false) = 0;
virtual void Publish(const std::string &topic, const void *data, const size_t datalen,
- AittQoS qos = AITT_QOS_AT_MOST_ONCE, bool retain = false) = 0;
+ const std::string &correlation, AittQoS qos = AITT_QOS_AT_MOST_ONCE,
+ bool retain = false) = 0;
virtual void *Subscribe(const std::string &topic, const SubscribeCallback &cb,
void *cbdata = nullptr, AittQoS qos = AITT_QOS_AT_MOST_ONCE) = 0;
+
virtual void *Subscribe(const std::string &topic, const SubscribeCallback &cb, const void *data,
const size_t datalen, void *cbdata = nullptr, AittQoS qos = AITT_QOS_AT_MOST_ONCE) = 0;
virtual void *Unsubscribe(void *handle) = 0;
protected:
- aitt::AittDiscovery &discovery;
+ AittDiscovery &discovery;
};
} // namespace aitt
std::string ModuleLoader::GetModuleFilename(Type type)
{
- if (type == TYPE_TCP)
+ if (type == TYPE_TCP || type == TYPE_SECURE_TCP)
return "libaitt-transport-tcp.so";
if (type == TYPE_WEBRTC)
return "libaitt-transport-webrtc.so";
return handle;
}
-std::unique_ptr<AittTransport> ModuleLoader::LoadTransport(void *handle, const std::string &ip,
- AittDiscovery &discovery)
+std::unique_ptr<AittTransport> ModuleLoader::LoadTransport(
+ void *handle, AittProtocol protocol, const std::string &ip, AittDiscovery &discovery)
{
if (handle == nullptr) {
ERR("handle is NULL");
- return std::unique_ptr<AittTransport>(new NullTransport(ip.c_str(), discovery));
+ return std::unique_ptr<AittTransport>(
+ new NullTransport(ip.c_str(), discovery));
}
AittTransport::ModuleEntry get_instance_fn = reinterpret_cast<AittTransport::ModuleEntry>(
dlsym(handle, AittTransport::MODULE_ENTRY_NAME));
if (get_instance_fn == nullptr) {
ERR("dlsym: %s", dlerror());
- return std::unique_ptr<AittTransport>(new NullTransport(ip.c_str(), discovery));
+ return std::unique_ptr<AittTransport>(
+ new NullTransport(ip.c_str(), discovery));
}
std::unique_ptr<AittTransport> instance(
- static_cast<AittTransport *>(get_instance_fn(ip.c_str(), discovery)));
+ static_cast<AittTransport *>(get_instance_fn(protocol, ip.c_str(), discovery)));
if (instance == nullptr) {
ERR("get_instance_fn(AittTransport) Fail");
- return std::unique_ptr<AittTransport>(new NullTransport(ip.c_str(), discovery));
+ return std::unique_ptr<AittTransport>(
+ new NullTransport(ip.c_str(), discovery));
}
return instance;
return instance;
}
+AittProtocol ModuleLoader::GetProtocol(Type type)
+{
+ switch (type) {
+ case TYPE_TCP:
+ return AITT_TYPE_TCP;
+ case TYPE_SECURE_TCP:
+ return AITT_TYPE_SECURE_TCP;
+ case TYPE_WEBRTC:
+ return AITT_TYPE_WEBRTC;
+ case TYPE_RTSP:
+ default:
+ return AITT_TYPE_UNKNOWN;
+ }
+}
+
} // namespace aitt
#include <string>
#include "AittTransport.h"
+#include "AittTypes.h"
#include "MQ.h"
namespace aitt {
public:
enum Type {
TYPE_TCP,
+ TYPE_SECURE_TCP,
TYPE_WEBRTC,
TYPE_RTSP,
TYPE_TRANSPORT_MAX,
virtual ~ModuleLoader() = default;
ModuleHandle OpenModule(Type type);
- std::unique_ptr<AittTransport> LoadTransport(void *handle, const std::string &ip,
- AittDiscovery &discovery);
+ std::unique_ptr<AittTransport> LoadTransport(
+ void *handle, AittProtocol protocol, const std::string &ip, AittDiscovery &discovery);
std::unique_ptr<MQ> LoadMqttClient(void *handle, const std::string &id,
const AittOption &option);
+ AittProtocol GetProtocol(Type type);
private:
std::string GetModuleFilename(Type type);
{
}
-void NullTransport::Publish(const std::string& topic, const void* data, const size_t datalen,
- const std::string& correlation, AittQoS qos, bool retain)
+void NullTransport::Publish(
+ const std::string& topic, const void* data, const size_t datalen, AittQoS qos, bool retain)
{
}
void NullTransport::Publish(const std::string& topic, const void* data, const size_t datalen,
- AittQoS qos, bool retain)
+ const std::string& correlation, AittQoS qos, bool retain)
{
}
-void* NullTransport::Subscribe(const std::string& topic, const SubscribeCallback& cb, void* cbdata,
- AittQoS qos)
+void* NullTransport::Subscribe(
+ const std::string& topic, const SubscribeCallback& cb, void* cbdata, AittQoS qos)
{
return nullptr;
}
virtual ~NullTransport(void) = default;
void Publish(const std::string &topic, const void *data, const size_t datalen,
- const std::string &correlation, AittQoS qos = AITT_QOS_AT_MOST_ONCE,
- bool retain = false) override;
+ AittQoS qos = AITT_QOS_AT_MOST_ONCE, bool retain = false) override;
void Publish(const std::string &topic, const void *data, const size_t datalen,
- AittQoS qos = AITT_QOS_AT_MOST_ONCE, bool retain = false) override;
+ const std::string &correlation, AittQoS qos = AITT_QOS_AT_MOST_ONCE,
+ bool retain = false) override;
void *Subscribe(const std::string &topic, const SubscribeCallback &cb, void *cbdata = nullptr,
AittQoS qos = AITT_QOS_AT_MOST_ONCE) override;
void Publish(const std::string &topic, const void *data, const size_t datalen,
AittProtocol protocols = AITT_TYPE_MQTT, AittQoS qos = AITT_QOS_AT_MOST_ONCE,
bool retain = false);
+
int PublishWithReply(const std::string &topic, const void *data, const size_t datalen,
AittProtocol protocol, AittQoS qos, bool retain, const SubscribeCallback &cb,
void *cbdata, const std::string &correlation);
AittSubscribeID Subscribe(const std::string &topic, const SubscribeCallback &cb,
void *cbdata = nullptr, AittProtocol protocol = AITT_TYPE_MQTT,
AittQoS qos = AITT_QOS_AT_MOST_ONCE);
+
void *Unsubscribe(AittSubscribeID handle);
void SendReply(MSG *msg, const void *data, const size_t datalen, bool end = true);
enum AittProtocol {
AITT_TYPE_UNKNOWN = 0,
- AITT_TYPE_MQTT = (0x1 << 0), // Publish message through the MQTT
- AITT_TYPE_TCP = (0x1 << 1), // Publish message to peers using the TCP
- AITT_TYPE_WEBRTC = (0x1 << 2), // Publish message to peers using the WEBRTC
+ AITT_TYPE_MQTT = (0x1 << 0), // Publish message through the MQTT
+ AITT_TYPE_TCP = (0x1 << 1), // Publish message to peers using the TCP
+ AITT_TYPE_SECURE_TCP = (0x1 << 2), // Publish message to peers using the TCP with AES
+ AITT_TYPE_WEBRTC = (0x1 << 3), // Publish message to peers using the WEBRTC
};
// AittQoS only works with the AITT_TYPE_MQTT
--- /dev/null
+/*\r
+ * Copyright (c) 2022 Samsung Electronics Co., Ltd All Rights Reserved\r
+ *\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ * http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ */\r
+#include "AES.h"\r
+#ifndef ANDROID\r
+#include <openssl/aes.h>\r
+#endif\r
+#include <stdexcept>\r
+\r
+#include "aitt_internal.h"\r
+\r
+#define AES_KEY_BIT_SIZE 128\r
+\r
+static const unsigned char cipher_key[] = {0x2b, 0x7e, 0x15, 0x16, 0x28, 0xae, 0xd2, 0xa6, 0xab, 0xf7, 0x15, 0x88, 0x09, 0xcf, 0x4f, 0x3c};\r
+\r
+void AES::Encrypt(const unsigned char *target_data, unsigned char *encrypted_data)\r
+{\r
+#ifndef ANDROID\r
+ static int first_run = 1;\r
+ static AES_KEY encryption_key;\r
+\r
+ if (first_run == 1) {\r
+ if (AES_set_encrypt_key(cipher_key, AES_KEY_BIT_SIZE, &encryption_key) < 0) {\r
+ ERR("Fail to AES_set_encrypt_key()");\r
+ throw std::runtime_error(strerror(errno));\r
+ }\r
+ first_run = 0;\r
+ }\r
+\r
+ AES_ecb_encrypt(target_data, encrypted_data, &encryption_key, AES_ENCRYPT);\r
+#endif\r
+}\r
+\r
+void AES::Decrypt(const unsigned char *target_data, unsigned char *decrypted_data)\r
+{\r
+#ifndef ANDROID\r
+ static int first_run = 1;\r
+ static AES_KEY decryption_key;\r
+\r
+ if (first_run == 1) {\r
+ if (AES_set_decrypt_key(cipher_key, AES_KEY_BIT_SIZE, &decryption_key) < 0) {\r
+ ERR("Fail to AES_set_decrypt_key()");\r
+ throw std::runtime_error(strerror(errno));\r
+ }\r
+ first_run = 0;\r
+ }\r
+\r
+ AES_ecb_encrypt(target_data, decrypted_data, &decryption_key, AES_DECRYPT);\r
+#endif\r
+}\r
--- /dev/null
+/*
+ * Copyright (c) 2022 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+class AES {
+ public:
+ AES(void);
+ ~AES(void);
+
+ static void Encrypt(const unsigned char *target_data, unsigned char *encrypted_data);
+ static void Decrypt(const unsigned char *target_data, unsigned char *decrypted_data);
+};
INCLUDE_DIRECTORIES(${CMAKE_CURRENT_SOURCE_DIR})
+IF(PLATFORM STREQUAL "tizen")
+ PKG_CHECK_MODULES(AITT_TCP_NEEDS REQUIRED openssl1.1)
+ELSE(PLATFORM STREQUAL "tizen")
+ PKG_CHECK_MODULES(AITT_TCP_NEEDS REQUIRED openssl)
+ENDIF(PLATFORM STREQUAL "tizen")
+
+INCLUDE_DIRECTORIES(${AITT_TCP_NEEDS_INCLUDE_DIRS})
+LINK_DIRECTORIES(${AITT_TCP_NEEDS_LIBRARY_DIRS})
+
ADD_LIBRARY(TCP_OBJ OBJECT TCP.cc TCPServer.cc)
-ADD_LIBRARY(${AITT_TCP} SHARED ../transport_entry.cc Module.cc $<TARGET_OBJECTS:TCP_OBJ>)
-TARGET_LINK_LIBRARIES(${AITT_TCP} ${AITT_TCP_NEEDS_LIBRARIES} Threads::Threads ${AITT_COMMON})
+ADD_LIBRARY(${AITT_TCP} SHARED $<TARGET_OBJECTS:TCP_OBJ> ../transport_entry.cc Module.cc AES.cc)
+
+TARGET_LINK_LIBRARIES(${AITT_TCP} Threads::Threads ${AITT_COMMON} ${AITT_TCP_NEEDS_LIBRARIES})
INSTALL(TARGETS ${AITT_TCP} DESTINATION ${CMAKE_INSTALL_LIBDIR})
* limitations under the License.
*/
#include "Module.h"
+#include "AES.h"
#include <AittUtil.h>
#include <flatbuffers/flexbuffers.h>
#include "aitt_internal.h"
-/*
- * P2P Data Packet Definition
- * TopicLength: 4 bytes
- * TopicString: $TopicLength
- */
+#define AES_KEY_BYTE_SIZE 16
-Module::Module(const std::string &ip, AittDiscovery &discovery) : AittTransport(discovery), ip(ip)
+Module::Module(AittProtocol protocol, const std::string &ip, AittDiscovery &discovery)
+ : AittTransport(discovery), ip(ip), protocol(protocol)
{
aittThread = std::thread(&Module::ThreadMain, this);
main_loop.Run();
}
+void Module::Publish(
+ const std::string &topic, const void *data, const size_t datalen, AittQoS qos, bool retain)
+{
+ Publish(topic, data, datalen, std::string(), qos, retain);
+}
+
void Module::Publish(const std::string &topic, const void *data, const size_t datalen,
const std::string &correlation, AittQoS qos, bool retain)
{
continue;
}
- SendTopic(topic, portIt);
- SendPayload(datalen, portIt, data);
+ if (protocol == AITT_TYPE_SECURE_TCP) {
+ if (SendEncryptedTopic(topic, portIt) == true)
+ SendEncryptedPayload(datalen, portIt, data);
+ } else {
+ if (SendTopic(topic, portIt) == true)
+ SendPayload(datalen, portIt, data);
+ }
}
} // connectionEntries
} // publishTable
}
-void Module::SendTopic(const std::string &topic, Module::PortMap::iterator &portIt)
+bool Module::SendEncryptedTopic(const std::string &topic, Module::PortMap::iterator &portIt)
{
size_t topic_length = topic.length();
- SendExactSize(portIt, static_cast<const void *>(&topic_length), sizeof(topic_length));
- SendExactSize(portIt, static_cast<const void *>(topic.c_str()), topic_length);
+ try {
+ SendEncryptedData(portIt, static_cast<const void *>(&topic_length), sizeof(topic_length));
+
+ SendEncryptedData(portIt, static_cast<const void *>(topic.c_str()), topic_length);
+ } catch (std::exception &e) {
+ ERR("An exception(%s) occurs during SendEncryptedData().", e.what());
+ return false;
+ }
+
+ return true;
}
-void Module::SendPayload(const size_t &datalen, Module::PortMap::iterator &portIt, const void *data)
+void Module::SendEncryptedData(
+ Module::PortMap::iterator &port_iterator, const void *data, size_t data_length)
{
- size_t payload_size = datalen;
- if (0 == datalen) {
- // distinguish between connection problems and zero-size messages
- INFO("Send a zero-size message.");
- payload_size = UINT32_MAX;
+ size_t padding_buffer_size =
+ (data_length + AES_KEY_BYTE_SIZE) / AES_KEY_BYTE_SIZE * AES_KEY_BYTE_SIZE;
+ if (padding_buffer_size % AES_KEY_BYTE_SIZE != 0) {
+ ERR("padding_buffer_size is not a multiple of AES_KEY_BYTE_SIZE.");
+ return;
}
+ DBG("data_length = %zu, padding_buffer_size = %zu", data_length, padding_buffer_size);
- try {
- SendExactSize(portIt, static_cast<void *>(&payload_size), sizeof(payload_size));
+ unsigned char padding_buffer[padding_buffer_size];
+ memcpy(padding_buffer, data, data_length);
- SendExactSize(portIt, data, datalen);
- } catch (std::exception &e) {
- ERR("An exception(%s) occurs during SendExactSize().", e.what());
+ unsigned char encrypted_data[padding_buffer_size];
+ for (int i = 0; i < static_cast<int>(padding_buffer_size) / AES_KEY_BYTE_SIZE; i++) {
+ AES::Encrypt(
+ padding_buffer + AES_KEY_BYTE_SIZE * i, encrypted_data + AES_KEY_BYTE_SIZE * i);
}
+
+ SendExactSize(port_iterator, encrypted_data, padding_buffer_size);
}
-void Module::SendExactSize(Module::PortMap::iterator &port_iterator, const void *data,
- size_t data_length)
+void Module::SendExactSize(
+ Module::PortMap::iterator &port_iterator, const void *data, size_t data_length)
{
size_t remaining_size = data_length;
while (0 < remaining_size) {
- char *data_index = (char *)data + (data_length - remaining_size);
+ const char *data_index = static_cast<const char *>(data) + (data_length - remaining_size);
size_t size_sent = remaining_size;
port_iterator->second->Send(data_index, size_sent);
if (size_sent > 0) {
}
}
-void Module::Publish(const std::string &topic, const void *data, const size_t datalen, AittQoS qos,
- bool retain)
+void Module::SendEncryptedPayload(
+ const size_t &datalen, Module::PortMap::iterator &portIt, const void *data)
{
- Publish(topic, data, datalen, std::string(), qos, retain);
+ size_t payload_size = datalen;
+ if (0 == datalen) {
+ // distinguish between connection problems and zero-size messages
+ INFO("Send a zero-size message.");
+ payload_size = UINT32_MAX;
+ }
+
+ try {
+ SendEncryptedData(portIt, static_cast<void *>(&payload_size), sizeof(payload_size));
+ if (payload_size == UINT32_MAX) {
+ INFO("An actual data size is 0. Skip this payload transmission.");
+ return;
+ }
+
+ SendEncryptedData(portIt, data, datalen);
+ } catch (std::exception &e) {
+ ERR("An exception(%s) occurs during SendEncryptedData().", e.what());
+ }
}
+bool Module::SendTopic(const std::string &topic, Module::PortMap::iterator &portIt)
+{
+ size_t topic_length = topic.length();
+
+ try {
+ SendExactSize(portIt, static_cast<const void *>(&topic_length), sizeof(topic_length));
+
+ SendExactSize(portIt, static_cast<const void *>(topic.c_str()), topic_length);
+ } catch (std::exception &e) {
+ ERR("An exception(%s) occurs during SendExactSize().", e.what());
+ return false;
+ }
+
+ return true;
+}
+
+void Module::SendPayload(const size_t &datalen, Module::PortMap::iterator &portIt, const void *data)
+{
+ size_t payload_size = datalen;
+ if (0 == datalen) {
+ // distinguish between connection problems and zero-size messages
+ INFO("Send a zero-size message.");
+ payload_size = UINT32_MAX;
+ }
+
+ try {
+ DBG("sizeof(payload_size) = %zu", sizeof(payload_size));
+ SendExactSize(portIt, static_cast<void *>(&payload_size), sizeof(payload_size));
+
+ if (payload_size == UINT32_MAX) {
+ INFO("An actual data size is 0. Skip this payload transmission.");
+ return;
+ }
+
+ DBG("datalen = %zu", datalen);
+ SendExactSize(portIt, data, datalen);
+ } catch (std::exception &e) {
+ ERR("An exception(%s) occurs during SendExactSize().", e.what());
+ }
+}
void *Module::Subscribe(const std::string &topic, const AittTransport::SubscribeCallback &cb,
void *cbdata, AittQoS qos)
{
listen_info->cb = cb;
listen_info->cbdata = cbdata;
listen_info->topic = topic;
+ listen_info->is_secure = (protocol == AITT_TYPE_SECURE_TCP ? true : false);
auto handle = tcpServer->GetHandle();
main_loop.AddWatch(handle, AcceptConnection, listen_info);
}
size_t szmsg = 0;
- size_t szdata = sizeof(szmsg);
char *msg = nullptr;
std::string topic;
try {
- topic = impl->GetTopicName(connect_info);
- if (topic.empty()) {
- ERR("A topic is empty.");
- return impl->HandleClientDisconnect(handle);
- }
+ if (connect_info->is_secure == true) {
+ topic = impl->ReceiveDecryptedTopic(connect_info);
+ if (topic.empty()) {
+ ERR("A topic is empty.");
+ return impl->HandleClientDisconnect(handle);
+ }
- ReceiveExactSize(connect_info, static_cast<void *>(&szmsg), szdata);
- if (szmsg == 0) {
- ERR("Got a disconnection message.");
- return impl->HandleClientDisconnect(handle);
- }
+ if (impl->ReceiveDecryptedPayload(connect_info, szmsg, &msg) == false) {
+ free(msg);
+ return impl->HandleClientDisconnect(handle);
+ }
+ } else {
+ topic = impl->ReceiveTopic(connect_info);
+ if (topic.empty()) {
+ ERR("A topic is empty.");
+ return impl->HandleClientDisconnect(handle);
+ }
- if (UINT32_MAX == szmsg) {
- // distinguish between connection problems and zero-size messages
- INFO("Got a zero-size message.");
- szmsg = 0;
+ if (impl->ReceivePayload(connect_info, szmsg, &msg) == false) {
+ free(msg);
+ return impl->HandleClientDisconnect(handle);
+ }
}
-
- msg = static_cast<char *>(malloc(szmsg));
- ReceiveExactSize(connect_info, static_cast<void *>(msg), szmsg);
} catch (std::exception &e) {
ERR("An exception(%s) occurs", e.what());
free(msg);
free(msg);
}
-void Module::ReceiveExactSize(Module::TCPData *connect_info, void *data, size_t data_length)
-{
- size_t remaining_size = data_length;
- while (0 < remaining_size) {
- char *data_index = (char *)data + (data_length - remaining_size);
- size_t size_received = remaining_size;
- connect_info->client->Recv(data_index, size_received);
- if (size_received > 0) {
- remaining_size -= size_received;
- } else if (size_received == 0) {
- DBG("size_received == 0");
- remaining_size = 0;
- }
- }
-}
-
void Module::HandleClientDisconnect(int handle)
{
TCPData *connect_info = dynamic_cast<TCPData *>(main_loop.RemoveWatch(handle));
delete connect_info;
}
-std::string Module::GetTopicName(Module::TCPData *connect_info)
+std::string Module::ReceiveDecryptedTopic(Module::TCPData *connect_info)
+{
+ size_t topic_length = 0;
+ ReceiveDecryptedData(connect_info, static_cast<void *>(&topic_length), sizeof(topic_length));
+
+ if (AITT_TOPIC_NAME_MAX < topic_length) {
+ ERR("Invalid topic name length(%zu)", topic_length);
+ return std::string();
+ }
+
+ char topic_buffer[topic_length];
+ ReceiveDecryptedData(connect_info, topic_buffer, topic_length);
+ std::string topic = std::string(topic_buffer, topic_length);
+ INFO("Complete topic = [%s], topic_len = %zu", topic.c_str(), topic_length);
+
+ return topic;
+}
+
+bool Module::ReceiveDecryptedPayload(Module::TCPData *connect_info, size_t &szmsg, char **msg)
+{
+ ReceiveDecryptedData(connect_info, static_cast<void *>(&szmsg), sizeof(szmsg));
+ if (szmsg == 0) {
+ ERR("Got a disconnection message.");
+ return false;
+ }
+
+ if (UINT32_MAX == szmsg) {
+ // Distinguish between connection problems and zero-size messages.
+ INFO("Got a zero-size message. Skip this payload transmission.");
+ szmsg = 0;
+ } else {
+ *msg = static_cast<char *>(malloc(szmsg));
+ ReceiveDecryptedData(connect_info, static_cast<void *>(*msg), szmsg);
+ }
+
+ return true;
+}
+
+void Module::ReceiveDecryptedData(Module::TCPData *connect_info, void *data, size_t data_length)
+{
+ size_t padding_buffer_size =
+ (data_length + AES_KEY_BYTE_SIZE) / AES_KEY_BYTE_SIZE * AES_KEY_BYTE_SIZE;
+ if (padding_buffer_size % AES_KEY_BYTE_SIZE != 0) {
+ ERR("data_length is not a multiple of AES_KEY_BYTE_SIZE.");
+ return;
+ }
+ DBG("data_length = %zu, padding_buffer_size = %zu", data_length, padding_buffer_size);
+
+ unsigned char padding_buffer[padding_buffer_size];
+ ReceiveExactSize(connect_info, static_cast<void *>(padding_buffer), padding_buffer_size);
+
+ unsigned char decrypted_data[padding_buffer_size];
+ for (int i = 0; i < (int)padding_buffer_size / AES_KEY_BYTE_SIZE; i++) {
+ AES::Decrypt(
+ padding_buffer + AES_KEY_BYTE_SIZE * i, decrypted_data + AES_KEY_BYTE_SIZE * i);
+ }
+ memcpy(data, decrypted_data, data_length);
+}
+
+void Module::ReceiveExactSize(Module::TCPData *connect_info, void *data, size_t data_length)
+{
+ if (data_length == 0) {
+ DBG("data_length is zero.");
+ return;
+ }
+
+ size_t remaining_size = data_length;
+ while (0 < remaining_size) {
+ char *data_index = (char *)data + (data_length - remaining_size);
+ size_t size_received = remaining_size;
+ connect_info->client->Recv(data_index, size_received);
+ if (size_received > 0) {
+ remaining_size -= size_received;
+ } else if (size_received == 0) {
+ DBG("size_received == 0");
+ remaining_size = 0;
+ }
+ }
+}
+
+std::string Module::ReceiveTopic(Module::TCPData *connect_info)
{
size_t topic_length = 0;
ReceiveExactSize(connect_info, static_cast<void *>(&topic_length), sizeof(topic_length));
return topic;
}
+bool Module::ReceivePayload(Module::TCPData *connect_info, size_t &szmsg, char **msg)
+{
+ ReceiveExactSize(connect_info, static_cast<void *>(&szmsg), sizeof(szmsg));
+ if (szmsg == 0) {
+ ERR("Got a disconnection message.");
+ return false;
+ }
+ ERR("szmsg = [%zu]", szmsg);
+ if (UINT32_MAX == szmsg) {
+ // Distinguish between connection problems and zero-size messages.
+ INFO("Got a zero-size message. Skip this payload transmission.");
+ szmsg = 0;
+ } else {
+ *msg = static_cast<char *>(malloc(szmsg));
+ ReceiveExactSize(connect_info, static_cast<void *>(*msg), szmsg);
+ }
+
+ return true;
+}
+
void Module::AcceptConnection(MainLoopHandler::MainLoopResult result, int handle,
MainLoopHandler::MainLoopData *user_data)
{
TCPData *ecd = new TCPData;
ecd->parent = listen_info;
ecd->client = std::move(client);
+ ecd->is_secure = listen_info->is_secure;
impl->main_loop.AddWatch(cHandle, ReceiveData, ecd);
}
}
// NOTE:
- // The current implementation only has a single port entry
- // therefore, if the hostIt is not empty, there is the previous connection
+ // The current implementation only has a single port entry.
+ // Therefore, if the hostIt is not empty, there is the previous connection.
if (!hostIt->second.empty()) {
auto portIt = hostIt->second.begin();
if (portIt->first == port)
- return; // nothing changed. keep the current handle
+ return; // Nothing is changed. Keep the current handle.
- // otherwise, delete the connection handle
- // to make a new connection with the new port
+ // Otherwise, delete the connection handle
+ // to make a new connection with the new port.
hostIt->second.clear();
}
class Module : public AittTransport {
public:
- explicit Module(const std::string &ip, AittDiscovery &discovery);
+ explicit Module(AittProtocol protocol, const std::string &ip, AittDiscovery &discovery);
virtual ~Module(void);
void Publish(const std::string &topic, const void *data, const size_t datalen,
- const std::string &correlation, AittQoS qos = AITT_QOS_AT_MOST_ONCE,
- bool retain = false) override;
+ AittQoS qos = AITT_QOS_AT_MOST_ONCE, bool retain = false) override;
+
+ void Publish_(const std::string &topic, const void *data, const size_t datalen,
+ const std::string &correlation, AittQoS qos, bool retain);
void Publish(const std::string &topic, const void *data, const size_t datalen,
- AittQoS qos = AITT_QOS_AT_MOST_ONCE, bool retain = false) override;
+ const std::string &correlation, AittQoS qos = AITT_QOS_AT_MOST_ONCE,
+ bool retain = false) override;
void *Subscribe(const std::string &topic, const SubscribeCallback &cb, void *cbdata = nullptr,
AittQoS qos = AITT_QOS_AT_MOST_ONCE) override;
+ void *Subscribe_(const std::string &topic, const AittTransport::SubscribeCallback &cb,
+ void *cbdata, AittQoS qos);
+
void *Subscribe(const std::string &topic, const SubscribeCallback &cb, const void *data,
const size_t datalen, void *cbdata = nullptr,
AittQoS qos = AITT_QOS_AT_MOST_ONCE) override;
+
void *Unsubscribe(void *handle) override;
private:
std::string topic;
std::vector<int> client_list;
std::mutex client_lock;
+ bool is_secure;
};
struct TCPData : public MainLoopHandler::MainLoopData {
TCPServerData *parent;
std::unique_ptr<TCP> client;
+ bool is_secure;
};
// SubscribeTable
void DiscoveryMessageCallback(const std::string &clientId, const std::string &status,
const void *msg, const int szmsg);
void UpdateDiscoveryMsg();
+ void ThreadMain(void);
+ bool SendEncryptedTopic(const std::string &topic, Module::PortMap::iterator &portIt);
+ void SendEncryptedData(
+ Module::PortMap::iterator &port_iterator, const void *data, size_t data_length);
+ void SendExactSize(
+ Module::PortMap::iterator &port_iterator, const void *data, size_t data_length);
+ void SendEncryptedPayload(
+ const size_t &datalen, Module::PortMap::iterator &portIt, const void *data);
+ bool SendTopic(const std::string &topic, Module::PortMap::iterator &portIt);
+ void SendPayload(const size_t &datalen, Module::PortMap::iterator &portIt, const void *data);
static void ReceiveData(MainLoopHandler::MainLoopResult result, int handle,
MainLoopHandler::MainLoopData *watchData);
void HandleClientDisconnect(int handle);
- std::string GetTopicName(TCPData *connect_info);
+ std::string ReceiveDecryptedTopic(TCPData *connect_info);
+ bool ReceiveDecryptedPayload(Module::TCPData *connect_info, size_t &szmsg, char **msg);
+ static void ReceiveDecryptedData(Module::TCPData *connect_info, void *data, size_t data_length);
static void ReceiveExactSize(
Module::TCPData *connect_info, void *data, size_t data_length);
- void ThreadMain(void);
- void SendTopic(const std::string &topic, Module::PortMap::iterator &portIt);
- void SendPayload(const size_t &datalen, Module::PortMap::iterator &portIt, const void *data);
- void SendExactSize(Module::PortMap::iterator &port_iterator, const void *data, size_t data_length);
+ std::string ReceiveTopic(TCPData *connect_info);
+ bool ReceivePayload(Module::TCPData *connect_info, size_t &szmsg, char **msg);
void UpdatePublishTable(const std::string &topic, const std::string &host, unsigned short port);
MainLoopHandler main_loop;
std::mutex subscribeTableLock;
ClientMap clientTable;
std::mutex clientTableLock;
+
+ AittProtocol protocol;
};
ADD_EXECUTABLE("aitt_tcp_test" tcp_test.cc $<TARGET_OBJECTS:TCP_OBJ>)
-TARGET_LINK_LIBRARIES("aitt_tcp_test" ${PROJECT_NAME} Threads::Threads ${AITT_NEEDS_LIBRARIES})
+TARGET_LINK_LIBRARIES("aitt_tcp_test" ${PROJECT_NAME} Threads::Threads ${AITT_NEEDS_LIBRARIES} ${AITT_TCP_NEEDS_LIBRARIES})
INSTALL(TARGETS "aitt_tcp_test" DESTINATION ${AITT_TEST_BINDIR})
SET(AITT_TCP_UT_SRC TCP_test.cc TCPServer_test.cc)
ADD_EXECUTABLE(${AITT_TCP_UT} ${AITT_TCP_UT_SRC} $<TARGET_OBJECTS:TCP_OBJ>)
-TARGET_LINK_LIBRARIES(${AITT_TCP_UT} ${UT_NEEDS_LIBRARIES} Threads::Threads ${AITT_NEEDS_LIBRARIES})
+TARGET_LINK_LIBRARIES(${AITT_TCP_UT} ${UT_NEEDS_LIBRARIES} Threads::Threads ${AITT_NEEDS_LIBRARIES} ${AITT_TCP_NEEDS_LIBRARIES})
INSTALL(TARGETS ${AITT_TCP_UT} DESTINATION ${AITT_TEST_BINDIR})
ADD_TEST(
extern "C" {
-API void *AITT_TRANSPORT_NEW(const char *ip, AittDiscovery &discovery)
+API void *AITT_TRANSPORT_NEW(AittProtocol protocol, const char *ip, AittDiscovery &discovery)
{
assert(STR_EQ == strcmp(__func__, aitt::AittTransport::MODULE_ENTRY_NAME)
&& "Entry point name is not matched");
std::string ip_address(ip);
- Module *module = new Module(ip_address, discovery);
+ Module *module = new Module(protocol, ip_address, discovery);
// validate that the module creates valid object (which inherits AittTransport)
AittTransport *transport_module = dynamic_cast<AittTransport *>(module);
#include "Config.h"
#include "aitt_internal.h"
-Module::Module(const std::string &ip, AittDiscovery &discovery) : AittTransport(discovery)
+Module::Module(AittProtocol protocol, const std::string &ip, AittDiscovery &discovery)
+ : AittTransport(discovery)
{
}
// TODO
}
-void Module::Publish(const std::string &topic, const void *data, const size_t datalen, AittQoS qos,
- bool retain)
+void Module::Publish(
+ const std::string &topic, const void *data, const size_t datalen, AittQoS qos, bool retain)
{
std::lock_guard<std::mutex> publish_table_lock(publish_table_lock_);
class Module : public AittTransport {
public:
- explicit Module(const std::string &ip, AittDiscovery &discovery);
+ explicit Module(AittProtocol protocol, const std::string &ip, AittDiscovery &discovery);
virtual ~Module(void);
// TODO: How about regarding topic as service name?
BuildRequires: pkgconfig(capi-media-webrtc)
BuildRequires: pkgconfig(capi-media-camera)
BuildRequires: pkgconfig(json-glib-1.0)
+BuildRequires: pkgconfig(openssl1.1)
%if 0%{gcov}
BuildRequires: lcov
%endif
if (handle == nullptr)
ERR("OpenModule() Fail");
- transports[i] = loader.LoadTransport(handle.get(), my_ip, discovery);
+ transports[i] = loader.LoadTransport(handle.get(), loader.GetProtocol(i), my_ip, discovery);
}
aittThread = std::thread(&AITT::Impl::ThreadMain, this);
}
case AITT_TYPE_TCP:
transports[ModuleLoader::TYPE_TCP]->Unsubscribe(subscribe_info->second);
break;
+ case AITT_TYPE_SECURE_TCP:
+ transports[ModuleLoader::TYPE_SECURE_TCP]->Unsubscribe(subscribe_info->second);
+ break;
case AITT_TYPE_WEBRTC:
transports[ModuleLoader::TYPE_WEBRTC]->Unsubscribe(subscribe_info->second);
break;
if ((protocols & AITT_TYPE_TCP) == AITT_TYPE_TCP)
transports[ModuleLoader::TYPE_TCP]->Publish(topic, data, datalen, qos, retain);
+ if ((protocols & AITT_TYPE_SECURE_TCP) == AITT_TYPE_SECURE_TCP)
+ transports[ModuleLoader::TYPE_SECURE_TCP]->Publish(topic, data, datalen, qos, retain);
+
if ((protocols & AITT_TYPE_WEBRTC) == AITT_TYPE_WEBRTC)
PublishWebRtc(topic, data, datalen, qos, retain);
}
{
SubscribeInfo *info = new SubscribeInfo();
info->first = protocol;
-
void *subscribe_handle;
switch (protocol) {
case AITT_TYPE_MQTT:
case AITT_TYPE_TCP:
subscribe_handle = SubscribeTCP(info, topic, cb, user_data, qos);
break;
+ case AITT_TYPE_SECURE_TCP:
+ subscribe_handle = SubscribeSecureTCP(info, topic, cb, user_data, qos);
+ break;
case AITT_TYPE_WEBRTC:
subscribe_handle = SubscribeWebRtc(info, topic, cb, user_data, qos);
break;
Subscribe(
replyTopic,
- [this, cb](MSG *sub_msg, const void *sub_data, const size_t sub_datalen,
- void *sub_cbdata) {
+ [this, cb](
+ MSG *sub_msg, const void *sub_data, const size_t sub_datalen, void *sub_cbdata) {
if (sub_msg->IsEndSequence()) {
try {
Unsubscribe(sub_msg->GetID());
void *AITT::Impl::SubscribeTCP(SubscribeInfo *handle, const std::string &topic,
const SubscribeCallback &cb, void *user_data, AittQoS qos)
{
+ ERR("[ENTER] SubscribeTCP");
return transports[ModuleLoader::TYPE_TCP]->Subscribe(
topic,
[handle, cb](const std::string &topic, const void *data, const size_t datalen,
user_data, qos);
}
+void *AITT::Impl::SubscribeSecureTCP(SubscribeInfo *handle, const std::string &topic,
+ const SubscribeCallback &cb, void *user_data, AittQoS qos)
+{
+ ERR("[ENTER] SubscribeSecureTCP");
+ return transports[ModuleLoader::TYPE_SECURE_TCP]->Subscribe(
+ topic,
+ [handle, cb](const std::string &topic, const void *data, const size_t datalen,
+ void *user_data, const std::string &correlation) -> void {
+ MSG msg;
+ msg.SetID(handle);
+ msg.SetTopic(topic);
+ msg.SetCorrelation(correlation);
+ msg.SetProtocols(AITT_TYPE_SECURE_TCP);
+
+ return cb(&msg, data, datalen, user_data);
+ },
+ user_data, qos);
+}
+
void *AITT::Impl::SubscribeWebRtc(SubscribeInfo *handle, const std::string &topic,
const SubscribeCallback &cb, void *user_data, AittQoS qos)
{
void Publish(const std::string &topic, const void *data, const size_t datalen,
AittProtocol protocols, AittQoS qos, bool retain);
+
int PublishWithReply(const std::string &topic, const void *data, const size_t datalen,
AittProtocol protocol, AittQoS qos, bool retain, const AITT::SubscribeCallback &cb,
void *cbdata, const std::string &correlation);
+
int PublishWithReplySync(const std::string &topic, const void *data, const size_t datalen,
AittProtocol protocol, AittQoS qos, bool retain, const SubscribeCallback &cb,
void *cbdata, const std::string &correlation, int timeout_ms);
AittSubscribeID Subscribe(const std::string &topic, const AITT::SubscribeCallback &cb,
void *cbdata, AittProtocol protocols, AittQoS qos);
+
void *Unsubscribe(AittSubscribeID handle);
void SendReply(MSG *msg, const void *data, const int datalen, bool end);
MainLoopHandler::MainLoopData *loop_data);
void *SubscribeTCP(SubscribeInfo *, const std::string &topic, const SubscribeCallback &cb,
void *cbdata, AittQoS qos);
+ void *SubscribeSecureTCP(SubscribeInfo *handle, const std::string &topic,
+ const SubscribeCallback &cb, void *user_data, AittQoS qos);
void *SubscribeWebRtc(SubscribeInfo *, const std::string &topic, const SubscribeCallback &cb,
void *cbdata, AittQoS qos);
void HandleTimeout(int timeout_ms, unsigned int &timeout_id, aitt::MainLoopHandler &sync_loop,
protected:
void SetUp() override { Init(); }
void TearDown() override { Deinit(); }
+
+ void TCPWildcardsTopicTemplate(AittProtocol protocol)
+ {
+ try {
+ char dump_msg[204800];
+
+ AITT aitt(clientId, LOCAL_IP);
+ aitt.Connect();
+
+ int cnt = 0;
+ aitt.Subscribe(
+ "test/+",
+ [&](aitt::MSG *handle, const void *msg, const size_t szmsg,
+ void *cbdata) -> void {
+ AITTTCPTest *test = static_cast<AITTTCPTest *>(cbdata);
+ INFO("Got Message(Topic:%s, size:%zu)", handle->GetTopic().c_str(), szmsg);
+ ++cnt;
+
+ std::stringstream ss;
+ ss << "test/value" << cnt;
+ EXPECT_EQ(ss.str(), handle->GetTopic());
+
+ if (cnt == 3)
+ test->ToggleReady();
+ },
+ static_cast<void *>(this), protocol);
+
+ // Wait a few seconds until the AITT client gets a server list (discover devices)
+ DBG("Sleep %d secs", SLEEP_MS);
+ sleep(SLEEP_MS);
+
+ aitt.Publish("test/value1", dump_msg, 12, protocol);
+ aitt.Publish("test/value2", dump_msg, 1600, protocol);
+ aitt.Publish("test/value3", dump_msg, 1600, protocol);
+
+ g_timeout_add(10, AittTests::ReadyCheck, static_cast<AittTests *>(this));
+
+ IterateEventLoop();
+
+ ASSERT_TRUE(ready);
+ } catch (std::exception &e) {
+ FAIL() << "Unexpected exception: " << e.what();
+ }
+ }
};
TEST_F(AITTTCPTest, TCP_Wildcards1_Anytime)
TEST_F(AITTTCPTest, TCP_Wildcards2_Anytime)
{
- try {
- char dump_msg[204800];
-
- AITT aitt(clientId, LOCAL_IP);
- aitt.Connect();
-
- aitt.Subscribe(
- "test/+",
- [&](aitt::MSG *handle, const void *msg, const size_t szmsg, void *cbdata) -> void {
- AITTTCPTest *test = static_cast<AITTTCPTest *>(cbdata);
- INFO("Got Message(Topic:%s, size:%zu)", handle->GetTopic().c_str(), szmsg);
- static int cnt = 0;
- ++cnt;
-
- std::stringstream ss;
- ss << "test/value" << cnt;
- EXPECT_EQ(ss.str(), handle->GetTopic());
-
- if (cnt == 3)
- test->ToggleReady();
- },
- static_cast<void *>(this), AITT_TYPE_TCP);
-
- // Wait a few seconds until the AITT client gets a server list (discover devices)
- DBG("Sleep %d secs", SLEEP_MS);
- sleep(SLEEP_MS);
-
- aitt.Publish("test/value1", dump_msg, 12, AITT_TYPE_TCP);
- aitt.Publish("test/value2", dump_msg, 1600, AITT_TYPE_TCP);
- aitt.Publish("test/value3", dump_msg, 1600, AITT_TYPE_TCP);
-
- g_timeout_add(10, AittTests::ReadyCheck, static_cast<AittTests *>(this));
-
- IterateEventLoop();
+ TCPWildcardsTopicTemplate(AITT_TYPE_TCP);
+}
- ASSERT_TRUE(ready);
- } catch (std::exception &e) {
- FAIL() << "Unexpected exception: " << e.what();
- }
+TEST_F(AITTTCPTest, SECURE_TCP_Wildcards_Anytime)
+{
+ TCPWildcardsTopicTemplate(AITT_TYPE_SECURE_TCP);
}
void SetUp() override { Init(); }
void TearDown() override { Deinit(); }
- void pubsub_template(const char *test_msg, AittProtocol protocol)
+ void PubsubTemplate(const char *test_msg, AittProtocol protocol)
{
try {
AITT aitt(clientId, LOCAL_IP, AittOption(true, false));
FAIL() << "Unexpected exception: " << e.what();
}
}
+
+ void PublishDisconnectTemplate(AittProtocol protocol)
+ {
+ const char character_set[] =
+ "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890";
+ std::mt19937 random_gen{std::random_device{}()};
+ std::uniform_int_distribution<std::string::size_type> gen(0, 61);
+
+ char dump_msg[204800] = {};
+ for (size_t i = 0; i < sizeof(dump_msg); i++) {
+ dump_msg[i] = character_set[gen(random_gen)];
+ }
+
+ try {
+ AITT aitt(clientId, LOCAL_IP);
+ AITT aitt_retry("retry_test", LOCAL_IP);
+ aitt.Connect();
+ aitt_retry.Connect();
+
+ int cnt = 0;
+ aitt.Subscribe(
+ "test/stress1",
+ [&](aitt::MSG *handle, const void *msg, const size_t szmsg, void *cbdata) -> void {
+ AITTTest *test = static_cast<AITTTest *>(cbdata);
+ ++cnt;
+ if (szmsg == 0 && cnt != 12) {
+ FAIL() << "Unexpected value" << cnt;
+ }
+
+ DBG("A subscription message is arrived. cnt = %d", cnt);
+ const char *receivedMsg = static_cast<const char *>(msg);
+ ASSERT_TRUE(!strcmp(receivedMsg, dump_msg));
+
+ if (cnt == 10)
+ test->ToggleReady();
+ if (cnt == 11)
+ test->ToggleReady();
+ },
+ static_cast<void *>(this), protocol);
+
+ {
+ AITT aitt1("stress_test1", LOCAL_IP);
+ aitt1.Connect();
+
+ // Wait a few seconds to the AITT client gets server list (discover devices)
+ sleep(SLEEP_MS);
+
+ for (int i = 0; i < 10; i++) {
+ INFO("size = %zu", sizeof(dump_msg));
+ aitt1.Publish("test/stress1", dump_msg, sizeof(dump_msg), protocol,
+ AITT_QOS_AT_MOST_ONCE, true);
+ }
+ g_timeout_add(10, AittTests::ReadyCheck, static_cast<AittTests *>(this));
+
+ IterateEventLoop();
+ }
+ DBG("Client aitt1 is finished.");
+
+ // Here, an unexpected callback(szmsg = 0) is received
+ // when the publisher is disconnected.
+
+ ASSERT_TRUE(ready);
+ ready = false;
+
+ aitt_retry.Publish("test/stress1", dump_msg, sizeof(dump_msg), protocol,
+ AITT_QOS_AT_MOST_ONCE, true);
+
+ g_timeout_add(10, AittTests::ReadyCheck, static_cast<AittTests *>(this));
+
+ IterateEventLoop();
+
+ ASSERT_TRUE(ready);
+
+ aitt_retry.Publish("test/stress1", nullptr, 0, protocol, AITT_QOS_AT_LEAST_ONCE);
+ // Check auto release of aitt. There should be no segmentation faults.
+ } catch (std::exception &e) {
+ FAIL() << "Unexpected exception: " << e.what();
+ }
+ }
+
+ void PublishSubscribeTCPTwiceTemplate(AittProtocol protocol)
+ {
+ try {
+ AITT aitt(clientId, LOCAL_IP);
+ aitt.Connect();
+
+ int cnt = 0;
+ aitt.Subscribe(
+ testTopic,
+ [&](aitt::MSG *handle, const void *msg, const size_t szmsg,
+ void *cbdata) -> void {
+ AITTTest *test = static_cast<AITTTest *>(cbdata);
+ // NOTE:
+ // Subscribe callback will be invoked 2 times
+ ++cnt;
+ const char *receivedMsg = static_cast<const char *>(msg);
+ DBG("Subscribe callback called: %d, szmsg = %zu, msg = [%s]", cnt, szmsg,
+ receivedMsg);
+ if (cnt == 1) {
+ ASSERT_TRUE(!strcmp(receivedMsg, TEST_MSG));
+ } else if (cnt == 2) {
+ ASSERT_TRUE(!strcmp(receivedMsg, TEST_MSG2));
+ test->ToggleReady();
+ }
+ },
+ static_cast<void *>(this), protocol);
+
+ // Wait a few seconds to the AITT client gets server list (discover devices)
+ sleep(SLEEP_MS);
+
+ // NOTE:
+ // Select target peers and send the data through the specified protocol - TCP
+ aitt.Publish(testTopic, TEST_MSG, sizeof(TEST_MSG), protocol);
+
+ // NOTE:
+ // Publish message through the specified protocol - TCP
+ aitt.Publish(testTopic, TEST_MSG2, sizeof(TEST_MSG2), protocol);
+
+ g_timeout_add(10, AittTests::ReadyCheck, static_cast<AittTests *>(this));
+
+ IterateEventLoop();
+
+ ASSERT_TRUE(ready);
+ } catch (std::exception &e) {
+ FAIL() << "Unexpected exception: " << e.what();
+ }
+ }
+
+ void SubscribeRetainedTCPTemplate(AittProtocol protocol)
+ {
+ try {
+ AITT aitt(clientId, LOCAL_IP);
+ aitt.Connect();
+
+ int cnt = 0;
+ aitt.Subscribe(
+ testTopic,
+ [&](aitt::MSG *handle, const void *msg, const size_t szmsg,
+ void *cbdata) -> void {
+ AITTTest *test = static_cast<AITTTest *>(cbdata);
+ ++cnt;
+ if (cnt == 1) {
+ ASSERT_TRUE(msg == nullptr);
+ test->ToggleReady();
+ }
+ DBG("Subscribe callback called: %d", cnt);
+ },
+ static_cast<void *>(this), protocol);
+
+ // Wait a few seconds to the AITT client gets server list (discover devices)
+ sleep(SLEEP_MS);
+
+ // NOTE:
+ // Publish a message with the retained flag
+ // This message will not be delivered, subscriber subscribes TCP protocol
+ aitt.Publish(testTopic, TEST_MSG, sizeof(TEST_MSG), AITT_TYPE_MQTT,
+ AITT_QOS_AT_MOST_ONCE, true);
+
+ aitt.Publish(testTopic, TEST_MSG2, sizeof(TEST_MSG2), protocol);
+
+ g_timeout_add(10, AittTests::ReadyCheck, static_cast<AittTests *>(this));
+
+ IterateEventLoop();
+
+ aitt.Publish(testTopic, nullptr, 0, AITT_TYPE_MQTT, AITT_QOS_AT_LEAST_ONCE, true);
+
+ ASSERT_TRUE(ready);
+ } catch (std::exception &e) {
+ FAIL() << "Unexpected exception: " << e.what();
+ }
+ }
};
TEST_F(AITTTest, Positive_Create_Anytime)
}
}
+TEST_F(AITTTest, Positive_Publish_SECURE_TCP_Anytime)
+{
+ try {
+ AITT aitt(clientId, LOCAL_IP, AittOption(true, false));
+ aitt.Connect();
+ aitt.Publish(testTopic, TEST_MSG, sizeof(TEST_MSG), AITT_TYPE_SECURE_TCP);
+ } catch (std::exception &e) {
+ FAIL() << "Unexpected exception: " << e.what();
+ }
+}
+
TEST_F(AITTTest, Positive_Publish_Multiple_Protocols_Anytime)
{
try {
}
}
+TEST_F(AITTTest, Positive_Unsubscribe_SECURE_TCP_Anytime)
+{
+ try {
+ AITT aitt(clientId, LOCAL_IP, AittOption(true, false));
+ aitt.Connect();
+ subscribeHandle = aitt.Subscribe(
+ testTopic,
+ [](aitt::MSG *handle, const void *msg, const size_t szmsg, void *cbdata) -> void {},
+ nullptr, AITT_TYPE_SECURE_TCP);
+ DBG("Subscribe handle: %p", reinterpret_cast<void *>(subscribeHandle));
+ aitt.Unsubscribe(subscribeHandle);
+ } catch (std::exception &e) {
+ FAIL() << "Unexpected exception: " << e.what();
+ }
+}
+
TEST_F(AITTTest, Positve_PublishSubscribe_MQTT_Anytime)
{
- pubsub_template(TEST_MSG, AITT_TYPE_MQTT);
+ PubsubTemplate(TEST_MSG, AITT_TYPE_MQTT);
}
TEST_F(AITTTest, Positve_Publish_0_MQTT_Anytime)
{
- pubsub_template("", AITT_TYPE_MQTT);
+ PubsubTemplate("", AITT_TYPE_MQTT);
}
TEST_F(AITTTest, Positve_Unsubscribe_in_Subscribe_MQTT_Anytime)
TEST_F(AITTTest, Positve_PublishSubscribe_TCP_Anytime)
{
- pubsub_template(TEST_MSG, AITT_TYPE_TCP);
+ PubsubTemplate(TEST_MSG, AITT_TYPE_TCP);
+}
+
+TEST_F(AITTTest, Positve_PublishSubscribe_SECURE_TCP_Anytime)
+{
+ PubsubTemplate(TEST_MSG, AITT_TYPE_SECURE_TCP);
}
TEST_F(AITTTest, Positve_Publish_0_TCP_Anytime)
{
- pubsub_template("", AITT_TYPE_TCP);
+ PubsubTemplate("", AITT_TYPE_TCP);
+}
+
+TEST_F(AITTTest, Positve_Publish_0_SECURE_TCP_Anytime)
+{
+ PubsubTemplate("", AITT_TYPE_SECURE_TCP);
}
TEST_F(AITTTest, Positve_PublishSubscribe_Multiple_Protocols_Anytime)
}
}
-TEST_F(AITTTest, Positve_PublishSubscribe_twice_Anytime)
+TEST_F(AITTTest, Positve_PublishSubscribe_TCP_twice_Anytime)
{
- try {
- AITT aitt(clientId, LOCAL_IP);
- aitt.Connect();
- aitt.Subscribe(
- testTopic,
- [&](aitt::MSG *handle, const void *msg, const size_t szmsg, void *cbdata) -> void {
- AITTTest *test = static_cast<AITTTest *>(cbdata);
- // NOTE:
- // Subscribe callback will be invoked 2 times
- static int cnt = 0;
- ++cnt;
- if (cnt == 2)
- test->ToggleReady();
- DBG("Subscribe callback called: %d", cnt);
- },
- static_cast<void *>(this), AITT_TYPE_TCP);
-
- // Wait a few seconds to the AITT client gets server list (discover devices)
- sleep(SLEEP_MS);
-
- // NOTE:
- // Select target peers and send the data through the specified protocol - TCP
- aitt.Publish(testTopic, TEST_MSG, sizeof(TEST_MSG), AITT_TYPE_TCP);
-
- // NOTE:
- // Publish message through the specified protocol - TCP
- aitt.Publish(testTopic, TEST_MSG2, sizeof(TEST_MSG2), AITT_TYPE_TCP);
-
- g_timeout_add(10, AittTests::ReadyCheck, static_cast<AittTests *>(this));
-
- IterateEventLoop();
-
- ASSERT_TRUE(ready);
- } catch (std::exception &e) {
- FAIL() << "Unexpected exception: " << e.what();
- }
+ PublishSubscribeTCPTwiceTemplate(AITT_TYPE_TCP);
}
-TEST_F(AITTTest, Positive_Subscribe_Retained_Anytime)
+TEST_F(AITTTest, Positve_PublishSubscribe_SECURE_TCP_twice_Anytime)
{
- try {
- AITT aitt(clientId, LOCAL_IP);
- aitt.Connect();
- aitt.Subscribe(
- testTopic,
- [&](aitt::MSG *handle, const void *msg, const size_t szmsg, void *cbdata) -> void {
- AITTTest *test = static_cast<AITTTest *>(cbdata);
- static int cnt = 0;
- ++cnt;
- if (cnt == 1)
- test->ToggleReady();
- DBG("Subscribe callback called: %d", cnt);
- },
- static_cast<void *>(this), AITT_TYPE_TCP);
-
- // Wait a few seconds to the AITT client gets server list (discover devices)
- sleep(SLEEP_MS);
-
- // NOTE:
- // Publish a message with the retained flag
- // This message will not be delivered, subscriber subscribes TCP protocol
- aitt.Publish(testTopic, TEST_MSG, sizeof(TEST_MSG), AITT_TYPE_MQTT, AITT_QOS_AT_MOST_ONCE,
- true);
-
- aitt.Publish(testTopic, TEST_MSG2, sizeof(TEST_MSG2), AITT_TYPE_TCP);
-
- g_timeout_add(10, AittTests::ReadyCheck, static_cast<AittTests *>(this));
-
- IterateEventLoop();
-
- aitt.Publish(testTopic, nullptr, 0, AITT_TYPE_MQTT, AITT_QOS_AT_LEAST_ONCE, true);
-
- ASSERT_TRUE(ready);
- } catch (std::exception &e) {
- FAIL() << "Unexpected exception: " << e.what();
- }
+ PublishSubscribeTCPTwiceTemplate(AITT_TYPE_SECURE_TCP);
}
-TEST_F(AITTTest, TCP_Publish_Disconnect_Anytime)
+TEST_F(AITTTest, Positive_Subscribe_Retained_Anytime_TCP)
{
- try {
- const char character_set[] =
- "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890";
- std::mt19937 random_gen{std::random_device{}()};
- std::uniform_int_distribution<std::string::size_type> gen(0, 61);
-
- char dump_msg[204800] = {};
- for (size_t i = 0; i < sizeof(dump_msg); i++) {
- dump_msg[i] = character_set[gen(random_gen)];
- }
-
- AITT aitt(clientId, LOCAL_IP);
- AITT aitt_retry("retry_test", LOCAL_IP);
- aitt.Connect();
- aitt_retry.Connect();
-
- aitt.Subscribe(
- "test/stress1",
- [&](aitt::MSG *handle, const void *msg, const size_t szmsg, void *cbdata) -> void {
- AITTTest *test = static_cast<AITTTest *>(cbdata);
- static int cnt = 0;
- ++cnt;
- if (szmsg == 0 && cnt != 12) {
- FAIL() << "Unexpected value" << cnt;
- }
- if (cnt == 10)
- test->ToggleReady();
- if (cnt == 11)
- test->ToggleReady();
- },
- static_cast<void *>(this), AITT_TYPE_TCP);
-
- {
- AITT aitt1("stress_test1", LOCAL_IP);
- aitt1.Connect();
-
- // Wait a few seconds to the AITT client gets server list (discover devices)
- sleep(SLEEP_MS);
-
- for (int i = 0; i < 10; i++) {
- INFO("size = %zu", sizeof(dump_msg));
- aitt1.Publish("test/stress1", dump_msg, sizeof(dump_msg), AITT_TYPE_TCP,
- AITT_QOS_AT_MOST_ONCE, true);
- }
- g_timeout_add(10, AittTests::ReadyCheck, static_cast<AittTests *>(this));
-
- IterateEventLoop();
- }
- DBG("aitt1 client Finish");
-
- // Here, It's automatically checked Unexpected callback(szmsg = 0)
- // when publisher is disconnected.
-
- ASSERT_TRUE(ready);
- ready = false;
-
- aitt_retry.Publish("test/stress1", dump_msg, sizeof(dump_msg), AITT_TYPE_TCP,
- AITT_QOS_AT_MOST_ONCE, true);
-
- g_timeout_add(10, AittTests::ReadyCheck, static_cast<AittTests *>(this));
+ SubscribeRetainedTCPTemplate(AITT_TYPE_TCP);
+}
- IterateEventLoop();
+TEST_F(AITTTest, Positive_Subscribe_Retained_Anytime_SECURE_TCP)
+{
+ SubscribeRetainedTCPTemplate(AITT_TYPE_SECURE_TCP);
+}
- ASSERT_TRUE(ready);
+TEST_F(AITTTest, TCP_Publish_Disconnect_Anytime_TCP)
+{
+ PublishDisconnectTemplate(AITT_TYPE_TCP);
+}
- aitt_retry.Publish("test/stress1", nullptr, 0, AITT_TYPE_TCP, AITT_QOS_AT_LEAST_ONCE);
- // Check auto release of aitt. It sould be no Segmentation fault
- } catch (std::exception &e) {
- FAIL() << "Unexpected exception: " << e.what();
- }
+TEST_F(AITTTest, TCP_Publish_Disconnect_Anytime_SECURE_TCP)
+{
+ PublishDisconnectTemplate(AITT_TYPE_SECURE_TCP);
}
TEST_F(AITTTest, WillSet_N_Anytime)
ModuleLoader::ModuleHandle handle = loader.OpenModule(ModuleLoader::TYPE_TCP);
ASSERT_NE(handle, nullptr);
- std::shared_ptr<aitt::AittTransport> module =
- loader.LoadTransport(handle.get(), LOCAL_IP, discovery);
+ std::shared_ptr<aitt::AittTransport> module = loader.LoadTransport(
+ handle.get(), loader.GetProtocol(ModuleLoader::TYPE_TCP), LOCAL_IP, discovery);
ASSERT_NE(module, nullptr);
}
ModuleLoader::ModuleHandle handle = loader.OpenModule(ModuleLoader::TYPE_TRANSPORT_MAX);
ASSERT_EQ(handle.get(), nullptr);
- auto module = loader.LoadTransport(handle.get(), LOCAL_IP, discovery);
+ auto module = loader.LoadTransport(
+ handle.get(), loader.GetProtocol(ModuleLoader::TYPE_TRANSPORT_MAX), LOCAL_IP, discovery);
ASSERT_NE(module, nullptr);
}