void AittDiscovery::Stop()
{
- discovery_mq->Publish(DISCOVERY_TOPIC_BASE + id_, nullptr, 0, AITT_QOS_EXACTLY_ONCE, true);
discovery_mq->Unsubscribe(callback_handle);
+ discovery_mq->Publish(DISCOVERY_TOPIC_BASE + id_, nullptr, 0, AITT_QOS_EXACTLY_ONCE, true);
+ discovery_mq->Publish(DISCOVERY_TOPIC_BASE + id_, nullptr, 0, AITT_QOS_AT_MOST_ONCE, true);
callback_handle = nullptr;
discovery_mq->Disconnect();
}
AittDiscovery *discovery = static_cast<AittDiscovery *>(user_data);
- DBG("Called(id = %s, msg = %p:%d)", discovery->id_.c_str(), msg, szmsg);
-
size_t end = topic.find("/", DISCOVERY_TOPIC_BASE.length());
std::string clientId = topic.substr(DISCOVERY_TOPIC_BASE.length(), end);
if (clientId.empty()) {
mosquitto_connect_v5_callback_set(handle, ConnectCallback);
mosquitto_disconnect_v5_callback_set(handle, DisconnectCallback);
- ret = mosquitto_loop_start(handle);
- if (ret != MOSQ_ERR_SUCCESS) {
- ERR("mosquitto_loop_start() Fail(%s)", mosquitto_strerror(ret));
- break;
- }
-
return;
} while (0);
int ret;
INFO("Destructor");
- ret = mosquitto_loop_stop(handle, true);
- if (ret != MOSQ_ERR_SUCCESS)
- ERR("mosquitto_loop_stop() Fail(%s)", mosquitto_strerror(ret));
-
callback_lock.lock();
connect_cb = nullptr;
subscribers.clear();
}
}
+ ret = mosquitto_loop_start(handle);
+ if (ret != MOSQ_ERR_SUCCESS) {
+ ERR("mosquitto_loop_start() Fail(%s)", mosquitto_strerror(ret));
+ throw AittException(AittException::MQTT_ERR);
+ }
+
ret = mosquitto_connect(handle, host.c_str(), port, keep_alive);
if (ret != MOSQ_ERR_SUCCESS) {
ERR("mosquitto_connect(%s, %d) Fail(%s)", host.c_str(), port, mosquitto_strerror(ret));
throw AittException(AittException::MQTT_ERR);
}
+ ret = mosquitto_loop_stop(handle, false);
+ if (ret != MOSQ_ERR_SUCCESS)
+ ERR("mosquitto_loop_stop() Fail(%s)", mosquitto_strerror(ret));
+
mosquitto_will_clear(handle);
}
mq->subscriber_iterator = mq->subscribers.begin();
while (mq->subscriber_iterator != mq->subscribers.end()) {
auto subscribe_data = *(mq->subscriber_iterator);
- if (nullptr == subscribe_data)
- ERR("end() is not valid because elements were added.");
+ if (nullptr == subscribe_data) {
+ ERR("Invalid subscribe data");
+ mq->subscriber_iterator++;
+ continue;
+ }
bool result = AittUtil::CompareTopic(subscribe_data->topic.c_str(), msg->topic);
if (result)
void *user_data;
};
- static void ConnectCallback(mosquitto *mosq, void *obj, int rc, int flag,
+ static void ConnectCallback(struct mosquitto *mosq, void *obj, int rc, int flag,
const mosquitto_property *props);
- static void DisconnectCallback(mosquitto *mosq, void *obj, int rc,
+ static void DisconnectCallback(struct mosquitto *mosq, void *obj, int rc,
const mosquitto_property *props);
static void MessageCallback(mosquitto *, void *, const mosquitto_message *,
const mosquitto_property *);
Module::~Module(void)
{
- discovery.RemoveDiscoveryCB(discovery_cb);
+ try {
+ discovery.RemoveDiscoveryCB(discovery_cb);
+ } catch (std::exception &e) {
+ ERR("RemoveDiscoveryCB() Fail(%s)", e.what());
+ }
while (main_loop.Quit() == false) {
// wait when called before the thread has completely created.
camera_destroy(handle_);
}
+CameraHandler::CameraHandler()
+ : handle_(nullptr), is_started_(false), media_packet_preview_cb_(nullptr), user_data_(nullptr)
+{
+}
+
int CameraHandler::Init(const MediaPacketPreviewCallback &preview_cb, void *user_data)
{
int ret = camera_create(CAMERA_DEVICE_CAMERA0, &handle_);
public:
using MediaPacketPreviewCallback = std::function<void(media_packet_h, void *)>;
+ CameraHandler();
~CameraHandler();
int Init(const MediaPacketPreviewCallback &preview_cb, void *user_data);
void Deinit(void);
#include "aitt_internal.h"
+WebRtcStream::WebRtcStream() : webrtc_handle_(nullptr), is_source_overflow_(false), source_id_(0)
+{
+}
+
WebRtcStream::~WebRtcStream()
{
Destroy();
class WebRtcStream {
public:
+ WebRtcStream();
~WebRtcStream();
bool Create(bool is_source, bool need_display);
void Destroy(void);
{
if (option.GetUseCustomMqttBroker()) {
mq = modules.NewCustomMQ(id, option);
+ AittOption discovery_option = option;
+ discovery_option.SetClearSession(false);
discovery.SetMQ(modules.NewCustomMQ(id + 'd', option));
} else {
mq = std::unique_ptr<MQ>(new MosquittoMQ(id, option.GetClearSession()));
- discovery.SetMQ(std::unique_ptr<MQ>(new MosquittoMQ(id + 'd', option.GetClearSession())));
+ discovery.SetMQ(std::unique_ptr<MQ>(new MosquittoMQ(id + 'd', false)));
}
aittThread = std::thread(&AITT::Impl::ThreadMain, this);
}
AITT::Impl::~Impl(void)
{
- if (false == mqtt_broker_ip_.empty())
- Disconnect();
-
+ if (false == mqtt_broker_ip_.empty()) {
+ try {
+ Disconnect();
+ } catch (std::exception &e) {
+ ERR("Disconnect() Fail(%s)", e.what());
+ }
+ }
while (main_loop.Quit() == false) {
// wait when called before the thread has completely created.
usleep(1000); // 1millisecond
mqtt_broker_ip_.clear();
mqtt_broker_port_ = -1;
- mq->Disconnect();
discovery.Stop();
+ mq->Disconnect();
}
void AITT::Impl::UnsubscribeAll()
std::unique_ptr<MQ> ModuleManager::NewCustomMQ(const std::string &id, const AittOption &option)
{
- ModuleHandle handle = OpenModule("libaitt-st-broker.so");
+ custom_mqtt_handle = OpenModule("libaitt-st-broker.so");
MQ::ModuleEntry get_instance_fn =
- reinterpret_cast<MQ::ModuleEntry>(dlsym(handle.get(), MQ::MODULE_ENTRY_NAME));
+ reinterpret_cast<MQ::ModuleEntry>(dlsym(custom_mqtt_handle.get(), MQ::MODULE_ENTRY_NAME));
if (get_instance_fn == nullptr) {
ERR("dlsym: %s", dlerror());
throw AittException(AittException::SYSTEM_ERR);
TYPE_TCP, //(0x1 << 1)
TYPE_TCP_SECURE, //(0x1 << 2)
TYPE_WEBRTC, //(0x1 << 3)
- TYPE_RTSP,
TYPE_TRANSPORT_MAX,
};