2 * Copyright (c) 2021-2022 Samsung Electronics Co., Ltd All Rights Reserved
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 #include "MosquittoMQ.h"
18 #include <mqtt_protocol.h>
19 #include <sys/types.h>
27 #include "AittException.h"
28 #include "AittTypes.h"
29 #include "aitt_internal.h"
33 const std::string MosquittoMQ::REPLY_SEQUENCE_NUM_KEY = "sequenceNum";
34 const std::string MosquittoMQ::REPLY_IS_END_SEQUENCE_KEY = "isEndSequence";
36 MosquittoMQ::MosquittoMQ(const std::string &id, bool clear_session)
39 subscribers_iterating(false),
40 subscriber_iterator_updated(false),
44 int ret = mosquitto_lib_init();
45 if (ret != MOSQ_ERR_SUCCESS) {
46 ERR("mosquitto_lib_init() Fail(%s)", mosquitto_strerror(ret));
50 handle = mosquitto_new(id.c_str(), clear_session, this);
51 if (handle == nullptr) {
52 ERR("mosquitto_new(%s, %d) Fail", id.c_str(), clear_session);
56 ret = mosquitto_int_option(handle, MOSQ_OPT_PROTOCOL_VERSION, MQTT_PROTOCOL_V5);
57 if (ret != MOSQ_ERR_SUCCESS) {
58 ERR("mosquitto_int_option() Fail(%s)", mosquitto_strerror(ret));
62 mosquitto_message_v5_callback_set(handle, MessageCallback);
63 mosquitto_connect_v5_callback_set(handle, ConnectCallback);
64 mosquitto_disconnect_v5_callback_set(handle, DisconnectCallback);
69 mosquitto_destroy(handle);
70 mosquitto_lib_cleanup();
71 throw AittException(AittException::MQTT_ERR, std::string("MosquittoMQ Constructor Error"));
74 MosquittoMQ::~MosquittoMQ(void)
82 callback_lock.unlock();
84 mosquitto_destroy(handle);
86 ret = mosquitto_lib_cleanup();
87 if (ret != MOSQ_ERR_SUCCESS)
88 ERR("mosquitto_lib_cleanup() Fail(%s)", mosquitto_strerror(ret));
91 void MosquittoMQ::SetConnectionCallback(const MQConnectionCallback &cb)
93 std::lock_guard<std::recursive_mutex> lock_from_here(callback_lock);
97 void MosquittoMQ::ConnectCallback(struct mosquitto *mosq, void *obj, int rc, int flag,
98 const mosquitto_property *props)
100 RET_IF(obj == nullptr);
101 MosquittoMQ *mq = static_cast<MosquittoMQ *>(obj);
103 INFO("Connected : rc(%d), flag(%d)", rc, flag);
105 std::lock_guard<std::recursive_mutex> lock_from_here(mq->callback_lock);
107 mq->connect_cb((rc == CONNACK_ACCEPTED) ? AITT_CONNECTED : AITT_CONNECT_FAILED);
110 void MosquittoMQ::DisconnectCallback(struct mosquitto *mosq, void *obj, int rc,
111 const mosquitto_property *props)
113 RET_IF(obj == nullptr);
114 MosquittoMQ *mq = static_cast<MosquittoMQ *>(obj);
116 INFO("Disconnected : rc(%d)", rc);
118 std::lock_guard<std::recursive_mutex> lock_from_here(mq->callback_lock);
120 mq->connect_cb(AITT_DISCONNECTED);
123 void MosquittoMQ::Connect(const std::string &host, int port, const std::string &username,
124 const std::string &password)
128 if (username.empty() == false) {
129 ret = mosquitto_username_pw_set(handle, username.c_str(), password.c_str());
130 if (ret != MOSQ_ERR_SUCCESS) {
131 ERR("mosquitto_username_pw_set(%s, %s) Fail(%s)", username.c_str(), password.c_str(),
132 mosquitto_strerror(ret));
133 throw AittException(AittException::MQTT_ERR);
137 ret = mosquitto_loop_start(handle);
138 if (ret != MOSQ_ERR_SUCCESS) {
139 ERR("mosquitto_loop_start() Fail(%s)", mosquitto_strerror(ret));
140 throw AittException(AittException::MQTT_ERR);
143 ret = mosquitto_connect(handle, host.c_str(), port, keep_alive);
144 if (ret != MOSQ_ERR_SUCCESS) {
145 ERR("mosquitto_connect(%s, %d) Fail(%s)", host.c_str(), port, mosquitto_strerror(ret));
146 throw AittException(AittException::MQTT_ERR);
150 void MosquittoMQ::SetWillInfo(const std::string &topic, const void *msg, int szmsg, int qos,
153 int ret = mosquitto_will_set(handle, topic.c_str(), szmsg, msg, qos, retain);
154 if (ret != MOSQ_ERR_SUCCESS) {
155 ERR("mosquitto_will_set(%s) Fail(%s)", topic.c_str(), mosquitto_strerror(ret));
156 throw AittException(AittException::MQTT_ERR);
160 void MosquittoMQ::Disconnect(void)
162 int ret = mosquitto_disconnect(handle);
163 if (ret != MOSQ_ERR_SUCCESS) {
164 ERR("mosquitto_disconnect() Fail(%s)", mosquitto_strerror(ret));
165 throw AittException(AittException::MQTT_ERR);
168 ret = mosquitto_loop_stop(handle, false);
169 if (ret != MOSQ_ERR_SUCCESS)
170 ERR("mosquitto_loop_stop() Fail(%s)", mosquitto_strerror(ret));
172 mosquitto_will_clear(handle);
175 void MosquittoMQ::MessageCallback(mosquitto *handle, void *obj, const mosquitto_message *msg,
176 const mosquitto_property *props)
178 RET_IF(obj == nullptr);
179 MosquittoMQ *mq = static_cast<MosquittoMQ *>(obj);
181 mq->MessageCB(msg, props);
184 void MosquittoMQ::MessageCB(const mosquitto_message *msg, const mosquitto_property *props)
186 std::lock_guard<std::recursive_mutex> auto_lock(callback_lock);
187 subscribers_iterating = true;
188 subscriber_iterator = subscribers.begin();
189 while (subscriber_iterator != subscribers.end()) {
190 auto subscribe_data = *(subscriber_iterator);
191 if (nullptr == subscribe_data) {
192 ERR("Invalid subscribe data");
193 ++subscriber_iterator;
197 if (CompareTopic(subscribe_data->topic.c_str(), msg->topic))
198 InvokeCallback(*subscriber_iterator, msg, props);
200 if (!subscriber_iterator_updated)
201 ++subscriber_iterator;
203 subscriber_iterator_updated = false;
205 subscribers_iterating = false;
206 subscribers.insert(subscribers.end(), new_subscribers.begin(), new_subscribers.end());
207 new_subscribers.clear();
210 void MosquittoMQ::InvokeCallback(SubscribeData *subscriber, const mosquitto_message *msg,
211 const mosquitto_property *props)
213 RET_IF(nullptr == subscriber);
216 mq_msg.SetTopic(msg->topic);
218 const mosquitto_property *prop;
220 char *response_topic = nullptr;
221 prop = mosquitto_property_read_string(props, MQTT_PROP_RESPONSE_TOPIC, &response_topic,
224 mq_msg.SetResponseTopic(response_topic);
225 free(response_topic);
228 void *correlation = nullptr;
229 uint16_t correlation_size = 0;
230 prop = mosquitto_property_read_binary(props, MQTT_PROP_CORRELATION_DATA, &correlation,
231 &correlation_size, false);
232 if (prop == nullptr || correlation == nullptr)
233 ERR("No Correlation Data");
235 mq_msg.SetCorrelation(std::string((char *)correlation, correlation_size));
239 char *name = nullptr;
240 char *value = nullptr;
241 prop = mosquitto_property_read_string_pair(props, MQTT_PROP_USER_PROPERTY, &name, &value,
244 if (REPLY_SEQUENCE_NUM_KEY == name) {
245 mq_msg.SetSequence(std::stoi(value));
246 } else if (REPLY_IS_END_SEQUENCE_KEY == name) {
247 mq_msg.SetEndSequence(std::stoi(value) == 1);
249 ERR("Unsupported property(%s, %s)", name, value);
254 prop = mosquitto_property_read_string_pair(prop, MQTT_PROP_USER_PROPERTY, &name, &value,
259 subscriber->cb(&mq_msg, msg->topic, msg->payload, msg->payloadlen, subscriber->user_data);
262 void MosquittoMQ::Publish(const std::string &topic, const void *data, const int datalen, int qos,
266 int ret = mosquitto_publish(handle, &mid, topic.c_str(), datalen, data, qos, retain);
267 if (ret != MOSQ_ERR_SUCCESS) {
268 ERR("mosquitto_publish(%s) Fail(%s)", topic.c_str(), mosquitto_strerror(ret));
269 throw AittException(AittException::MQTT_ERR);
273 void MosquittoMQ::PublishWithReply(const std::string &topic, const void *data, const int datalen,
274 int qos, bool retain, const std::string &reply_topic, const std::string &correlation)
278 mosquitto_property *props = nullptr;
280 ret = mosquitto_property_add_string(&props, MQTT_PROP_RESPONSE_TOPIC, reply_topic.c_str());
281 if (ret != MOSQ_ERR_SUCCESS) {
282 ERR("mosquitto_property_add_string(response-topic) Fail(%s)", mosquitto_strerror(ret));
283 throw AittException(AittException::MQTT_ERR);
286 ret = mosquitto_property_add_binary(&props, MQTT_PROP_CORRELATION_DATA, correlation.c_str(),
288 if (ret != MOSQ_ERR_SUCCESS) {
289 ERR("mosquitto_property_add_binary(correlation) Fail(%s)", mosquitto_strerror(ret));
290 throw AittException(AittException::MQTT_ERR);
292 ret = mosquitto_publish_v5(handle, &mid, topic.c_str(), datalen, data, qos, retain, props);
293 if (ret != MOSQ_ERR_SUCCESS) {
294 ERR("mosquitto_publish_v5(%s) Fail(%s)", topic.c_str(), mosquitto_strerror(ret));
295 throw AittException(AittException::MQTT_ERR);
299 void MosquittoMQ::SendReply(MSG *msg, const void *data, const int datalen, int qos, bool retain)
301 RET_IF(msg == nullptr);
305 mosquitto_property *props = nullptr;
307 ret = mosquitto_property_add_binary(&props, MQTT_PROP_CORRELATION_DATA,
308 msg->GetCorrelation().c_str(), msg->GetCorrelation().size());
309 if (ret != MOSQ_ERR_SUCCESS) {
310 ERR("mosquitto_property_add_binary(correlation) Fail(%s)", mosquitto_strerror(ret));
311 throw AittException(AittException::MQTT_ERR);
314 ret = mosquitto_property_add_string_pair(&props, MQTT_PROP_USER_PROPERTY,
315 REPLY_SEQUENCE_NUM_KEY.c_str(), std::to_string(msg->GetSequence()).c_str());
316 if (ret != MOSQ_ERR_SUCCESS) {
317 ERR("mosquitto_property_add_string_pair(squenceNum) Fail(%s)", mosquitto_strerror(ret));
318 throw AittException(AittException::MQTT_ERR);
321 ret = mosquitto_property_add_string_pair(&props, MQTT_PROP_USER_PROPERTY,
322 REPLY_IS_END_SEQUENCE_KEY.c_str(), std::to_string(msg->IsEndSequence()).c_str());
323 if (ret != MOSQ_ERR_SUCCESS) {
324 ERR("mosquitto_property_add_string_pair(IsEndSequence) Fail(%s)", mosquitto_strerror(ret));
325 throw AittException(AittException::MQTT_ERR);
328 ret = mosquitto_publish_v5(handle, &mId, msg->GetResponseTopic().c_str(), datalen, data, qos,
330 if (ret != MOSQ_ERR_SUCCESS) {
331 ERR("mosquitto_publish_v5(%s) Fail(%s)", msg->GetResponseTopic().c_str(),
332 mosquitto_strerror(ret));
333 throw AittException(AittException::MQTT_ERR);
337 void *MosquittoMQ::Subscribe(const std::string &topic, const SubscribeCallback &cb, void *user_data,
341 int ret = mosquitto_subscribe(handle, &mid, topic.c_str(), qos);
342 if (ret != MOSQ_ERR_SUCCESS) {
343 ERR("mosquitto_subscribe(%s) Fail(%s)", topic.c_str(), mosquitto_strerror(ret));
344 throw AittException(AittException::MQTT_ERR);
347 std::lock_guard<std::recursive_mutex> lock_from_here(callback_lock);
348 SubscribeData *data = new SubscribeData(topic, cb, user_data);
349 if (subscribers_iterating)
350 new_subscribers.push_back(data);
352 subscribers.push_back(data);
354 return static_cast<void *>(data);
357 void *MosquittoMQ::Unsubscribe(void *sub_handle)
359 RETV_IF(nullptr == sub_handle, nullptr);
361 std::lock_guard<std::recursive_mutex> auto_lock(callback_lock);
362 auto it = std::find(subscribers.begin(), subscribers.end(),
363 static_cast<SubscribeData *>(sub_handle));
365 if (it == subscribers.end()) {
366 ERR("No Subscription(%p)", sub_handle);
367 throw AittException(AittException::NO_DATA_ERR);
370 SubscribeData *data = static_cast<SubscribeData *>(sub_handle);
372 if (subscriber_iterator == it) {
373 subscriber_iterator = subscribers.erase(it);
374 subscriber_iterator_updated = true;
376 subscribers.erase(it);
379 void *user_data = data->user_data;
380 std::string topic = data->topic;
384 int ret = mosquitto_unsubscribe(handle, &mid, topic.c_str());
385 if (ret != MOSQ_ERR_SUCCESS) {
386 ERR("mosquitto_unsubscribe(%s) Fail(%s)", topic.c_str(), mosquitto_strerror(ret));
387 throw AittException(AittException::MQTT_ERR);
393 bool MosquittoMQ::CompareTopic(const std::string &left, const std::string &right)
396 int ret = mosquitto_topic_matches_sub(left.c_str(), right.c_str(), &result);
397 if (ret != MOSQ_ERR_SUCCESS) {
398 ERR("mosquitto_topic_matches_sub(%s, %s) Fail(%s)", left.c_str(), right.c_str(),
399 mosquitto_strerror(ret));
405 MosquittoMQ::SubscribeData::SubscribeData(const std::string &in_topic,
406 const SubscribeCallback &in_cb, void *in_user_data)
407 : topic(in_topic), cb(in_cb), user_data(in_user_data)