From 5ed7a0b62368853a509b6aa2029a333d1091695d Mon Sep 17 00:00:00 2001 From: Youngjae Shin Date: Fri, 30 Sep 2022 07:56:37 +0900 Subject: [PATCH] propose revised stream API --- common/AittDiscovery.cc | 57 ++------ common/AittDiscovery.h | 11 +- common/{AittStreamTag.h => AittStreamModule.h} | 29 ++-- include/AITT.h | 22 +-- include/AittStream.h | 44 ++++++ include/AittTypes.h | 43 +++--- modules/tcp/Module.cc | 5 +- modules/tcp/Module.h | 1 + modules/tcp/TCP.cc | 4 + src/AITT.cc | 56 +------- src/AITTImpl.cc | 191 ++++--------------------- src/AITTImpl.h | 28 +--- src/ModuleManager.cc | 67 +++++++-- src/ModuleManager.h | 8 +- tests/AITT_Stream_test.cc | 98 ------------- tests/AITT_Stream_tests.h | 58 -------- tests/AittStream_test.cc | 55 +++++++ tests/CMakeLists.txt | 17 +-- 18 files changed, 270 insertions(+), 524 deletions(-) rename common/{AittStreamTag.h => AittStreamModule.h} (52%) create mode 100644 include/AittStream.h delete mode 100644 tests/AITT_Stream_test.cc delete mode 100644 tests/AITT_Stream_tests.h create mode 100644 tests/AittStream_test.cc diff --git a/common/AittDiscovery.cc b/common/AittDiscovery.cc index 4c6ed42..0b65af9 100644 --- a/common/AittDiscovery.cc +++ b/common/AittDiscovery.cc @@ -45,6 +45,14 @@ void AittDiscovery::Start(const std::string &host, int port, const std::string & static_cast(this), AITT_QOS_EXACTLY_ONCE); } +void AittDiscovery::Restart() +{ + RET_IF(callback_handle); + discovery_mq->Unsubscribe(callback_handle); + callback_handle = discovery_mq->Subscribe(DISCOVERY_TOPIC_BASE + "+", DiscoveryMessageCallback, + static_cast(this), AITT_QOS_EXACTLY_ONCE); +} + void AittDiscovery::Stop() { discovery_mq->Unsubscribe(callback_handle); @@ -54,7 +62,7 @@ void AittDiscovery::Stop() discovery_mq->Disconnect(); } -void AittDiscovery::UpdateDiscoveryMsg(AittProtocol protocol, const void *msg, size_t length) +void AittDiscovery::UpdateDiscoveryMsg(const std::string &protocol, const void *msg, size_t length) { auto it = discovery_map.find(protocol); if (it == discovery_map.end()) @@ -65,7 +73,7 @@ void AittDiscovery::UpdateDiscoveryMsg(AittProtocol protocol, const void *msg, s PublishDiscoveryMsg(); } -int AittDiscovery::AddDiscoveryCB(AittProtocol protocol, const DiscoveryCallback &cb) +int AittDiscovery::AddDiscoveryCB(const std::string &protocol, const DiscoveryCallback &cb) { static std::atomic_int id(0); id++; @@ -105,7 +113,7 @@ void AittDiscovery::DiscoveryMessageCallback(MSG *mq, const std::string &topic, if (msg == nullptr) { for (const auto &node : discovery->callbacks) { - std::pair cb_info = node.second; + std::pair cb_info = node.second; cb_info.second(clientId, WILL_LEAVE_NETWORK, nullptr, 0); } return; @@ -123,8 +131,8 @@ void AittDiscovery::DiscoveryMessageCallback(MSG *mq, const std::string &topic, auto blob = map[key].AsBlob(); for (const auto &node : discovery->callbacks) { - std::pair cb_info = node.second; - if (cb_info.first == discovery->GetProtocol(key)) { + std::pair cb_info = node.second; + if (cb_info.first == key) { cb_info.second(clientId, status, blob.data(), blob.size()); } } @@ -138,8 +146,8 @@ void AittDiscovery::PublishDiscoveryMsg() fbb.Map([this, &fbb]() { fbb.String("status", JOIN_NETWORK); - for (const std::pair &node : discovery_map) { - fbb.Key(GetProtocolStr(node.first)); + for (const std::pair &node : discovery_map) { + fbb.Key(node.first); fbb.Blob(node.second.data.get(), node.second.len); } }); @@ -151,41 +159,6 @@ void AittDiscovery::PublishDiscoveryMsg() true); } -const char *AittDiscovery::GetProtocolStr(AittProtocol protocol) -{ - switch (protocol) { - case AITT_TYPE_MQTT: - return "mqtt"; - case AITT_TYPE_TCP: - return "tcp"; - case AITT_TYPE_TCP_SECURE: - return "tcp_secure"; - case AITT_TYPE_WEBRTC: - return "webrtc"; - default: - ERR("Unknown protocol(%d)", protocol); - } - - return nullptr; -} - -AittProtocol AittDiscovery::GetProtocol(const std::string &protocol_str) -{ - if (STR_EQ == protocol_str.compare(GetProtocolStr(AITT_TYPE_MQTT))) - return AITT_TYPE_MQTT; - - if (STR_EQ == protocol_str.compare(GetProtocolStr(AITT_TYPE_TCP))) - return AITT_TYPE_TCP; - - if (STR_EQ == protocol_str.compare(GetProtocolStr(AITT_TYPE_TCP_SECURE))) - return AITT_TYPE_TCP_SECURE; - - if (STR_EQ == protocol_str.compare(GetProtocolStr(AITT_TYPE_WEBRTC))) - return AITT_TYPE_WEBRTC; - - return AITT_TYPE_UNKNOWN; -} - AittDiscovery::DiscoveryBlob::DiscoveryBlob(const void *msg, size_t length) : len(length), data(new char[len]) { diff --git a/common/AittDiscovery.h b/common/AittDiscovery.h index 6725e6f..29c3302 100644 --- a/common/AittDiscovery.h +++ b/common/AittDiscovery.h @@ -35,9 +35,10 @@ class AittDiscovery { void SetMQ(std::unique_ptr mq); void Start(const std::string &host, int port, const std::string &username, const std::string &password); + void Restart(); void Stop(); - void UpdateDiscoveryMsg(AittProtocol protocol, const void *msg, size_t length); - int AddDiscoveryCB(AittProtocol protocol, const DiscoveryCallback &cb); + void UpdateDiscoveryMsg(const std::string &protocol, const void *msg, size_t length); + int AddDiscoveryCB(const std::string &protocol, const DiscoveryCallback &cb); void RemoveDiscoveryCB(int callback_id); bool CompareTopic(const std::string &left, const std::string &right); @@ -55,14 +56,12 @@ class AittDiscovery { static void DiscoveryMessageCallback(MSG *mq, const std::string &topic, const void *msg, const int szmsg, void *user_data); void PublishDiscoveryMsg(); - const char *GetProtocolStr(AittProtocol protocol); - AittProtocol GetProtocol(const std::string &protocol_str); std::string id_; std::unique_ptr discovery_mq; void *callback_handle; - std::map discovery_map; - std::map> callbacks; + std::map discovery_map; + std::map> callbacks; }; // Discovery Message (flexbuffers) diff --git a/common/AittStreamTag.h b/common/AittStreamModule.h similarity index 52% rename from common/AittStreamTag.h rename to common/AittStreamModule.h index 9c4b7bf..a48c74b 100644 --- a/common/AittStreamTag.h +++ b/common/AittStreamModule.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2022 Samsung Electronics Co., Ltd All Rights Reserved + * Copyright (c) 2022 Samsung Electronics Co., Ltd All Rights Reserved * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,20 +15,31 @@ */ #pragma once +#include +#include #include +#include #include + +#define AITT_STREAM_NEW aitt_stream_new +#define TO_STR(s) #s +#define DEFINE_TO_STR(x) TO_STR(x) + namespace aitt { -class StreamTag { +class AittStreamModule : public AittStream { public: - explicit StreamTag(const std::string &topic, AittProtocol protocol, AittStreamRole role, - void *handle) - : topic_(topic), protocol_(protocol), role_(role), handle_(handle){}; - std::string topic_; - AittProtocol protocol_; - AittStreamRole role_; - void *handle_; + typedef void *( + *ModuleEntry)(AittDiscovery &discovery, const std::string &topic, AittStreamRole role); + + static constexpr const char *const MODULE_ENTRY_NAME = DEFINE_TO_STR(AITT_STREAM_NEW); + + AittStreamModule() = default; + virtual ~AittStreamModule(void) = default; }; } // namespace aitt + +#undef TO_STR +#undef DEFINE_TO_STR diff --git a/include/AITT.h b/include/AITT.h index 140c8a8..eac7556 100644 --- a/include/AITT.h +++ b/include/AITT.h @@ -17,6 +17,7 @@ #include #include +#include #include #include @@ -35,11 +36,6 @@ class API AITT { std::function; using ConnectionCallback = std::function; - using StreamStateCallback = - std::function; - using StreamSinkCallback = std::function; - explicit AITT(const std::string &id, const std::string &ip_addr, AittOption option = AittOption(false, false)); virtual ~AITT(void); @@ -69,19 +65,9 @@ class API AITT { void SendReply(MSG *msg, const void *data, const size_t datalen, bool end = true); - AittStreamID CreatePublishStream(const std::string &topic, AittProtocol protocol); - AittStreamID CreateSubscribeStream(const std::string &topic, AittProtocol protocol); - void DestroyStream(AittStreamID handle); - void SetStreamConfig(AittStreamID handle, const std::string &key, const std::string &value); - std::string GetStreamConfig(AittStreamID handle, const std::string &key); - void StartStream(AittStreamID handle); - void StopStream(AittStreamID handle); - void SetStreamStateCallback(AittStreamID handle, StreamStateCallback cb, - void *user_data = nullptr); - void UnsetStreamStateCallback(AittStreamID handle); - void SetStreamSinkCallback(AittStreamID handle, StreamSinkCallback cb, - void *user_data = nullptr); - void UnsetStreamSinkCallback(AittStreamID handle); + AittStream *CreateStream(AittStreamProtocol type, const std::string &topic, + AittStreamRole role); + void DestroyStream(AittStream *aitt_stream); private: class Impl; diff --git a/include/AittStream.h b/include/AittStream.h new file mode 100644 index 0000000..1261a50 --- /dev/null +++ b/include/AittStream.h @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2022 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include + +#include +#include +#include + +namespace aitt { + +class AittStream { + public: + using StateCallback = + std::function; + using ReceiveCallback = std::function; + + AittStream() = default; + virtual ~AittStream() = default; + + virtual void SetConfig(const std::string &key, const std::string &value) = 0; + virtual void SetConfig(const std::string &key, void *obj) = 0; + virtual void Start() = 0; + virtual void SetStateCallback(StateCallback cb, void *user_data) = 0; + + // Subscriber ONLY + virtual void SetReceiveCallback(ReceiveCallback cb, void *user_data) = 0; + // virtual void SetMediaInfoCallback(???); +}; +} // namespace aitt diff --git a/include/AittTypes.h b/include/AittTypes.h index 1c959e5..88a1d3e 100644 --- a/include/AittTypes.h +++ b/include/AittTypes.h @@ -18,14 +18,29 @@ #define API __attribute__((visibility("default"))) typedef void* AittSubscribeID; -typedef void* AittStreamID; enum AittProtocol { AITT_TYPE_UNKNOWN = 0, AITT_TYPE_MQTT = (0x1 << 0), // Publish message through the MQTT AITT_TYPE_TCP = (0x1 << 1), // Publish message to peers using the TCP AITT_TYPE_TCP_SECURE = (0x1 << 2), // Publish message to peers using the Secure TCP - AITT_TYPE_WEBRTC = (0x1 << 3), // Publish message to peers using the WEBRTC +}; + +enum AittStreamProtocol { + AITT_STREAM_TYPE_WEBRTC, // Publish message to peers using the WEBRTC + AITT_STREAM_TYPE_RTSP, // Publish message to peers using the RTSP + AITT_STREAM_TYPE_MAX +}; + +enum AittStreamState { + AITT_STREAM_STATE_INIT = 0, + AITT_STREAM_STATE_READY = 1, + AITT_STREAM_STATE_PLAYING = 2, +}; + +enum AittStreamRole { + AITT_STREAM_ROLE_PUBLISHER = 0, // Role of source media + AITT_STREAM_ROLE_SUBSCRIBER = 1, // Role of destination(receiver) }; // AittQoS only works with the AITT_TYPE_MQTT @@ -41,18 +56,6 @@ enum AittConnectionState { AITT_CONNECT_FAILED = 2, // Failed to connect to the mqtt broker. }; -enum AittStreamState { - AITT_STREAM_STATE_INIT = 0, - AITT_STREAM_STATE_READY = 1, - AITT_STREAM_STATE_PLAYING = 2, -}; - -enum AittStreamRole { - AITT_STREAM_ROLE_NONE = 0, - AITT_STREAM_ROLE_SRC = 1, - AITT_STREAM_ROLE_SINK = 2, -}; - // The maximum size in bytes of a message. It follows MQTT #define AITT_MESSAGE_MAX 268435455 @@ -63,10 +66,10 @@ enum AittStreamRole { #include #define TIZEN_ERROR_NONE 0 -#define TIZEN_ERROR_INVALID_PARAMETER -EINVAL -#define TIZEN_ERROR_PERMISSION_DENIED -EACCES #define TIZEN_ERROR_OUT_OF_MEMORY -ENOMEM +#define TIZEN_ERROR_PERMISSION_DENIED -EACCES #define TIZEN_ERROR_RESOURCE_BUSY -EBUSY +#define TIZEN_ERROR_INVALID_PARAMETER -EINVAL #define TIZEN_ERROR_TIMED_OUT (-1073741824LL + 1) #define TIZEN_ERROR_NOT_SUPPORTED (-1073741824LL + 2) #define TIZEN_ERROR_AITT -0x04020000 @@ -74,13 +77,13 @@ enum AittStreamRole { enum AittError { AITT_ERROR_NONE = TIZEN_ERROR_NONE, /**< On Success */ - AITT_ERROR_INVALID_PARAMETER = TIZEN_ERROR_INVALID_PARAMETER, /**< Invalid parameter */ - AITT_ERROR_PERMISSION_DENIED = TIZEN_ERROR_PERMISSION_DENIED, /**< Permission denied */ AITT_ERROR_OUT_OF_MEMORY = TIZEN_ERROR_OUT_OF_MEMORY, /**< Out of memory */ - AITT_ERROR_RESOURCE_BUSY = TIZEN_ERROR_RESOURCE_BUSY, /**< Resource Busy */ + AITT_ERROR_PERMISSION_DENIED = TIZEN_ERROR_PERMISSION_DENIED, /**< Permission denied */ + AITT_ERROR_RESOURCE_BUSY = TIZEN_ERROR_RESOURCE_BUSY, /**< Device or resource busy */ + AITT_ERROR_INVALID_PARAMETER = TIZEN_ERROR_INVALID_PARAMETER, /**< Invalid parameter */ AITT_ERROR_TIMED_OUT = TIZEN_ERROR_TIMED_OUT, /**< Time out */ AITT_ERROR_NOT_SUPPORTED = TIZEN_ERROR_NOT_SUPPORTED, /**< Not supported */ AITT_ERROR_UNKNOWN = TIZEN_ERROR_AITT | 0x01, /**< Unknown Error */ AITT_ERROR_SYSTEM = TIZEN_ERROR_AITT | 0x02, /**< System errors */ - AITT_ERROR_NOT_READY = TIZEN_ERROR_AITT | 0x03, /**< System errors */ + AITT_ERROR_NOT_READY = TIZEN_ERROR_AITT | 0x03, /**< Not available */ }; diff --git a/modules/tcp/Module.cc b/modules/tcp/Module.cc index 81717c5..3246ff7 100644 --- a/modules/tcp/Module.cc +++ b/modules/tcp/Module.cc @@ -29,7 +29,7 @@ Module::Module(AittProtocol type, AittDiscovery &discovery, const std::string &m { aittThread = std::thread(&Module::ThreadMain, this); - discovery_cb = discovery.AddDiscoveryCB(type, + discovery_cb = discovery.AddDiscoveryCB(NAME[secure], std::bind(&Module::DiscoveryMessageCallback, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4)); DBG("Discovery Callback : %p, %d", this, discovery_cb); @@ -306,8 +306,7 @@ void Module::UpdateDiscoveryMsg() fbb.Finish(); auto buf = fbb.GetBuffer(); - discovery.UpdateDiscoveryMsg(secure ? AITT_TYPE_TCP_SECURE : AITT_TYPE_TCP, buf.data(), - buf.size()); + discovery.UpdateDiscoveryMsg(NAME[secure], buf.data(), buf.size()); } void Module::ReceiveData(MainLoopHandler::MainLoopResult result, int handle, diff --git a/modules/tcp/Module.h b/modules/tcp/Module.h index ed748a1..7a53edd 100644 --- a/modules/tcp/Module.h +++ b/modules/tcp/Module.h @@ -122,6 +122,7 @@ class Module : public AittTransport { void UpdatePublishTable(const std::string &topic, const std::string &host, const TCP::ConnectInfo &info); + const char *const NAME[2] = {"TCP", "SECURE_TCP"}; MainLoopHandler main_loop; std::thread aittThread; int discovery_cb; diff --git a/modules/tcp/TCP.cc b/modules/tcp/TCP.cc index 4815e20..d291b39 100644 --- a/modules/tcp/TCP.cc +++ b/modules/tcp/TCP.cc @@ -234,6 +234,10 @@ int TCP::RecvSizedDataNormal(void **data, size_t &data_size) if (data_len == UINT32_MAX) return HandleZeroMsg(data, data_size); + if (AITT_MESSAGE_MAX < data_len) { + ERR("Invalid Size(%zu)", data_len); + return -1; + } void *data_buf = malloc(data_len); Recv(data_buf, data_len); data_size = data_len; diff --git a/src/AITT.cc b/src/AITT.cc index a4625af..36d0437 100644 --- a/src/AITT.cc +++ b/src/AITT.cc @@ -129,61 +129,15 @@ void AITT::SendReply(MSG *msg, const void *data, size_t datalen, bool end) return pImpl->SendReply(msg, data, datalen, end); } -AittStreamID AITT::CreatePublishStream(const std::string &topic, AittProtocol protocol) +AittStream *AITT::CreateStream(AittStreamProtocol type, const std::string &topic, + AittStreamRole role) { - return pImpl->CreatePublishStream(topic, protocol); + return pImpl->CreateStream(type, topic, role); } -AittStreamID AITT::CreateSubscribeStream(const std::string &topic, AittProtocol protocol) +void AITT::DestroyStream(AittStream *aitt_stream) { - return pImpl->CreateSubscribeStream(topic, protocol); -} - -void AITT::DestroyStream(AittStreamID handle) -{ - return pImpl->DestroyStream(handle); -} - -void AITT::SetStreamConfig(AittStreamID handle, const std::string &key, const std::string &value) -{ - return pImpl->SetStreamConfig(handle, key, value); -} - -std::string AITT::GetStreamConfig(AittStreamID handle, const std::string &key) -{ - return pImpl->GetStreamConfig(handle, key); -} - -void AITT::StartStream(AittStreamID handle) -{ - return pImpl->StartStream(handle); -} - -void AITT::StopStream(AittStreamID handle) -{ - return pImpl->StopStream(handle); -} - -void AITT::SetStreamStateCallback(AittStreamID handle, StreamStateCallback cb, - void *user_data) -{ - return pImpl->SetStreamStateCallback(handle, cb, user_data); -} - -void AITT::UnsetStreamStateCallback(AittStreamID handle) -{ - return pImpl->UnsetStreamStateCallback(handle); -} - -void AITT::SetStreamSinkCallback(AittStreamID handle, StreamSinkCallback cb, - void *user_data) -{ - return pImpl->SetStreamSinkCallback(handle, cb, user_data); -} - -void AITT::UnsetStreamSinkCallback(AittStreamID handle) -{ - return pImpl->UnsetStreamSinkCallback(handle); + return pImpl->DestroyStream(aitt_stream); } } // namespace aitt diff --git a/src/AITTImpl.cc b/src/AITTImpl.cc index eaecaf4..821c339 100644 --- a/src/AITTImpl.cc +++ b/src/AITTImpl.cc @@ -15,8 +15,7 @@ */ #include "AITTImpl.h" -#include - +#include #include #include #include @@ -111,7 +110,10 @@ void AITT::Impl::Connect(const std::string &host, int port, const std::string &u void AITT::Impl::Disconnect(void) { UnsubscribeAll(); - DestroyAllStream(); + + for (auto stream : in_use_streams) + delete stream; + in_use_streams.clear(); mqtt_broker_ip_.clear(); mqtt_broker_port_ = -1; @@ -143,26 +145,6 @@ void AITT::Impl::UnsubscribeAll() } subscribed_list.clear(); } -void AITT::Impl::DestroyAllStream(void) -{ - std::unique_lock lock(stream_list_mutex_); - - for (auto tag : stream_list_) { - if (!tag) - continue; - //TODO call stream transport - delete tag; - } - stream_list_.clear(); -} - -void AITT::Impl::StreamInfoExist(StreamTag *tag) -{ - if (std::find(stream_list_.begin(), stream_list_.end(), tag) == stream_list_.end()) { - ERR("Unknown stream id(%p)", tag); - throw std::runtime_error("stream id"); - } -} void AITT::Impl::ConfigureTransportModule(const std::string &key, const std::string &value, AittProtocol protocols) @@ -253,7 +235,7 @@ void *AITT::Impl::Unsubscribe(AittSubscribeID subscribe_id) auto it = std::find(subscribed_list.begin(), subscribed_list.end(), info); if (it == subscribed_list.end()) { ERR("Unknown subscribe_id(%p)", subscribe_id); - throw std::runtime_error("subscribe_id"); + throw AittException(AittException::NO_DATA_ERR); } void *user_data = nullptr; @@ -294,7 +276,7 @@ int AITT::Impl::PublishWithReply(const std::string &topic, const void *data, con if (sub_msg->IsEndSequence()) { try { Unsubscribe(sub_msg->GetID()); - } catch (std::runtime_error &e) { + } catch (AittException &e) { ERR("Unsubscribe() Fail(%s)", e.what()); } } @@ -329,7 +311,7 @@ int AITT::Impl::PublishWithReplySync(const std::string &topic, const void *data, if (sub_msg->IsEndSequence()) { try { Unsubscribe(sub_msg->GetID()); - } catch (std::runtime_error &e) { + } catch (AittException &e) { ERR("Unsubscribe() Fail(%s)", e.what()); } sync_loop.Quit(); @@ -406,153 +388,30 @@ void *AITT::Impl::SubscribeTCP(SubscribeInfo *handle, const std::string &topic, user_data, qos); } -AittStreamID AITT::Impl::CreatePublishStream(const std::string &topic, AittProtocol protocol) +AittStream *AITT::Impl::CreateStream(AittStreamProtocol type, const std::string &topic, + AittStreamRole role) { - //TODO call stream transport - auto info = new StreamTag(topic, protocol, AITT_STREAM_ROLE_SRC, nullptr); - // TODO: be prepared when there's memory failure - { - std::unique_lock lock(stream_list_mutex_); - stream_list_.push_back(info); + AittStreamModule *stream = nullptr; + try { + stream = modules.NewStreamModule(type, topic, role); + in_use_streams.push_back(stream); + } catch (std::exception &e) { + ERR("StreamHandler() Fail(%s)", e.what()); } + discovery.Restart(); - INFO("Stream topic(%s) : %p", topic.c_str(), info); - return reinterpret_cast(info); + return stream; } -AittStreamID AITT::Impl::CreateSubscribeStream(const std::string &topic, - AittProtocol protocol) +void AITT::Impl::DestroyStream(AittStream *aitt_stream) { - //TODO call stream transport - auto info = new StreamTag(topic, protocol, AITT_STREAM_ROLE_SINK, nullptr); - // TODO: be prepared when there's memory failure - { - std::unique_lock lock(stream_list_mutex_); - stream_list_.push_back(info); + auto it = std::find(in_use_streams.begin(), in_use_streams.end(), aitt_stream); + if (it == in_use_streams.end()) { + ERR("Unknown Stream(%p)", aitt_stream); + return; } - - INFO("Stream topic(%s) : %p", topic.c_str(), info); - return reinterpret_cast(info); -} - -void AITT::Impl::DestroyStream(AittStreamID handle) -{ - INFO("stream id : %p", handle); - StreamTag *info = reinterpret_cast(handle); - - std::unique_lock lock(stream_list_mutex_); - - auto it = std::find(stream_list_.begin(), stream_list_.end(), info); - if (it == stream_list_.end()) { - ERR("Unknown stream id(%p)", handle); - throw std::runtime_error("stream ID"); - } - - //TODO call stream transport - - stream_list_.erase(it); - delete *it; -} - -void AITT::Impl::SetStreamConfig(AittStreamID handle, const std::string &key, - const std::string &value) -{ - INFO("stream id : %p", handle); - StreamTag *tag = reinterpret_cast(handle); - - std::unique_lock lock(stream_list_mutex_); - - StreamInfoExist(tag); - - //TODO call stream transport -} - -std::string AITT::Impl::GetStreamConfig(AittStreamID handle, const std::string &key) -{ - std::string value; - INFO("stream id : %p", handle); - StreamTag *tag = reinterpret_cast(handle); - - std::unique_lock lock(stream_list_mutex_); - - StreamInfoExist(tag); - - //TODO call stream transport - - return value; -} - -void AITT::Impl::StartStream(AittStreamID handle) -{ - INFO("stream id : %p", handle); - StreamTag *tag = reinterpret_cast(handle); - - std::unique_lock lock(stream_list_mutex_); - - StreamInfoExist(tag); - - //TODO call stream transport -} - -void AITT::Impl::StopStream(AittStreamID handle) -{ - INFO("stream id : %p", handle); - StreamTag *tag = reinterpret_cast(handle); - - std::unique_lock lock(stream_list_mutex_); - - StreamInfoExist(tag); - - //TODO call stream transport -} - -void AITT::Impl::SetStreamStateCallback(AittStreamID handle, StreamStateCallback cb, - void *user_data) -{ - INFO("stream id : %p", handle); - StreamTag *tag = reinterpret_cast(handle); - - std::unique_lock lock(stream_list_mutex_); - - StreamInfoExist(tag); - - //TODO call stream transport -} - -void AITT::Impl::UnsetStreamStateCallback(AittStreamID handle) -{ - INFO("stream id : %p", handle); - StreamTag *tag = reinterpret_cast(handle); - - std::unique_lock lock(stream_list_mutex_); - - StreamInfoExist(tag); - - //TODO call stream transport -} - -void AITT::Impl::SetStreamSinkCallback(AittStreamID handle, StreamSinkCallback cb, void *user_data) -{ - INFO("stream id : %p", handle); - StreamTag *tag = reinterpret_cast(handle); - - std::unique_lock lock(stream_list_mutex_); - - StreamInfoExist(tag); - - //TODO call stream transport -} - -void AITT::Impl::UnsetStreamSinkCallback(AittStreamID handle) -{ - INFO("stream id : %p", handle); - StreamTag *tag = reinterpret_cast(handle); - - std::unique_lock lock(stream_list_mutex_); - - StreamInfoExist(tag); - - //TODO call stream transport + in_use_streams.erase(it); + delete aitt_stream; } } // namespace aitt diff --git a/src/AITTImpl.h b/src/AITTImpl.h index 097e749..7279687 100644 --- a/src/AITTImpl.h +++ b/src/AITTImpl.h @@ -15,8 +15,6 @@ */ #pragma once -#include - #include #include #include @@ -26,13 +24,12 @@ #include "AITT.h" #include "AittDiscovery.h" -#include "AittStreamTag.h" +#include "AittStream.h" #include "MQ.h" #include "MainLoopHandler.h" #include "ModuleManager.h" namespace aitt { - class AITT::Impl { public: explicit Impl(AITT &parent, const std::string &id, const std::string &my_ip, @@ -64,17 +61,9 @@ class AITT::Impl { void SendReply(MSG *msg, const void *data, const int datalen, bool end); - AittStreamID CreatePublishStream(const std::string &topic, AittProtocol protocol); - AittStreamID CreateSubscribeStream(const std::string &topic, AittProtocol protocol); - void DestroyStream(AittStreamID handle); - void SetStreamConfig(AittStreamID handle, const std::string &key, const std::string &value); - std::string GetStreamConfig(AittStreamID handle, const std::string &key); - void StartStream(AittStreamID handle); - void StopStream(AittStreamID handle); - void SetStreamStateCallback(AittStreamID handle, StreamStateCallback cb, void *user_data); - void UnsetStreamStateCallback(AittStreamID handle); - void SetStreamSinkCallback(AittStreamID handle, StreamSinkCallback cb, void *user_data); - void UnsetStreamSinkCallback(AittStreamID handle); + AittStream *CreateStream(AittStreamProtocol type, const std::string &topic, + AittStreamRole role); + void DestroyStream(AittStream *aitt_stream); private: using Blob = std::pair; @@ -89,15 +78,9 @@ class AITT::Impl { void *SubscribeTCP(SubscribeInfo *, const std::string &topic, const SubscribeCallback &cb, void *cbdata, AittQoS qos); - void *SubscribeWebRtc(SubscribeInfo *, const std::string &topic, const SubscribeCallback &cb, - void *cbdata, AittQoS qos); void HandleTimeout(int timeout_ms, unsigned int &timeout_id, aitt::MainLoopHandler &sync_loop, bool &is_timeout); - void PublishWebRtc(const std::string &topic, const void *data, const size_t datalen, - AittQoS qos, bool retain); void UnsubscribeAll(); - void DestroyAllStream(void); - void StreamInfoExist(StreamTag *tag); void ThreadMain(void); AITT &public_api; @@ -108,8 +91,7 @@ class AITT::Impl { std::unique_ptr mq; std::vector subscribed_list; std::mutex subscribed_list_mutex_; - std::vector stream_list_; - std::mutex stream_list_mutex_; + std::vector in_use_streams; std::string id_; std::string mqtt_broker_ip_; diff --git a/src/ModuleManager.cc b/src/ModuleManager.cc index 6950c9a..00acedb 100644 --- a/src/ModuleManager.cc +++ b/src/ModuleManager.cc @@ -30,6 +30,10 @@ ModuleManager::ModuleManager(const std::string &my_ip, AittDiscovery &d) transport_handles.push_back(ModuleHandle(nullptr, nullptr)); LoadTransport(static_cast(i)); } + + for (int i = AITT_STREAM_TYPE_WEBRTC; i < AITT_STREAM_TYPE_MAX; ++i) { + stream_handles.push_back(ModuleHandle(nullptr, nullptr)); + } } AittTransport &ModuleManager::Get(AittProtocol protocol) @@ -49,8 +53,6 @@ ModuleManager::TransportType ModuleManager::Convert(AittProtocol type) return TYPE_TCP; case AITT_TYPE_TCP_SECURE: return TYPE_TCP_SECURE; - case AITT_TYPE_WEBRTC: - return TYPE_WEBRTC; case AITT_TYPE_MQTT: default: @@ -60,20 +62,33 @@ ModuleManager::TransportType ModuleManager::Convert(AittProtocol type) return TYPE_TRANSPORT_MAX; } -std::string ModuleManager::GetTransportFileName(TransportType type) +const char *ModuleManager::GetTransportFileName(TransportType type) { switch (type) { case TYPE_TCP: case TYPE_TCP_SECURE: return "libaitt-transport-tcp.so"; - case TYPE_WEBRTC: - return "libaitt-transport-webrtc.so"; default: ERR("Unknown Type(%d)", type); break; } - return std::string("Unknown"); + return "Unknown"; +} + +const char *ModuleManager::GetStreamFileName(AittStreamProtocol type) +{ + switch (type) { + case AITT_STREAM_TYPE_WEBRTC: + return "libaitt-stream-webrtc.so"; + case AITT_STREAM_TYPE_RTSP: + return "libaitt-stream-rtsp.so"; + default: + ERR("Unknown Type(%d)", type); + break; + } + + return "Unknown"; } ModuleManager::ModuleHandle ModuleManager::OpenModule(const char *file) @@ -82,8 +97,10 @@ ModuleManager::ModuleHandle ModuleManager::OpenModule(const char *file) if (dlclose(handle)) ERR("dlclose: %s", dlerror()); }); - if (handle == nullptr) + if (handle == nullptr) { ERR("dlopen(%s): %s", file, dlerror()); + throw AittException(AittException::SYSTEM_ERR); + } return handle; } @@ -93,17 +110,17 @@ ModuleManager::ModuleHandle ModuleManager::OpenTransport(TransportType type) if (TYPE_TCP_SECURE == type) type = TYPE_TCP; - std::string filename = GetTransportFileName(type); - ModuleHandle handle = OpenModule(filename.c_str()); + ModuleHandle handle = OpenModule(GetTransportFileName(type)); return handle; } void ModuleManager::LoadTransport(TransportType type) { - transport_handles[type] = OpenTransport(type); - if (transport_handles[type] == nullptr) { - ERR("OpenTransport(%d) Fail", type); + try { + transport_handles[type] = OpenTransport(type); + } catch (AittException &e) { + ERR("OpenTransport(%d) Fail(%s)", type, e.what()); return; } @@ -122,9 +139,33 @@ void ModuleManager::LoadTransport(TransportType type) } } +AittStreamModule *ModuleManager::NewStreamModule(AittStreamProtocol type, const std::string &topic, + AittStreamRole role) +{ + if (nullptr == stream_handles[type]) + stream_handles[type] = OpenModule(GetStreamFileName(type)); + + AittStreamModule::ModuleEntry get_instance_fn = reinterpret_cast( + dlsym(stream_handles[type].get(), AittStreamModule::MODULE_ENTRY_NAME)); + if (get_instance_fn == nullptr) { + ERR("dlsym: %s", dlerror()); + throw AittException(AittException::SYSTEM_ERR); + } + + AittStreamModule *instance( + static_cast(get_instance_fn(discovery, topic, role))); + if (instance == nullptr) { + ERR("get_instance_fn(AittStreamModule) Fail"); + throw AittException(AittException::SYSTEM_ERR); + } + + return instance; +} + std::unique_ptr ModuleManager::NewCustomMQ(const std::string &id, const AittOption &option) { - custom_mqtt_handle = OpenModule("libaitt-st-broker.so"); + if (nullptr == custom_mqtt_handle) + custom_mqtt_handle = OpenModule("libaitt-st-broker.so"); MQ::ModuleEntry get_instance_fn = reinterpret_cast(dlsym(custom_mqtt_handle.get(), MQ::MODULE_ENTRY_NAME)); diff --git a/src/ModuleManager.h b/src/ModuleManager.h index d71872d..fbbd2a2 100644 --- a/src/ModuleManager.h +++ b/src/ModuleManager.h @@ -20,6 +20,7 @@ #include #include "AittDiscovery.h" +#include "AittStreamModule.h" #include "AittTransport.h" #include "MQ.h" #include "NullTransport.h" @@ -32,6 +33,8 @@ class ModuleManager { virtual ~ModuleManager() = default; AittTransport &Get(AittProtocol type); + AittStreamModule *NewStreamModule(AittStreamProtocol type, const std::string &topic, + AittStreamRole role); std::unique_ptr NewCustomMQ(const std::string &id, const AittOption &option); private: @@ -41,12 +44,12 @@ class ModuleManager { enum TransportType { TYPE_TCP, //(0x1 << 1) TYPE_TCP_SECURE, //(0x1 << 2) - TYPE_WEBRTC, //(0x1 << 3) TYPE_TRANSPORT_MAX, }; TransportType Convert(AittProtocol type); - std::string GetTransportFileName(TransportType type); + const char *GetTransportFileName(TransportType type); + const char *GetStreamFileName(AittStreamProtocol type); ModuleHandle OpenModule(const char *file); ModuleHandle OpenTransport(TransportType type); void LoadTransport(TransportType type); @@ -55,6 +58,7 @@ class ModuleManager { AittDiscovery &discovery; std::vector transport_handles; std::unique_ptr transports[TYPE_TRANSPORT_MAX]; + std::vector stream_handles; ModuleHandle custom_mqtt_handle; NullTransport null_transport; }; diff --git a/tests/AITT_Stream_test.cc b/tests/AITT_Stream_test.cc deleted file mode 100644 index aa33cc3..0000000 --- a/tests/AITT_Stream_test.cc +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Copyright (c) 2021-2022 Samsung Electronics Co., Ltd All Rights Reserved - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include -#include - -#include - -#include "AITT.h" -#include "AITT_Stream_tests.h" - -using AITT = aitt::AITT; - -class AITTStreamTest : public testing::Test, public AittStreamTests { - protected: - void SetUp() override { Init(); } - void TearDown() override { Deinit(); } -}; - -TEST_F(AITTStreamTest, Positive_Create_Pubilsh_Stream_WebRTC_Anytime) -{ - try { - AITT aitt(stream_src_id_, LOCAL_IP, AittOption(true, false)); - aitt.Connect(); - publish_stream_handle_ = - aitt.CreatePublishStream(TEST_STREAM_TOPIC, AITT_TYPE_WEBRTC); - EXPECT_NE(publish_stream_handle_, nullptr); - } catch (std::exception &e) { - FAIL() << "Unexpected exception: " << e.what(); - } -} - -TEST_F(AITTStreamTest, Positive_Create_Subscribe_Stream_WebRTC_Anytime) -{ - try { - AITT aitt(stream_src_id_, LOCAL_IP, AittOption(true, false)); - aitt.Connect(); - subscribe_stream_handle_ = - aitt.CreateSubscribeStream(TEST_STREAM_TOPIC, AITT_TYPE_WEBRTC); - EXPECT_NE(subscribe_stream_handle_, nullptr); - } catch (std::exception &e) { - FAIL() << "Unexpected exception: " << e.what(); - } -} - -TEST_F(AITTStreamTest, Positive_Destroy_Pubilsh_Stream_WebRTC_Anytime) -{ - try { - AITT aitt(stream_src_id_, LOCAL_IP, AittOption(true, false)); - aitt.Connect(); - publish_stream_handle_ = - aitt.CreatePublishStream(TEST_STREAM_TOPIC, AITT_TYPE_WEBRTC); - aitt.DestroyStream(publish_stream_handle_); - } catch (std::exception &e) { - FAIL() << "Unexpected exception: " << e.what(); - } -} - -TEST_F(AITTStreamTest, Positive_Destroy_Subscribe_Stream_WebRTC_Anytime) -{ - try { - AITT aitt(stream_src_id_, LOCAL_IP, AittOption(true, false)); - aitt.Connect(); - subscribe_stream_handle_ = - aitt.CreateSubscribeStream(TEST_STREAM_TOPIC, AITT_TYPE_WEBRTC); - aitt.DestroyStream(subscribe_stream_handle_); - } catch (std::exception &e) { - FAIL() << "Unexpected exception: " << e.what(); - } -} - -TEST_F(AITTStreamTest, Negative_Destroy_Stream_Anytime) -{ - EXPECT_THROW( - { - try { - AITT aitt(stream_src_id_, LOCAL_IP, AittOption(true, false)); - aitt.Connect(); - aitt.DestroyStream(nullptr); - } catch (const std::runtime_error &e) { - // and this tests that it has the correct message - throw; - } - }, - std::runtime_error); -} diff --git a/tests/AITT_Stream_tests.h b/tests/AITT_Stream_tests.h deleted file mode 100644 index 7fc1b7a..0000000 --- a/tests/AITT_Stream_tests.h +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright (c) 2022 Samsung Electronics Co., Ltd All Rights Reserved - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#pragma once - -#include -#include - -#include "aitt_internal.h" - -#define LOCAL_IP "127.0.0.1" -#define TEST_STREAM_TOPIC "test/stream_topic" -#define TEST_CONFIG_KEY "test_config_key" -#define TEST_CONFIG_VALUE "test_config_value" - -class AittStreamTests { - public: - void Init() - { - timeval tv; - char buffer[256]; - gettimeofday(&tv, nullptr); - snprintf(buffer, sizeof(buffer), "UniqueStreamSrcID.%lX%lX", tv.tv_sec, tv.tv_usec); - stream_src_id_ = buffer; - snprintf(buffer, sizeof(buffer), "UniqueStreamSinkID.%lX%lX", tv.tv_sec, tv.tv_usec); - stream_sink_id_ = buffer; - stream_topic_ = "StreamTopic"; - mainLoop_ = g_main_loop_new(nullptr, FALSE); - } - - void Deinit() { g_main_loop_unref(mainLoop_); } - - void IterateEventLoop(void) - { - g_main_loop_run(mainLoop_); - DBG("Go forward"); - } - - AittStreamID publish_stream_handle_; - AittStreamID subscribe_stream_handle_; - - std::string stream_src_id_; - std::string stream_sink_id_; - std::string stream_topic_; - GMainLoop *mainLoop_; -}; diff --git a/tests/AittStream_test.cc b/tests/AittStream_test.cc new file mode 100644 index 0000000..e9a22cb --- /dev/null +++ b/tests/AittStream_test.cc @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2022 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "AittStream.h" + +#include + +#include "AITT.h" +#include "AittTests.h" + +using namespace aitt; + +TEST(AittStreamTest, Full_P) +{ + try { + AITT aitt("streamClientId", LOCAL_IP, AittOption(true, false)); + + aitt.Connect(); + + AittStream *publisher = + aitt.CreateStream(AITT_STREAM_TYPE_WEBRTC, "topic", AITT_STREAM_ROLE_PUBLISHER); + ASSERT_TRUE(publisher) << "CreateStream() Fail"; + + AittStream *subscriber = + aitt.CreateStream(AITT_STREAM_TYPE_WEBRTC, "topic", AITT_STREAM_ROLE_SUBSCRIBER); + ASSERT_TRUE(subscriber) << "CreateStream() Fail"; + + publisher->SetConfig("key", "value"); + publisher->Start(); + + subscriber->SetConfig("key", "value"); + subscriber->SetStateCallback([](AittStream *stream, int state, void *user_data) {}, + (void *)"user_data"); + subscriber->SetReceiveCallback([](AittStream *stream, void *obj, void *user_data) {}, + (void *)"user-data"); + subscriber->Start(); + + aitt.DestroyStream(publisher); + aitt.DestroyStream(subscriber); + } catch (std::exception &e) { + FAIL() << "Unexpected exception: " << e.what(); + } +} diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 1febe7d..4c8dab7 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -7,7 +7,8 @@ INCLUDE_DIRECTORIES(${UT_NEEDS_INCLUDE_DIRS} ../src) LINK_DIRECTORIES(${UT_NEEDS_LIBRARY_DIRS}) ########################################################################### -SET(AITT_UT_SRC AITT_test.cc RequestResponse_test.cc MainLoopHandler_test.cc aitt_c_test.cc AITT_TCP_test.cc) +SET(AITT_UT_SRC AITT_test.cc RequestResponse_test.cc MainLoopHandler_test.cc aitt_c_test.cc + AITT_TCP_test.cc AittStream_test.cc) ADD_EXECUTABLE(${AITT_UT} ${AITT_UT_SRC}) TARGET_LINK_LIBRARIES(${AITT_UT} Threads::Threads ${UT_NEEDS_LIBRARIES} ${PROJECT_NAME}) @@ -59,17 +60,3 @@ ADD_TEST( LD_LIBRARY_PATH=../modules/tcp/:../modules/webrtc/:../:../common/:$ENV{LD_LIBRARY_PATH} ${CMAKE_CURRENT_BINARY_DIR}/${AITT_UT}_local --gtest_filter=*_Anytime ) -########################################################################### -ADD_EXECUTABLE(${AITT_UT}_stream AITT_Stream_test.cc) -TARGET_LINK_LIBRARIES(${AITT_UT}_stream Threads::Threads ${UT_NEEDS_LIBRARIES} ${PROJECT_NAME}) - -INSTALL(TARGETS ${AITT_UT}_stream DESTINATION ${AITT_TEST_BINDIR}) - -ADD_TEST( - NAME - ${AITT_UT}_stream - COMMAND - ${CMAKE_COMMAND} -E env - LD_LIBRARY_PATH=../modules/tcp/:../modules/webrtc/:../:../common/:$ENV{LD_LIBRARY_PATH} - ${CMAKE_CURRENT_BINARY_DIR}/${AITT_UT}_stream --gtest_filter=*_Anytime -) -- 2.7.4