38c6e485c40bf17c9d668679ac4302a061abbe94
[platform/core/ml/aitt.git] / src / MosquittoMQ.h
1 /*
2  * Copyright (c) 2021-2022 Samsung Electronics Co., Ltd All Rights Reserved
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17
18 #include <mosquitto.h>
19
20 #include <functional>
21 #include <mutex>
22 #include <string>
23 #include <vector>
24
25 #include "MQ.h"
26 #include "MSG.h"
27
28 #define MQTT_LOCALHOST "127.0.0.1"
29 #define MQTT_PORT 1883
30
31 namespace aitt {
32
33 class MosquittoMQ : public MQ {
34   public:
35     explicit MosquittoMQ(const std::string &id, bool clear_session = false);
36     virtual ~MosquittoMQ(void);
37
38     void SetConnectionCallback(const MQConnectionCallback &cb);
39     void Connect(const std::string &host, int port, const std::string &username,
40           const std::string &password);
41     void SetWillInfo(const std::string &topic, const void *msg, int szmsg, int qos, bool retain);
42     void Disconnect(void);
43     void Publish(const std::string &topic, const void *data, const int datalen, int qos = 0,
44           bool retain = false);
45     void PublishWithReply(const std::string &topic, const void *data, const int datalen, int qos,
46           bool retain, const std::string &reply_topic, const std::string &correlation);
47     void SendReply(MSG *msg, const void *data, const int datalen, int qos, bool retain);
48     void *Subscribe(const std::string &topic, const SubscribeCallback &cb,
49           void *user_data = nullptr, int qos = 0);
50     void *Unsubscribe(void *handle);
51     bool CompareTopic(const std::string &left, const std::string &right);
52
53   private:
54     struct SubscribeData {
55         SubscribeData(const std::string &topic, const SubscribeCallback &cb, void *user_data);
56         std::string topic;
57         SubscribeCallback cb;
58         void *user_data;
59     };
60
61     static void ConnectCallback(struct mosquitto *mosq, void *obj, int rc, int flag,
62           const mosquitto_property *props);
63     static void DisconnectCallback(struct mosquitto *mosq, void *obj, int rc,
64           const mosquitto_property *props);
65     static void MessageCallback(mosquitto *, void *, const mosquitto_message *,
66           const mosquitto_property *);
67     void MessageCB(const mosquitto_message *msg, const mosquitto_property *props);
68     void InvokeCallback(SubscribeData *subscriber, const mosquitto_message *msg,
69           const mosquitto_property *props);
70
71     static const std::string REPLY_SEQUENCE_NUM_KEY;
72     static const std::string REPLY_IS_END_SEQUENCE_KEY;
73
74     mosquitto *handle;
75     const int keep_alive;
76     std::vector<SubscribeData *> subscribers;
77     bool subscribers_iterating;
78     std::vector<SubscribeData *> new_subscribers;
79     std::vector<SubscribeData *>::iterator subscriber_iterator;
80     bool subscriber_iterator_updated;
81     std::recursive_mutex callback_lock;
82     MQConnectionCallback connect_cb;
83 };
84
85 }  // namespace aitt