2 * Copyright (c) 2021-2022 Samsung Electronics Co., Ltd All Rights Reserved
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
25 #include "MosquittoMQ.h"
26 #include "aitt_internal.h"
30 AITT::Impl::Impl(AITT &parent, const std::string &id, const std::string &my_ip,
31 const AittOption &option)
34 modules(my_ip, discovery),
39 if (option.GetUseCustomMqttBroker()) {
40 mq = modules.NewCustomMQ(id, option);
41 AittOption discovery_option = option;
42 discovery_option.SetCleanSession(false);
43 discovery.SetMQ(modules.NewCustomMQ(id + 'd', option));
45 mq = std::unique_ptr<MQ>(new MosquittoMQ(id, option.GetCleanSession()));
46 discovery.SetMQ(std::unique_ptr<MQ>(new MosquittoMQ(id + 'd', false)));
48 aittThread = std::thread(&AITT::Impl::ThreadMain, this);
51 AITT::Impl::~Impl(void)
53 if (mqtt_broker_ip_.empty() == false) {
56 } catch (std::exception &e) {
57 ERR("Disconnect() Fail(%s)", e.what());
60 while (main_loop.Quit() == false) {
61 // wait when called before the thread has completely created.
62 usleep(1000); // 1millisecond
65 if (aittThread.joinable())
68 discovery.SetMQ(nullptr);
72 void AITT::Impl::ThreadMain(void)
74 pthread_setname_np(pthread_self(), "AITTWorkerLoop");
78 void AITT::Impl::SetWillInfo(const std::string &topic, const void *data, const int datalen,
79 AittQoS qos, bool retain)
81 mq->SetWillInfo(topic, data, datalen, qos, retain);
84 void AITT::Impl::SetConnectionCallback(ConnectionCallback cb, void *user_data)
87 mq->SetConnectionCallback([&, cb, user_data](int status) {
88 auto idler_cb = std::bind(&Impl::ConnectionCB, this, cb, user_data, status,
89 std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
90 MainLoopHandler::AddIdle(&main_loop, idler_cb, nullptr);
93 mq->SetConnectionCallback(nullptr);
97 int AITT::Impl::ConnectionCB(ConnectionCallback cb, void *user_data, int status,
98 MainLoopHandler::MainLoopResult result, int fd, MainLoopHandler::MainLoopData *loop_data)
100 RETV_IF(cb == nullptr, AITT_LOOP_EVENT_REMOVE);
102 cb(public_api, status, user_data);
104 return AITT_LOOP_EVENT_REMOVE;
107 void AITT::Impl::Connect(const std::string &host, int port, const std::string &username,
108 const std::string &password)
110 discovery.Start(host, port, username, password);
111 mq->Connect(host, port, username, password);
113 mqtt_broker_ip_ = host;
114 mqtt_broker_port_ = port;
117 void AITT::Impl::Disconnect(void)
121 for (auto stream : in_use_streams)
123 in_use_streams.clear();
125 mqtt_broker_ip_.clear();
126 mqtt_broker_port_ = -1;
132 void AITT::Impl::UnsubscribeAll()
134 std::unique_lock<std::mutex> lock(subscribed_list_mutex_);
136 DBG("Subscribed list %zu", subscribed_list.size());
138 for (auto subscribe_info : subscribed_list) {
139 switch (subscribe_info->first) {
141 mq->Unsubscribe(subscribe_info->second);
144 case AITT_TYPE_TCP_SECURE:
145 modules.Get(subscribe_info->first).Unsubscribe(subscribe_info->second);
149 ERR("Unknown AittProtocol(%d)", subscribe_info->first);
153 delete subscribe_info;
155 subscribed_list.clear();
158 void AITT::Impl::ConfigureTransportModule(const std::string &key, const std::string &value,
159 AittProtocol protocols)
163 void AITT::Impl::Publish(const std::string &topic, const void *data, const int datalen,
164 AittProtocol protocols, AittQoS qos, bool retain)
166 if ((protocols & AITT_TYPE_MQTT) == AITT_TYPE_MQTT)
167 mq->Publish(topic, data, datalen, qos, retain);
169 if ((protocols & AITT_TYPE_TCP) == AITT_TYPE_TCP)
170 modules.Get(AITT_TYPE_TCP).Publish(topic, data, datalen, qos, retain);
172 if ((protocols & AITT_TYPE_TCP_SECURE) == AITT_TYPE_TCP_SECURE)
173 modules.Get(AITT_TYPE_TCP_SECURE).Publish(topic, data, datalen, qos, retain);
176 AittSubscribeID AITT::Impl::Subscribe(const std::string &topic, const AITT::SubscribeCallback &cb,
177 void *user_data, AittProtocol protocol, AittQoS qos)
179 SubscribeInfo *info = new SubscribeInfo();
180 info->first = protocol;
182 void *subscribe_handle;
185 subscribe_handle = SubscribeMQ(info, &main_loop, topic, cb, user_data, qos);
188 case AITT_TYPE_TCP_SECURE:
189 subscribe_handle = SubscribeTCP(info, topic, cb, user_data, qos);
192 ERR("Unknown AittProtocol(%d)", protocol);
194 throw std::runtime_error("Unknown AittProtocol");
196 info->second = subscribe_handle;
198 std::unique_lock<std::mutex> lock(subscribed_list_mutex_);
199 subscribed_list.push_back(info);
202 INFO("Subscribe topic(%s) : %p", topic.c_str(), info);
203 return reinterpret_cast<AittSubscribeID>(info);
206 AittSubscribeID AITT::Impl::SubscribeMQ(SubscribeInfo *handle, MainLoopHandler *loop_handle,
207 const std::string &topic, const SubscribeCallback &cb, void *user_data, AittQoS qos)
209 return mq->Subscribe(
211 [this, handle, loop_handle, cb](MSG *msg, const std::string &topic, const void *data,
212 const int datalen, void *mq_user_data) {
213 void *delivery = malloc(datalen);
215 memcpy(delivery, data, datalen);
219 std::bind(&Impl::DetachedCB, this, cb, *msg, delivery, datalen, mq_user_data,
220 std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
221 MainLoopHandler::AddIdle(loop_handle, idler_cb, nullptr);
226 int AITT::Impl::DetachedCB(SubscribeCallback cb, MSG msg, void *data, const int datalen,
227 void *user_data, MainLoopHandler::MainLoopResult result, int fd,
228 MainLoopHandler::MainLoopData *loop_data)
230 RETV_IF(cb == nullptr, AITT_LOOP_EVENT_REMOVE);
232 cb(&msg, data, datalen, user_data);
235 return AITT_LOOP_EVENT_REMOVE;
238 void *AITT::Impl::Unsubscribe(AittSubscribeID subscribe_id)
240 INFO("subscribe_id : %p", subscribe_id);
241 SubscribeInfo *info = reinterpret_cast<SubscribeInfo *>(subscribe_id);
243 std::unique_lock<std::mutex> lock(subscribed_list_mutex_);
245 auto it = std::find(subscribed_list.begin(), subscribed_list.end(), info);
246 if (it == subscribed_list.end()) {
247 ERR("Unknown subscribe_id(%p)", subscribe_id);
248 throw AittException(AittException::NO_DATA_ERR);
251 void *user_data = nullptr;
252 SubscribeInfo *found_info = *it;
253 switch (found_info->first) {
255 user_data = mq->Unsubscribe(found_info->second);
258 case AITT_TYPE_TCP_SECURE:
259 user_data = modules.Get(found_info->first).Unsubscribe(found_info->second);
263 ERR("Unknown AittProtocol(%d)", found_info->first);
267 subscribed_list.erase(it);
273 int AITT::Impl::PublishWithReply(const std::string &topic, const void *data, const int datalen,
274 AittProtocol protocol, AittQoS qos, bool retain, const SubscribeCallback &cb, void *user_data,
275 const std::string &correlation)
277 std::string replyTopic = topic + RESPONSE_POSTFIX + std::to_string(reply_id++);
279 if (protocol != AITT_TYPE_MQTT)
280 return -1; // not yet support
284 [this, cb](MSG *sub_msg, const void *sub_data, const int sub_datalen, void *sub_cbdata) {
285 if (sub_msg->IsEndSequence()) {
287 Unsubscribe(sub_msg->GetID());
288 } catch (AittException &e) {
289 ERR("Unsubscribe() Fail(%s)", e.what());
292 cb(sub_msg, sub_data, sub_datalen, sub_cbdata);
294 user_data, protocol, qos);
296 mq->PublishWithReply(topic, data, datalen, qos, false, replyTopic, correlation);
300 int AITT::Impl::PublishWithReplySync(const std::string &topic, const void *data, const int datalen,
301 AittProtocol protocol, AittQoS qos, bool retain, const SubscribeCallback &cb, void *user_data,
302 const std::string &correlation, int timeout_ms)
304 std::string replyTopic = topic + RESPONSE_POSTFIX + std::to_string(reply_id++);
306 if (protocol != AITT_TYPE_MQTT)
307 return -1; // not yet support
309 SubscribeInfo *info = new SubscribeInfo();
310 info->first = protocol;
312 void *subscribe_handle;
313 MainLoopHandler sync_loop;
314 unsigned int timeout_id = 0;
315 bool is_timeout = false;
317 subscribe_handle = SubscribeMQ(
318 info, &sync_loop, replyTopic,
319 [&](MSG *sub_msg, const void *sub_data, const int sub_datalen, void *sub_cbdata) {
320 if (sub_msg->IsEndSequence()) {
322 Unsubscribe(sub_msg->GetID());
323 } catch (AittException &e) {
324 ERR("Unsubscribe() Fail(%s)", e.what());
329 sync_loop.RemoveTimeout(timeout_id);
330 HandleTimeout(timeout_ms, timeout_id, sync_loop, is_timeout);
333 cb(sub_msg, sub_data, sub_datalen, sub_cbdata);
336 info->second = subscribe_handle;
338 std::unique_lock<std::mutex> lock(subscribed_list_mutex_);
339 subscribed_list.push_back(info);
342 mq->PublishWithReply(topic, data, datalen, qos, false, replyTopic, correlation);
344 HandleTimeout(timeout_ms, timeout_id, sync_loop, is_timeout);
349 return AITT_ERROR_TIMED_OUT;
353 void AITT::Impl::HandleTimeout(int timeout_ms, unsigned int &timeout_id,
354 aitt::MainLoopHandler &sync_loop, bool &is_timeout)
356 timeout_id = sync_loop.AddTimeout(
358 [&, timeout_ms](MainLoopHandler::MainLoopResult result, int fd,
359 MainLoopHandler::MainLoopData *data) -> int {
360 ERR("PublishWithReplySync() timeout(%d)", timeout_ms);
363 return AITT_LOOP_EVENT_REMOVE;
368 void AITT::Impl::SendReply(MSG *msg, const void *data, const int datalen, bool end)
370 RET_IF(msg == nullptr);
372 if ((msg->GetProtocols() & AITT_TYPE_MQTT) != AITT_TYPE_MQTT)
373 return; // not yet support
375 if (end == false || msg->GetSequence())
376 msg->IncreaseSequence();
377 msg->SetEndSequence(end);
379 mq->SendReply(msg, data, datalen, AITT_QOS_AT_MOST_ONCE, false);
382 void *AITT::Impl::SubscribeTCP(SubscribeInfo *handle, const std::string &topic,
383 const SubscribeCallback &cb, void *user_data, AittQoS qos)
385 return modules.Get(handle->first)
388 [handle, cb](const std::string &topic, const void *data, const int datalen,
389 void *user_data, const std::string &correlation) -> void {
393 msg.SetCorrelation(correlation);
394 msg.SetProtocols(handle->first);
396 return cb(&msg, data, datalen, user_data);
401 AittStream *AITT::Impl::CreateStream(AittStreamProtocol type, const std::string &topic,
404 AittStreamModule *stream = nullptr;
406 stream = modules.NewStreamModule(type, topic, role);
407 in_use_streams.push_back(stream);
408 } catch (std::exception &e) {
409 ERR("StreamHandler() Fail(%s)", e.what());
416 void AITT::Impl::DestroyStream(AittStream *aitt_stream)
418 auto it = std::find(in_use_streams.begin(), in_use_streams.end(), aitt_stream);
419 if (it == in_use_streams.end()) {
420 ERR("Unknown Stream(%p)", aitt_stream);
423 in_use_streams.erase(it);