#include <dlfcn.h>
#include <glib.h>
#include <glib-unix.h>
-#include <mosquitto.h>
#include <condition_variable>
#include <thread>
namespace {
-#define DIRECT_ACCESS_FUNC "_request_handler_direct_access"
+constexpr char kMosquittoLibPath[] = LIB_PATH"/libmosquitto.so.1";
} // namespace
namespace GenMQTT {
+MqttService::MqttLibraryAdapter::MqttLibraryAdapter()
+ : new_(nullptr), destroy_(nullptr), connect_callback_set_(nullptr),
+ disconnect_callback_set_(nullptr), message_callback_set_(nullptr),
+ username_pw_set_(nullptr), loop_start_(nullptr), connect_(nullptr),
+ disconnect_(nullptr), subscribe_(nullptr), unsubscribe_(nullptr),
+ publish_(nullptr) {
+ handle_ = dlopen(kMosquittoLibPath, RTLD_GLOBAL | RTLD_LAZY | RTLD_NODELETE);
+ if (handle_ == nullptr) {
+ LOG(ERROR) << "Failed to open library: " << kMosquittoLibPath << ", : "
+ << dlerror();
+ THROW(error::MQTT_PLUGIN_ERROR_OPERATION_FAILED);
+ }
+
+ if (!LoadSymbols()) {
+ dlclose(handle_);
+ THROW(error::MQTT_PLUGIN_ERROR_OPERATION_FAILED);
+ }
+}
+
+MqttService::MqttLibraryAdapter::~MqttLibraryAdapter() {
+ dlclose(handle_);
+}
+
+bool MqttService::MqttLibraryAdapter::LoadSymbols() {
+ new_ = reinterpret_cast<new_fn>(dlsym(handle_, "mosquitto_new"));
+ if (!new_) {
+ LOG(ERROR) << "Failed to load symbol";
+ return false;
+ }
+
+ destroy_ = reinterpret_cast<destroy_fn>(dlsym(handle_, "mosquitto_destroy"));
+ if (!destroy_) {
+ LOG(ERROR) << "Failed to load symbol";
+ return false;
+ }
+
+ connect_callback_set_ = reinterpret_cast<connect_callback_set_fn>(
+ dlsym(handle_, "mosquitt_connect_callback_set"));
+ if (!connect_callback_set_) {
+ LOG(ERROR) << "Failed to load symbol";
+ return false;
+ }
+
+ disconnect_callback_set_ = reinterpret_cast<disconnect_callback_set_fn>(
+ dlsym(handle_, "mosquitt_disconnnect_callback_set"));
+ if (!disconnect_callback_set_) {
+ LOG(ERROR) << "Failed to load symbol";
+ return false;
+ }
+
+ message_callback_set_ = reinterpret_cast<message_callback_set_fn>(
+ dlsym(handle_, "mosquitt_message_callback_set"));
+ if (!message_callback_set_) {
+ LOG(ERROR) << "Failed to load symbol";
+ return false;
+ }
+
+ username_pw_set_ = reinterpret_cast<username_pw_set_fn>(
+ dlsym(handle_, "mosquitto_username_pw_set"));
+ if (!username_pw_set_) {
+ LOG(ERROR) << "Failed to load symbol";
+ return false;
+ }
+
+ loop_start_ = reinterpret_cast<loop_start_fn>(
+ dlsym(handle_, "mosquitto_loop_start"));
+ if (!loop_start_) {
+ LOG(ERROR) << "Failed to load symbol";
+ return false;
+ }
+
+ connect_ = reinterpret_cast<connect_fn>(dlsym(handle_, "mosquitto_connect"));
+ if (!connect_) {
+ LOG(ERROR) << "Failed to load symbol";
+ return false;
+ }
+
+ disconnect_ = reinterpret_cast<disconnect_fn>(
+ dlsym(handle_, "mosquitto_disconnect"));
+ if (!disconnect_) {
+ LOG(ERROR) << "Failed to load symbol";
+ return false;
+ }
+
+ subscribe_ = reinterpret_cast<subscribe_fn>(
+ dlsym(handle_, "mosquitto_subscribe"));
+ if (!subscribe_) {
+ LOG(ERROR) << "Failed to load symbol";
+ return false;
+ }
+
+ unsubscribe_ = reinterpret_cast<unsubscribe_fn>(
+ dlsym(handle_, "mosquitto_unsubscribe"));
+ if (!unsubscribe_) {
+ LOG(ERROR) << "Failed to load symbol";
+ return false;
+ }
+
+ publish_ = reinterpret_cast<publish_fn>(dlsym(handle_, "mosquitto_publish"));
+ if (!publish_) {
+ LOG(ERROR) << "Failed to load symbol";
+ return false;
+ }
+
+ return true;
+}
+
MqttService::MqttService() {
- mosq_ = mosquitto_new(nullptr, true, this);
+ mqtt_lib_ = std::make_unique<MqttLibraryAdapter>();
+ mosq_ = mqtt_lib_->mosquitto_new(nullptr, true, this);
if (mosq_ == nullptr)
THROW(error::MQTT_PLUGIN_ERROR_OUT_OF_MEMORY);
// consider using v5?
- mosquitto_connect_callback_set(mosq_,
- [](mosquitto* mosq, void* obj, int rc) {
+ mqtt_lib_->mosquitto_connect_callback_set(mosq_,
+ [](void* mosq, void* obj, int rc) {
LOG(ERROR) << "ON CONNECT";
});
- mosquitto_disconnect_callback_set(mosq_,
- [](mosquitto* mosq, void* obj, int rc) {
+ mqtt_lib_->mosquitto_disconnect_callback_set(mosq_,
+ [](void* mosq, void* obj, int rc) {
LOG(ERROR) << "ON DISCONNECT";
});
- mosquitto_message_callback_set(mosq_,
- [](mosquitto* mosq, void* obj, const struct mosquitto_message* message){
+ mqtt_lib_->mosquitto_message_callback_set(mosq_,
+ [](void* mosq, void* obj, const struct MqttLibraryAdapter::mosquitto_message* message){
LOG(ERROR) << "ON MESSAGE";
MqttService* service = static_cast<MqttService*>(obj);
MqttService::~MqttService() {
LOG(ERROR) << "~MqttService";
if (mosq_)
- mosquitto_destroy(mosq_);
+ mqtt_lib_->mosquitto_destroy(mosq_);
g_main_loop_quit(loop_);
thread_.join();
void MqttService::Connect(const std::string &broker_ip, int broker_port,
const std::string &user_name, const std::string &password) {
- int ret = mosquitto_username_pw_set(mosq_, user_name.c_str(),
+ int ret = mqtt_lib_->mosquitto_username_pw_set(mosq_, user_name.c_str(),
password.c_str());
- if (ret != MOSQ_ERR_SUCCESS) {
+ if (ret != 0) {
LOG(ERROR) << "Failed to set username and password: " << ret;
THROW(ret);
}
- ret = mosquitto_loop_start(mosq_);
- if (ret != MOSQ_ERR_SUCCESS) {
+ ret = mqtt_lib_->mosquitto_loop_start(mosq_);
+ if (ret != 0) {
LOG(ERROR) << "Failed to start mosquitto loop: " << ret;
THROW(ret);
}
// same as mqtt
int keep_alive = 60;
- ret = mosquitto_connect(mosq_, broker_ip.c_str(), broker_port, keep_alive);
- if (ret != MOSQ_ERR_SUCCESS) {
+ ret = mqtt_lib_->mosquitto_connect(mosq_, broker_ip.c_str(), broker_port,
+ keep_alive);
+ if (ret != 0) {
LOG(ERROR) << "Failed to connect broker: " << ret;
THROW(ret);
}
const unsigned char* data, size_t data_size) {
int mid = -1;
int qos = 2;
- int ret = mosquitto_publish(mosq_, &mid, topic.c_str(), data_size,
+ int ret = mqtt_lib_->mosquitto_publish(mosq_, &mid, topic.c_str(), data_size,
static_cast<const void*>(data), qos, false);
- if (ret != MOSQ_ERR_SUCCESS) {
+ if (ret != 0) {
LOG(ERROR) << "Failed to publish: " << ret;
THROW(ret);
}
void *user_data) {
int mid = -1;
int qos = 2;
- int ret = mosquitto_subscribe(mosq_, &mid, topic.c_str(), qos);
- if (ret != MOSQ_ERR_SUCCESS) {
+ int ret = mqtt_lib_->mosquitto_subscribe(mosq_, &mid, topic.c_str(), qos);
+ if (ret != 0) {
LOG(ERROR) << "Failed to subscribe: " << ret;
THROW(ret);
}
int MqttService::Unsubscribe(const std::string& topic) {
int mid = -1;
- return mosquitto_unsubscribe(mosq_, &mid, topic.c_str());
+ return mqtt_lib_->mosquitto_unsubscribe(mosq_, &mid, topic.c_str());
}
bool MqttService::Disconnect() {
peer_watchdog_.reset();
peer_dp_.reset();
- int ret = mosquitto_disconnect(mosq_);
+ int ret = mqtt_lib_->mosquitto_disconnect(mosq_);
if (ret) {
LOG(ERROR) << "Failed to disconnect broker: " << ret;
return false;
#include <condition_variable>
#include <thread>
#include <gio/gio.h>
-#include <mosquitto.h>
#include <unordered_map>
#include <memory>
int Unsubscribe(const std::string& topic);
private:
+ class MqttLibraryAdapter {
+ public:
+ struct mosquitto_message {
+ int mid;
+ char* topic;
+ void* payload;
+ int payloadlen;
+ int qos;
+ bool retain;
+ };
+ using new_fn = void*(*)(const char*, bool, void*);
+ using destroy_fn = void(*)(void*);
+ using connnect_callback_fn = void(*)(void*, void*, int);
+ using connect_callback_set_fn = void(*)(void*, connnect_callback_fn);
+ using disconnect_callback_set_fn = void(*)(void*, connnect_callback_fn);
+ using message_callback_fn = void(*)(void*, void*,
+ const struct mosquitto_message*);
+ using message_callback_set_fn = void(*)(void*, message_callback_fn);
+ using username_pw_set_fn = int(*)(void*, const char*, const char*);
+ using loop_start_fn = int(*)(void*);
+ using connect_fn = int(*)(void*, const char*, int ,int);
+ using disconnect_fn = int(*)(void*);
+ using subscribe_fn = int(*)(void*, int*, const char*, int qos);
+ using unsubscribe_fn = int(*)(void*, int*, const char*);
+ using publish_fn = int(*)(void*, int*, const char*, int, const void*, int,
+ bool);
+
+ MqttLibraryAdapter();
+ ~MqttLibraryAdapter();
+ void* mosquitto_new(const char* id, bool clean_session, void* obj) {
+ return new_(id, clean_session, obj);
+ }
+ void mosquitto_destroy(void* mosq) {
+ destroy_(mosq);
+ }
+ void mosquitto_connect_callback_set(void* mosq, connnect_callback_fn cb) {
+ connect_callback_set_(mosq, cb);
+ }
+ void mosquitto_disconnect_callback_set(void* mosq,
+ connnect_callback_fn cb) {
+ disconnect_callback_set_(mosq, cb);
+ }
+ void mosquitto_message_callback_set(void* mosq, message_callback_fn cb) {
+ message_callback_set_(mosq, cb);
+ }
+ int mosquitto_username_pw_set(void* mosq, const char* username,
+ const char* password) {
+ return username_pw_set_(mosq, username, password);
+ }
+ int mosquitto_loop_start(void* mosq) {
+ return loop_start_(mosq);
+ }
+ int mosquitto_connect(void* mosq, const char* host, int port,
+ int keepalive) {
+ return connect_(mosq, host, port, keepalive);
+ }
+ int mosquitto_disconnect(void* mosq) {
+ return disconnect_(mosq);
+ }
+ int mosquitto_subscribe(void* mosq, int* mid, const char* sub, int qos) {
+ return subscribe_(mosq, mid, sub, qos);
+ }
+ int mosquitto_unsubscribe(void* mosq, int* mid, const char* sub) {
+ return unsubscribe_(mosq, mid, sub);
+ }
+ int mosquitto_publish(void* mosq, int* mid, const char* topic,
+ int payloadlen, const void* payload, int qos, bool retain) {
+ return publish_(mosq, mid, topic, payloadlen, payload, qos, retain);
+ }
+
+ private:
+ bool LoadSymbols();
+
+ void* handle_;
+ new_fn new_;
+ destroy_fn destroy_;
+ connect_callback_set_fn connect_callback_set_;
+ disconnect_callback_set_fn disconnect_callback_set_;
+ message_callback_set_fn message_callback_set_;
+ username_pw_set_fn username_pw_set_;
+ loop_start_fn loop_start_;
+ connect_fn connect_;
+ disconnect_fn disconnect_;
+ subscribe_fn subscribe_;
+ unsubscribe_fn unsubscribe_;
+ publish_fn publish_;
+ };
+
struct SubscribeData {
SubscribeData(mqtt_plugin_sub_fn cb, void* user_data);
mqtt_plugin_sub_fn cb;
void* user_data;
};
- struct mosquitto* mosq_;
+ std::unique_ptr<MqttLibraryAdapter> mqtt_lib_;
+ void* mosq_;
std::string iface_name_;
std::thread thread_;