57c6daa45c5466811d865c13122b82be1c56353a
[platform/core/ml/aitt.git] / src / MosquittoMQ.cc
1 /*
2  * Copyright (c) 2021-2022 Samsung Electronics Co., Ltd All Rights Reserved
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "MosquittoMQ.h"
17
18 #include <mqtt_protocol.h>
19 #include <sys/types.h>
20 #include <unistd.h>
21
22 #include <algorithm>
23 #include <cerrno>
24 #include <stdexcept>
25 #include <thread>
26
27 #include "AittException.h"
28 #include "AittTypes.h"
29 #include "aitt_internal.h"
30
31 namespace aitt {
32
33 const std::string MosquittoMQ::REPLY_SEQUENCE_NUM_KEY = "sequenceNum";
34 const std::string MosquittoMQ::REPLY_IS_END_SEQUENCE_KEY = "isEndSequence";
35
36 MosquittoMQ::MosquittoMQ(const std::string &id, bool clear_session)
37       : handle(nullptr),
38         keep_alive(60),
39         subscribers_iterating(false),
40         subscriber_iterator_updated(false),
41         connect_cb(nullptr)
42 {
43     do {
44         int ret = mosquitto_lib_init();
45         if (ret != MOSQ_ERR_SUCCESS) {
46             ERR("mosquitto_lib_init() Fail(%s)", mosquitto_strerror(ret));
47             break;
48         }
49
50         handle = mosquitto_new(id.c_str(), clear_session, this);
51         if (handle == nullptr) {
52             ERR("mosquitto_new(%s, %d) Fail", id.c_str(), clear_session);
53             break;
54         }
55
56         ret = mosquitto_int_option(handle, MOSQ_OPT_PROTOCOL_VERSION, MQTT_PROTOCOL_V5);
57         if (ret != MOSQ_ERR_SUCCESS) {
58             ERR("mosquitto_int_option() Fail(%s)", mosquitto_strerror(ret));
59             break;
60         }
61
62         mosquitto_message_v5_callback_set(handle, MessageCallback);
63         mosquitto_connect_v5_callback_set(handle, ConnectCallback);
64         mosquitto_disconnect_v5_callback_set(handle, DisconnectCallback);
65
66         return;
67     } while (0);
68
69     mosquitto_destroy(handle);
70     mosquitto_lib_cleanup();
71     throw AittException(AittException::MQTT_ERR, std::string("MosquittoMQ Constructor Error"));
72 }
73
74 MosquittoMQ::~MosquittoMQ(void)
75 {
76     int ret;
77     INFO("Destructor");
78
79     callback_lock.lock();
80     connect_cb = nullptr;
81     subscribers.clear();
82     callback_lock.unlock();
83
84     mosquitto_destroy(handle);
85
86     ret = mosquitto_lib_cleanup();
87     if (ret != MOSQ_ERR_SUCCESS)
88         ERR("mosquitto_lib_cleanup() Fail(%s)", mosquitto_strerror(ret));
89 }
90
91 void MosquittoMQ::SetConnectionCallback(const MQConnectionCallback &cb)
92 {
93     std::lock_guard<std::recursive_mutex> lock_from_here(callback_lock);
94     connect_cb = cb;
95 }
96
97 void MosquittoMQ::ConnectCallback(struct mosquitto *mosq, void *obj, int rc, int flag,
98       const mosquitto_property *props)
99 {
100     RET_IF(obj == nullptr);
101     MosquittoMQ *mq = static_cast<MosquittoMQ *>(obj);
102
103     INFO("Connected : rc(%d), flag(%d)", rc, flag);
104
105     std::lock_guard<std::recursive_mutex> lock_from_here(mq->callback_lock);
106     if (mq->connect_cb)
107         mq->connect_cb((rc == CONNACK_ACCEPTED) ? AITT_CONNECTED : AITT_CONNECT_FAILED);
108 }
109
110 void MosquittoMQ::DisconnectCallback(struct mosquitto *mosq, void *obj, int rc,
111       const mosquitto_property *props)
112 {
113     RET_IF(obj == nullptr);
114     MosquittoMQ *mq = static_cast<MosquittoMQ *>(obj);
115
116     INFO("Disconnected : rc(%d)", rc);
117
118     std::lock_guard<std::recursive_mutex> lock_from_here(mq->callback_lock);
119     if (mq->connect_cb)
120         mq->connect_cb(AITT_DISCONNECTED);
121 }
122
123 void MosquittoMQ::Connect(const std::string &host, int port, const std::string &username,
124       const std::string &password)
125 {
126     int ret;
127
128     if (username.empty() == false) {
129         ret = mosquitto_username_pw_set(handle, username.c_str(), password.c_str());
130         if (ret != MOSQ_ERR_SUCCESS) {
131             ERR("mosquitto_username_pw_set(%s, %s) Fail(%s)", username.c_str(), password.c_str(),
132                   mosquitto_strerror(ret));
133             throw AittException(AittException::MQTT_ERR);
134         }
135     }
136
137     ret = mosquitto_loop_start(handle);
138     if (ret != MOSQ_ERR_SUCCESS) {
139         ERR("mosquitto_loop_start() Fail(%s)", mosquitto_strerror(ret));
140         throw AittException(AittException::MQTT_ERR);
141     }
142
143     ret = mosquitto_connect(handle, host.c_str(), port, keep_alive);
144     if (ret != MOSQ_ERR_SUCCESS) {
145         ERR("mosquitto_connect(%s, %d) Fail(%s)", host.c_str(), port, mosquitto_strerror(ret));
146         throw AittException(AittException::MQTT_ERR);
147     }
148 }
149
150 void MosquittoMQ::SetWillInfo(const std::string &topic, const void *msg, int szmsg, int qos,
151       bool retain)
152 {
153     int ret = mosquitto_will_set(handle, topic.c_str(), szmsg, msg, qos, retain);
154     if (ret != MOSQ_ERR_SUCCESS) {
155         ERR("mosquitto_will_set(%s) Fail(%s)", topic.c_str(), mosquitto_strerror(ret));
156         throw AittException(AittException::MQTT_ERR);
157     }
158 }
159
160 void MosquittoMQ::Disconnect(void)
161 {
162     int ret = mosquitto_disconnect(handle);
163     if (ret != MOSQ_ERR_SUCCESS) {
164         ERR("mosquitto_disconnect() Fail(%s)", mosquitto_strerror(ret));
165         throw AittException(AittException::MQTT_ERR);
166     }
167
168     ret = mosquitto_loop_stop(handle, false);
169     if (ret != MOSQ_ERR_SUCCESS)
170         ERR("mosquitto_loop_stop() Fail(%s)", mosquitto_strerror(ret));
171
172     mosquitto_will_clear(handle);
173 }
174
175 void MosquittoMQ::MessageCallback(mosquitto *handle, void *obj, const mosquitto_message *msg,
176       const mosquitto_property *props)
177 {
178     RET_IF(obj == nullptr);
179     MosquittoMQ *mq = static_cast<MosquittoMQ *>(obj);
180
181     mq->MessageCB(msg, props);
182 }
183
184 void MosquittoMQ::MessageCB(const mosquitto_message *msg, const mosquitto_property *props)
185 {
186     std::lock_guard<std::recursive_mutex> auto_lock(callback_lock);
187     subscribers_iterating = true;
188     subscriber_iterator = subscribers.begin();
189     while (subscriber_iterator != subscribers.end()) {
190         auto subscribe_data = *(subscriber_iterator);
191         if (nullptr == subscribe_data) {
192             ERR("Invalid subscribe data");
193             ++subscriber_iterator;
194             continue;
195         }
196
197         if (CompareTopic(subscribe_data->topic.c_str(), msg->topic))
198             InvokeCallback(*subscriber_iterator, msg, props);
199
200         if (!subscriber_iterator_updated)
201             ++subscriber_iterator;
202         else
203             subscriber_iterator_updated = false;
204     }
205     subscribers_iterating = false;
206     subscribers.insert(subscribers.end(), new_subscribers.begin(), new_subscribers.end());
207     new_subscribers.clear();
208 }
209
210 void MosquittoMQ::InvokeCallback(SubscribeData *subscriber, const mosquitto_message *msg,
211       const mosquitto_property *props)
212 {
213     RET_IF(nullptr == subscriber);
214
215     MSG mq_msg;
216     mq_msg.SetTopic(msg->topic);
217     if (props) {
218         const mosquitto_property *prop;
219
220         char *response_topic = nullptr;
221         prop = mosquitto_property_read_string(props, MQTT_PROP_RESPONSE_TOPIC, &response_topic,
222               false);
223         if (prop) {
224             mq_msg.SetResponseTopic(response_topic);
225             free(response_topic);
226         }
227
228         void *correlation = nullptr;
229         uint16_t correlation_size = 0;
230         prop = mosquitto_property_read_binary(props, MQTT_PROP_CORRELATION_DATA, &correlation,
231               &correlation_size, false);
232         if (prop == nullptr || correlation == nullptr)
233             ERR("No Correlation Data");
234
235         mq_msg.SetCorrelation(std::string((char *)correlation, correlation_size));
236         if (correlation)
237             free(correlation);
238
239         char *name = nullptr;
240         char *value = nullptr;
241         prop = mosquitto_property_read_string_pair(props, MQTT_PROP_USER_PROPERTY, &name, &value,
242               false);
243         while (prop) {
244             if (REPLY_SEQUENCE_NUM_KEY == name) {
245                 mq_msg.SetSequence(std::stoi(value));
246             } else if (REPLY_IS_END_SEQUENCE_KEY == name) {
247                 mq_msg.SetEndSequence(std::stoi(value) == 1);
248             } else {
249                 ERR("Unsupported property(%s, %s)", name, value);
250             }
251             free(name);
252             free(value);
253
254             prop = mosquitto_property_read_string_pair(prop, MQTT_PROP_USER_PROPERTY, &name, &value,
255                   true);
256         }
257     }
258
259     subscriber->cb(&mq_msg, msg->topic, msg->payload, msg->payloadlen, subscriber->user_data);
260 }
261
262 void MosquittoMQ::Publish(const std::string &topic, const void *data, const int datalen, int qos,
263       bool retain)
264 {
265     int mid = -1;
266     int ret = mosquitto_publish(handle, &mid, topic.c_str(), datalen, data, qos, retain);
267     if (ret != MOSQ_ERR_SUCCESS) {
268         ERR("mosquitto_publish(%s) Fail(%s)", topic.c_str(), mosquitto_strerror(ret));
269         throw AittException(AittException::MQTT_ERR);
270     }
271 }
272
273 void MosquittoMQ::PublishWithReply(const std::string &topic, const void *data, const int datalen,
274       int qos, bool retain, const std::string &reply_topic, const std::string &correlation)
275 {
276     int ret;
277     int mid = -1;
278     mosquitto_property *props = nullptr;
279
280     ret = mosquitto_property_add_string(&props, MQTT_PROP_RESPONSE_TOPIC, reply_topic.c_str());
281     if (ret != MOSQ_ERR_SUCCESS) {
282         ERR("mosquitto_property_add_string(response-topic) Fail(%s)", mosquitto_strerror(ret));
283         throw AittException(AittException::MQTT_ERR);
284     }
285
286     ret = mosquitto_property_add_binary(&props, MQTT_PROP_CORRELATION_DATA, correlation.c_str(),
287           correlation.size());
288     if (ret != MOSQ_ERR_SUCCESS) {
289         ERR("mosquitto_property_add_binary(correlation) Fail(%s)", mosquitto_strerror(ret));
290         throw AittException(AittException::MQTT_ERR);
291     }
292     ret = mosquitto_publish_v5(handle, &mid, topic.c_str(), datalen, data, qos, retain, props);
293     if (ret != MOSQ_ERR_SUCCESS) {
294         ERR("mosquitto_publish_v5(%s) Fail(%s)", topic.c_str(), mosquitto_strerror(ret));
295         throw AittException(AittException::MQTT_ERR);
296     }
297 }
298
299 void MosquittoMQ::SendReply(MSG *msg, const void *data, const int datalen, int qos, bool retain)
300 {
301     RET_IF(msg == nullptr);
302
303     int ret;
304     int mId = -1;
305     mosquitto_property *props = nullptr;
306
307     ret = mosquitto_property_add_binary(&props, MQTT_PROP_CORRELATION_DATA,
308           msg->GetCorrelation().c_str(), msg->GetCorrelation().size());
309     if (ret != MOSQ_ERR_SUCCESS) {
310         ERR("mosquitto_property_add_binary(correlation) Fail(%s)", mosquitto_strerror(ret));
311         throw AittException(AittException::MQTT_ERR);
312     }
313
314     ret = mosquitto_property_add_string_pair(&props, MQTT_PROP_USER_PROPERTY,
315           REPLY_SEQUENCE_NUM_KEY.c_str(), std::to_string(msg->GetSequence()).c_str());
316     if (ret != MOSQ_ERR_SUCCESS) {
317         ERR("mosquitto_property_add_string_pair(squenceNum) Fail(%s)", mosquitto_strerror(ret));
318         throw AittException(AittException::MQTT_ERR);
319     }
320
321     ret = mosquitto_property_add_string_pair(&props, MQTT_PROP_USER_PROPERTY,
322           REPLY_IS_END_SEQUENCE_KEY.c_str(), std::to_string(msg->IsEndSequence()).c_str());
323     if (ret != MOSQ_ERR_SUCCESS) {
324         ERR("mosquitto_property_add_string_pair(IsEndSequence) Fail(%s)", mosquitto_strerror(ret));
325         throw AittException(AittException::MQTT_ERR);
326     }
327
328     ret = mosquitto_publish_v5(handle, &mId, msg->GetResponseTopic().c_str(), datalen, data, qos,
329           retain, props);
330     if (ret != MOSQ_ERR_SUCCESS) {
331         ERR("mosquitto_publish_v5(%s) Fail(%s)", msg->GetResponseTopic().c_str(),
332               mosquitto_strerror(ret));
333         throw AittException(AittException::MQTT_ERR);
334     }
335 }
336
337 void *MosquittoMQ::Subscribe(const std::string &topic, const SubscribeCallback &cb, void *user_data,
338       int qos)
339 {
340     int mid = -1;
341     int ret = mosquitto_subscribe(handle, &mid, topic.c_str(), qos);
342     if (ret != MOSQ_ERR_SUCCESS) {
343         ERR("mosquitto_subscribe(%s) Fail(%s)", topic.c_str(), mosquitto_strerror(ret));
344         throw AittException(AittException::MQTT_ERR);
345     }
346
347     std::lock_guard<std::recursive_mutex> lock_from_here(callback_lock);
348     SubscribeData *data = new SubscribeData(topic, cb, user_data);
349     if (subscribers_iterating)
350         new_subscribers.push_back(data);
351     else
352         subscribers.push_back(data);
353
354     return static_cast<void *>(data);
355 }
356
357 void *MosquittoMQ::Unsubscribe(void *sub_handle)
358 {
359     RETV_IF(nullptr == sub_handle, nullptr);
360
361     std::lock_guard<std::recursive_mutex> auto_lock(callback_lock);
362     auto it = std::find(subscribers.begin(), subscribers.end(),
363           static_cast<SubscribeData *>(sub_handle));
364
365     if (it == subscribers.end()) {
366         ERR("No Subscription(%p)", sub_handle);
367         throw AittException(AittException::NO_DATA_ERR);
368     }
369
370     SubscribeData *data = static_cast<SubscribeData *>(sub_handle);
371
372     if (subscriber_iterator == it) {
373         subscriber_iterator = subscribers.erase(it);
374         subscriber_iterator_updated = true;
375     } else {
376         subscribers.erase(it);
377     }
378
379     void *user_data = data->user_data;
380     std::string topic = data->topic;
381     delete data;
382
383     int mid = -1;
384     int ret = mosquitto_unsubscribe(handle, &mid, topic.c_str());
385     if (ret != MOSQ_ERR_SUCCESS) {
386         ERR("mosquitto_unsubscribe(%s) Fail(%s)", topic.c_str(), mosquitto_strerror(ret));
387         throw AittException(AittException::MQTT_ERR);
388     }
389
390     return user_data;
391 }
392
393 bool MosquittoMQ::CompareTopic(const std::string &left, const std::string &right)
394 {
395     bool result = false;
396     int ret = mosquitto_topic_matches_sub(left.c_str(), right.c_str(), &result);
397     if (ret != MOSQ_ERR_SUCCESS) {
398         ERR("mosquitto_topic_matches_sub(%s, %s) Fail(%s)", left.c_str(), right.c_str(),
399               mosquitto_strerror(ret));
400         return false;
401     }
402     return result;
403 }
404
405 MosquittoMQ::SubscribeData::SubscribeData(const std::string &in_topic,
406       const SubscribeCallback &in_cb, void *in_user_data)
407       : topic(in_topic), cb(in_cb), user_data(in_user_data)
408 {
409 }
410
411 }  // namespace aitt