static_cast<void *>(this), AITT_QOS_EXACTLY_ONCE);
}
+void AittDiscovery::Restart()
+{
+ RET_IF(callback_handle);
+ discovery_mq->Unsubscribe(callback_handle);
+ callback_handle = discovery_mq->Subscribe(DISCOVERY_TOPIC_BASE + "+", DiscoveryMessageCallback,
+ static_cast<void *>(this), AITT_QOS_EXACTLY_ONCE);
+}
+
void AittDiscovery::Stop()
{
discovery_mq->Unsubscribe(callback_handle);
discovery_mq->Disconnect();
}
-void AittDiscovery::UpdateDiscoveryMsg(AittProtocol protocol, const void *msg, size_t length)
+void AittDiscovery::UpdateDiscoveryMsg(const std::string &protocol, const void *msg, size_t length)
{
auto it = discovery_map.find(protocol);
if (it == discovery_map.end())
PublishDiscoveryMsg();
}
-int AittDiscovery::AddDiscoveryCB(AittProtocol protocol, const DiscoveryCallback &cb)
+int AittDiscovery::AddDiscoveryCB(const std::string &protocol, const DiscoveryCallback &cb)
{
static std::atomic_int id(0);
id++;
if (msg == nullptr) {
for (const auto &node : discovery->callbacks) {
- std::pair<AittProtocol, DiscoveryCallback> cb_info = node.second;
+ std::pair<std::string, DiscoveryCallback> cb_info = node.second;
cb_info.second(clientId, WILL_LEAVE_NETWORK, nullptr, 0);
}
return;
auto blob = map[key].AsBlob();
for (const auto &node : discovery->callbacks) {
- std::pair<AittProtocol, DiscoveryCallback> cb_info = node.second;
- if (cb_info.first == discovery->GetProtocol(key)) {
+ std::pair<std::string, DiscoveryCallback> cb_info = node.second;
+ if (cb_info.first == key) {
cb_info.second(clientId, status, blob.data(), blob.size());
}
}
fbb.Map([this, &fbb]() {
fbb.String("status", JOIN_NETWORK);
- for (const std::pair<const AittProtocol, const DiscoveryBlob &> &node : discovery_map) {
- fbb.Key(GetProtocolStr(node.first));
+ for (const std::pair<const std::string &, const DiscoveryBlob &> &node : discovery_map) {
+ fbb.Key(node.first);
fbb.Blob(node.second.data.get(), node.second.len);
}
});
true);
}
-const char *AittDiscovery::GetProtocolStr(AittProtocol protocol)
-{
- switch (protocol) {
- case AITT_TYPE_MQTT:
- return "mqtt";
- case AITT_TYPE_TCP:
- return "tcp";
- case AITT_TYPE_TCP_SECURE:
- return "tcp_secure";
- case AITT_TYPE_WEBRTC:
- return "webrtc";
- default:
- ERR("Unknown protocol(%d)", protocol);
- }
-
- return nullptr;
-}
-
-AittProtocol AittDiscovery::GetProtocol(const std::string &protocol_str)
-{
- if (STR_EQ == protocol_str.compare(GetProtocolStr(AITT_TYPE_MQTT)))
- return AITT_TYPE_MQTT;
-
- if (STR_EQ == protocol_str.compare(GetProtocolStr(AITT_TYPE_TCP)))
- return AITT_TYPE_TCP;
-
- if (STR_EQ == protocol_str.compare(GetProtocolStr(AITT_TYPE_TCP_SECURE)))
- return AITT_TYPE_TCP_SECURE;
-
- if (STR_EQ == protocol_str.compare(GetProtocolStr(AITT_TYPE_WEBRTC)))
- return AITT_TYPE_WEBRTC;
-
- return AITT_TYPE_UNKNOWN;
-}
-
AittDiscovery::DiscoveryBlob::DiscoveryBlob(const void *msg, size_t length)
: len(length), data(new char[len])
{
void SetMQ(std::unique_ptr<MQ> mq);
void Start(const std::string &host, int port, const std::string &username,
const std::string &password);
+ void Restart();
void Stop();
- void UpdateDiscoveryMsg(AittProtocol protocol, const void *msg, size_t length);
- int AddDiscoveryCB(AittProtocol protocol, const DiscoveryCallback &cb);
+ void UpdateDiscoveryMsg(const std::string &protocol, const void *msg, size_t length);
+ int AddDiscoveryCB(const std::string &protocol, const DiscoveryCallback &cb);
void RemoveDiscoveryCB(int callback_id);
bool CompareTopic(const std::string &left, const std::string &right);
static void DiscoveryMessageCallback(MSG *mq, const std::string &topic, const void *msg,
const int szmsg, void *user_data);
void PublishDiscoveryMsg();
- const char *GetProtocolStr(AittProtocol protocol);
- AittProtocol GetProtocol(const std::string &protocol_str);
std::string id_;
std::unique_ptr<MQ> discovery_mq;
void *callback_handle;
- std::map<AittProtocol, DiscoveryBlob> discovery_map;
- std::map<int, std::pair<AittProtocol, DiscoveryCallback>> callbacks;
+ std::map<std::string, DiscoveryBlob> discovery_map;
+ std::map<int, std::pair<std::string, DiscoveryCallback>> callbacks;
};
// Discovery Message (flexbuffers)
/*
- * Copyright (c) 2021-2022 Samsung Electronics Co., Ltd All Rights Reserved
+ * Copyright (c) 2022 Samsung Electronics Co., Ltd All Rights Reserved
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
*/
#pragma once
+#include <AittDiscovery.h>
+#include <AittStream.h>
#include <AittTypes.h>
+#include <functional>
#include <string>
+
+#define AITT_STREAM_NEW aitt_stream_new
+#define TO_STR(s) #s
+#define DEFINE_TO_STR(x) TO_STR(x)
+
namespace aitt {
-class StreamTag {
+class AittStreamModule : public AittStream {
public:
- explicit StreamTag(const std::string &topic, AittProtocol protocol, AittStreamRole role,
- void *handle)
- : topic_(topic), protocol_(protocol), role_(role), handle_(handle){};
- std::string topic_;
- AittProtocol protocol_;
- AittStreamRole role_;
- void *handle_;
+ typedef void *(
+ *ModuleEntry)(AittDiscovery &discovery, const std::string &topic, AittStreamRole role);
+
+ static constexpr const char *const MODULE_ENTRY_NAME = DEFINE_TO_STR(AITT_STREAM_NEW);
+
+ AittStreamModule() = default;
+ virtual ~AittStreamModule(void) = default;
};
} // namespace aitt
+
+#undef TO_STR
+#undef DEFINE_TO_STR
#include <AittException.h>
#include <AittOption.h>
+#include <AittStream.h>
#include <AittTypes.h>
#include <MSG.h>
std::function<void(MSG *msg, const void *data, const size_t datalen, void *user_data)>;
using ConnectionCallback = std::function<void(AITT &, int, void *user_data)>;
- using StreamStateCallback =
- std::function<void(AittStreamState state, void *user_data)>;
- using StreamSinkCallback = std::function<void(const void *data,
- const size_t datalen, void *user_data)>;
-
explicit AITT(const std::string &id, const std::string &ip_addr,
AittOption option = AittOption(false, false));
virtual ~AITT(void);
void SendReply(MSG *msg, const void *data, const size_t datalen, bool end = true);
- AittStreamID CreatePublishStream(const std::string &topic, AittProtocol protocol);
- AittStreamID CreateSubscribeStream(const std::string &topic, AittProtocol protocol);
- void DestroyStream(AittStreamID handle);
- void SetStreamConfig(AittStreamID handle, const std::string &key, const std::string &value);
- std::string GetStreamConfig(AittStreamID handle, const std::string &key);
- void StartStream(AittStreamID handle);
- void StopStream(AittStreamID handle);
- void SetStreamStateCallback(AittStreamID handle, StreamStateCallback cb,
- void *user_data = nullptr);
- void UnsetStreamStateCallback(AittStreamID handle);
- void SetStreamSinkCallback(AittStreamID handle, StreamSinkCallback cb,
- void *user_data = nullptr);
- void UnsetStreamSinkCallback(AittStreamID handle);
+ AittStream *CreateStream(AittStreamProtocol type, const std::string &topic,
+ AittStreamRole role);
+ void DestroyStream(AittStream *aitt_stream);
private:
class Impl;
--- /dev/null
+/*
+ * Copyright (c) 2022 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <AittTypes.h>
+
+#include <functional>
+#include <memory>
+#include <string>
+
+namespace aitt {
+
+class AittStream {
+ public:
+ using StateCallback =
+ std::function<void(AittStream *stream, AittStreamState state, void *user_data)>;
+ using ReceiveCallback = std::function<void(AittStream *stream, void *data, void *user_data)>;
+
+ AittStream() = default;
+ virtual ~AittStream() = default;
+
+ virtual void SetConfig(const std::string &key, const std::string &value) = 0;
+ virtual void SetConfig(const std::string &key, void *obj) = 0;
+ virtual void Start() = 0;
+ virtual void SetStateCallback(StateCallback cb, void *user_data) = 0;
+
+ // Subscriber ONLY
+ virtual void SetReceiveCallback(ReceiveCallback cb, void *user_data) = 0;
+ // virtual void SetMediaInfoCallback(???);
+};
+} // namespace aitt
#define API __attribute__((visibility("default")))
typedef void* AittSubscribeID;
-typedef void* AittStreamID;
enum AittProtocol {
AITT_TYPE_UNKNOWN = 0,
AITT_TYPE_MQTT = (0x1 << 0), // Publish message through the MQTT
AITT_TYPE_TCP = (0x1 << 1), // Publish message to peers using the TCP
AITT_TYPE_TCP_SECURE = (0x1 << 2), // Publish message to peers using the Secure TCP
- AITT_TYPE_WEBRTC = (0x1 << 3), // Publish message to peers using the WEBRTC
+};
+
+enum AittStreamProtocol {
+ AITT_STREAM_TYPE_WEBRTC, // Publish message to peers using the WEBRTC
+ AITT_STREAM_TYPE_RTSP, // Publish message to peers using the RTSP
+ AITT_STREAM_TYPE_MAX
+};
+
+enum AittStreamState {
+ AITT_STREAM_STATE_INIT = 0,
+ AITT_STREAM_STATE_READY = 1,
+ AITT_STREAM_STATE_PLAYING = 2,
+};
+
+enum AittStreamRole {
+ AITT_STREAM_ROLE_PUBLISHER = 0, // Role of source media
+ AITT_STREAM_ROLE_SUBSCRIBER = 1, // Role of destination(receiver)
};
// AittQoS only works with the AITT_TYPE_MQTT
AITT_CONNECT_FAILED = 2, // Failed to connect to the mqtt broker.
};
-enum AittStreamState {
- AITT_STREAM_STATE_INIT = 0,
- AITT_STREAM_STATE_READY = 1,
- AITT_STREAM_STATE_PLAYING = 2,
-};
-
-enum AittStreamRole {
- AITT_STREAM_ROLE_NONE = 0,
- AITT_STREAM_ROLE_SRC = 1,
- AITT_STREAM_ROLE_SINK = 2,
-};
-
// The maximum size in bytes of a message. It follows MQTT
#define AITT_MESSAGE_MAX 268435455
#include <errno.h>
#define TIZEN_ERROR_NONE 0
-#define TIZEN_ERROR_INVALID_PARAMETER -EINVAL
-#define TIZEN_ERROR_PERMISSION_DENIED -EACCES
#define TIZEN_ERROR_OUT_OF_MEMORY -ENOMEM
+#define TIZEN_ERROR_PERMISSION_DENIED -EACCES
#define TIZEN_ERROR_RESOURCE_BUSY -EBUSY
+#define TIZEN_ERROR_INVALID_PARAMETER -EINVAL
#define TIZEN_ERROR_TIMED_OUT (-1073741824LL + 1)
#define TIZEN_ERROR_NOT_SUPPORTED (-1073741824LL + 2)
#define TIZEN_ERROR_AITT -0x04020000
enum AittError {
AITT_ERROR_NONE = TIZEN_ERROR_NONE, /**< On Success */
- AITT_ERROR_INVALID_PARAMETER = TIZEN_ERROR_INVALID_PARAMETER, /**< Invalid parameter */
- AITT_ERROR_PERMISSION_DENIED = TIZEN_ERROR_PERMISSION_DENIED, /**< Permission denied */
AITT_ERROR_OUT_OF_MEMORY = TIZEN_ERROR_OUT_OF_MEMORY, /**< Out of memory */
- AITT_ERROR_RESOURCE_BUSY = TIZEN_ERROR_RESOURCE_BUSY, /**< Resource Busy */
+ AITT_ERROR_PERMISSION_DENIED = TIZEN_ERROR_PERMISSION_DENIED, /**< Permission denied */
+ AITT_ERROR_RESOURCE_BUSY = TIZEN_ERROR_RESOURCE_BUSY, /**< Device or resource busy */
+ AITT_ERROR_INVALID_PARAMETER = TIZEN_ERROR_INVALID_PARAMETER, /**< Invalid parameter */
AITT_ERROR_TIMED_OUT = TIZEN_ERROR_TIMED_OUT, /**< Time out */
AITT_ERROR_NOT_SUPPORTED = TIZEN_ERROR_NOT_SUPPORTED, /**< Not supported */
AITT_ERROR_UNKNOWN = TIZEN_ERROR_AITT | 0x01, /**< Unknown Error */
AITT_ERROR_SYSTEM = TIZEN_ERROR_AITT | 0x02, /**< System errors */
- AITT_ERROR_NOT_READY = TIZEN_ERROR_AITT | 0x03, /**< System errors */
+ AITT_ERROR_NOT_READY = TIZEN_ERROR_AITT | 0x03, /**< Not available */
};
{
aittThread = std::thread(&Module::ThreadMain, this);
- discovery_cb = discovery.AddDiscoveryCB(type,
+ discovery_cb = discovery.AddDiscoveryCB(NAME[secure],
std::bind(&Module::DiscoveryMessageCallback, this, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3, std::placeholders::_4));
DBG("Discovery Callback : %p, %d", this, discovery_cb);
fbb.Finish();
auto buf = fbb.GetBuffer();
- discovery.UpdateDiscoveryMsg(secure ? AITT_TYPE_TCP_SECURE : AITT_TYPE_TCP, buf.data(),
- buf.size());
+ discovery.UpdateDiscoveryMsg(NAME[secure], buf.data(), buf.size());
}
void Module::ReceiveData(MainLoopHandler::MainLoopResult result, int handle,
void UpdatePublishTable(const std::string &topic, const std::string &host,
const TCP::ConnectInfo &info);
+ const char *const NAME[2] = {"TCP", "SECURE_TCP"};
MainLoopHandler main_loop;
std::thread aittThread;
int discovery_cb;
if (data_len == UINT32_MAX)
return HandleZeroMsg(data, data_size);
+ if (AITT_MESSAGE_MAX < data_len) {
+ ERR("Invalid Size(%zu)", data_len);
+ return -1;
+ }
void *data_buf = malloc(data_len);
Recv(data_buf, data_len);
data_size = data_len;
return pImpl->SendReply(msg, data, datalen, end);
}
-AittStreamID AITT::CreatePublishStream(const std::string &topic, AittProtocol protocol)
+AittStream *AITT::CreateStream(AittStreamProtocol type, const std::string &topic,
+ AittStreamRole role)
{
- return pImpl->CreatePublishStream(topic, protocol);
+ return pImpl->CreateStream(type, topic, role);
}
-AittStreamID AITT::CreateSubscribeStream(const std::string &topic, AittProtocol protocol)
+void AITT::DestroyStream(AittStream *aitt_stream)
{
- return pImpl->CreateSubscribeStream(topic, protocol);
-}
-
-void AITT::DestroyStream(AittStreamID handle)
-{
- return pImpl->DestroyStream(handle);
-}
-
-void AITT::SetStreamConfig(AittStreamID handle, const std::string &key, const std::string &value)
-{
- return pImpl->SetStreamConfig(handle, key, value);
-}
-
-std::string AITT::GetStreamConfig(AittStreamID handle, const std::string &key)
-{
- return pImpl->GetStreamConfig(handle, key);
-}
-
-void AITT::StartStream(AittStreamID handle)
-{
- return pImpl->StartStream(handle);
-}
-
-void AITT::StopStream(AittStreamID handle)
-{
- return pImpl->StopStream(handle);
-}
-
-void AITT::SetStreamStateCallback(AittStreamID handle, StreamStateCallback cb,
- void *user_data)
-{
- return pImpl->SetStreamStateCallback(handle, cb, user_data);
-}
-
-void AITT::UnsetStreamStateCallback(AittStreamID handle)
-{
- return pImpl->UnsetStreamStateCallback(handle);
-}
-
-void AITT::SetStreamSinkCallback(AittStreamID handle, StreamSinkCallback cb,
- void *user_data)
-{
- return pImpl->SetStreamSinkCallback(handle, cb, user_data);
-}
-
-void AITT::UnsetStreamSinkCallback(AittStreamID handle)
-{
- return pImpl->UnsetStreamSinkCallback(handle);
+ return pImpl->DestroyStream(aitt_stream);
}
} // namespace aitt
*/
#include "AITTImpl.h"
-#include <flatbuffers/flexbuffers.h>
-
+#include <algorithm>
#include <cerrno>
#include <cstring>
#include <functional>
void AITT::Impl::Disconnect(void)
{
UnsubscribeAll();
- DestroyAllStream();
+
+ for (auto stream : in_use_streams)
+ delete stream;
+ in_use_streams.clear();
mqtt_broker_ip_.clear();
mqtt_broker_port_ = -1;
}
subscribed_list.clear();
}
-void AITT::Impl::DestroyAllStream(void)
-{
- std::unique_lock<std::mutex> lock(stream_list_mutex_);
-
- for (auto tag : stream_list_) {
- if (!tag)
- continue;
- //TODO call stream transport
- delete tag;
- }
- stream_list_.clear();
-}
-
-void AITT::Impl::StreamInfoExist(StreamTag *tag)
-{
- if (std::find(stream_list_.begin(), stream_list_.end(), tag) == stream_list_.end()) {
- ERR("Unknown stream id(%p)", tag);
- throw std::runtime_error("stream id");
- }
-}
void AITT::Impl::ConfigureTransportModule(const std::string &key, const std::string &value,
AittProtocol protocols)
auto it = std::find(subscribed_list.begin(), subscribed_list.end(), info);
if (it == subscribed_list.end()) {
ERR("Unknown subscribe_id(%p)", subscribe_id);
- throw std::runtime_error("subscribe_id");
+ throw AittException(AittException::NO_DATA_ERR);
}
void *user_data = nullptr;
if (sub_msg->IsEndSequence()) {
try {
Unsubscribe(sub_msg->GetID());
- } catch (std::runtime_error &e) {
+ } catch (AittException &e) {
ERR("Unsubscribe() Fail(%s)", e.what());
}
}
if (sub_msg->IsEndSequence()) {
try {
Unsubscribe(sub_msg->GetID());
- } catch (std::runtime_error &e) {
+ } catch (AittException &e) {
ERR("Unsubscribe() Fail(%s)", e.what());
}
sync_loop.Quit();
user_data, qos);
}
-AittStreamID AITT::Impl::CreatePublishStream(const std::string &topic, AittProtocol protocol)
+AittStream *AITT::Impl::CreateStream(AittStreamProtocol type, const std::string &topic,
+ AittStreamRole role)
{
- //TODO call stream transport
- auto info = new StreamTag(topic, protocol, AITT_STREAM_ROLE_SRC, nullptr);
- // TODO: be prepared when there's memory failure
- {
- std::unique_lock<std::mutex> lock(stream_list_mutex_);
- stream_list_.push_back(info);
+ AittStreamModule *stream = nullptr;
+ try {
+ stream = modules.NewStreamModule(type, topic, role);
+ in_use_streams.push_back(stream);
+ } catch (std::exception &e) {
+ ERR("StreamHandler() Fail(%s)", e.what());
}
+ discovery.Restart();
- INFO("Stream topic(%s) : %p", topic.c_str(), info);
- return reinterpret_cast<AittStreamID>(info);
+ return stream;
}
-AittStreamID AITT::Impl::CreateSubscribeStream(const std::string &topic,
- AittProtocol protocol)
+void AITT::Impl::DestroyStream(AittStream *aitt_stream)
{
- //TODO call stream transport
- auto info = new StreamTag(topic, protocol, AITT_STREAM_ROLE_SINK, nullptr);
- // TODO: be prepared when there's memory failure
- {
- std::unique_lock<std::mutex> lock(stream_list_mutex_);
- stream_list_.push_back(info);
+ auto it = std::find(in_use_streams.begin(), in_use_streams.end(), aitt_stream);
+ if (it == in_use_streams.end()) {
+ ERR("Unknown Stream(%p)", aitt_stream);
+ return;
}
-
- INFO("Stream topic(%s) : %p", topic.c_str(), info);
- return reinterpret_cast<AittStreamID>(info);
-}
-
-void AITT::Impl::DestroyStream(AittStreamID handle)
-{
- INFO("stream id : %p", handle);
- StreamTag *info = reinterpret_cast<StreamTag *>(handle);
-
- std::unique_lock<std::mutex> lock(stream_list_mutex_);
-
- auto it = std::find(stream_list_.begin(), stream_list_.end(), info);
- if (it == stream_list_.end()) {
- ERR("Unknown stream id(%p)", handle);
- throw std::runtime_error("stream ID");
- }
-
- //TODO call stream transport
-
- stream_list_.erase(it);
- delete *it;
-}
-
-void AITT::Impl::SetStreamConfig(AittStreamID handle, const std::string &key,
- const std::string &value)
-{
- INFO("stream id : %p", handle);
- StreamTag *tag = reinterpret_cast<StreamTag *>(handle);
-
- std::unique_lock<std::mutex> lock(stream_list_mutex_);
-
- StreamInfoExist(tag);
-
- //TODO call stream transport
-}
-
-std::string AITT::Impl::GetStreamConfig(AittStreamID handle, const std::string &key)
-{
- std::string value;
- INFO("stream id : %p", handle);
- StreamTag *tag = reinterpret_cast<StreamTag *>(handle);
-
- std::unique_lock<std::mutex> lock(stream_list_mutex_);
-
- StreamInfoExist(tag);
-
- //TODO call stream transport
-
- return value;
-}
-
-void AITT::Impl::StartStream(AittStreamID handle)
-{
- INFO("stream id : %p", handle);
- StreamTag *tag = reinterpret_cast<StreamTag *>(handle);
-
- std::unique_lock<std::mutex> lock(stream_list_mutex_);
-
- StreamInfoExist(tag);
-
- //TODO call stream transport
-}
-
-void AITT::Impl::StopStream(AittStreamID handle)
-{
- INFO("stream id : %p", handle);
- StreamTag *tag = reinterpret_cast<StreamTag *>(handle);
-
- std::unique_lock<std::mutex> lock(stream_list_mutex_);
-
- StreamInfoExist(tag);
-
- //TODO call stream transport
-}
-
-void AITT::Impl::SetStreamStateCallback(AittStreamID handle, StreamStateCallback cb,
- void *user_data)
-{
- INFO("stream id : %p", handle);
- StreamTag *tag = reinterpret_cast<StreamTag *>(handle);
-
- std::unique_lock<std::mutex> lock(stream_list_mutex_);
-
- StreamInfoExist(tag);
-
- //TODO call stream transport
-}
-
-void AITT::Impl::UnsetStreamStateCallback(AittStreamID handle)
-{
- INFO("stream id : %p", handle);
- StreamTag *tag = reinterpret_cast<StreamTag *>(handle);
-
- std::unique_lock<std::mutex> lock(stream_list_mutex_);
-
- StreamInfoExist(tag);
-
- //TODO call stream transport
-}
-
-void AITT::Impl::SetStreamSinkCallback(AittStreamID handle, StreamSinkCallback cb, void *user_data)
-{
- INFO("stream id : %p", handle);
- StreamTag *tag = reinterpret_cast<StreamTag *>(handle);
-
- std::unique_lock<std::mutex> lock(stream_list_mutex_);
-
- StreamInfoExist(tag);
-
- //TODO call stream transport
-}
-
-void AITT::Impl::UnsetStreamSinkCallback(AittStreamID handle)
-{
- INFO("stream id : %p", handle);
- StreamTag *tag = reinterpret_cast<StreamTag *>(handle);
-
- std::unique_lock<std::mutex> lock(stream_list_mutex_);
-
- StreamInfoExist(tag);
-
- //TODO call stream transport
+ in_use_streams.erase(it);
+ delete aitt_stream;
}
} // namespace aitt
*/
#pragma once
-#include <flatbuffers/flexbuffers.h>
-
#include <map>
#include <memory>
#include <mutex>
#include "AITT.h"
#include "AittDiscovery.h"
-#include "AittStreamTag.h"
+#include "AittStream.h"
#include "MQ.h"
#include "MainLoopHandler.h"
#include "ModuleManager.h"
namespace aitt {
-
class AITT::Impl {
public:
explicit Impl(AITT &parent, const std::string &id, const std::string &my_ip,
void SendReply(MSG *msg, const void *data, const int datalen, bool end);
- AittStreamID CreatePublishStream(const std::string &topic, AittProtocol protocol);
- AittStreamID CreateSubscribeStream(const std::string &topic, AittProtocol protocol);
- void DestroyStream(AittStreamID handle);
- void SetStreamConfig(AittStreamID handle, const std::string &key, const std::string &value);
- std::string GetStreamConfig(AittStreamID handle, const std::string &key);
- void StartStream(AittStreamID handle);
- void StopStream(AittStreamID handle);
- void SetStreamStateCallback(AittStreamID handle, StreamStateCallback cb, void *user_data);
- void UnsetStreamStateCallback(AittStreamID handle);
- void SetStreamSinkCallback(AittStreamID handle, StreamSinkCallback cb, void *user_data);
- void UnsetStreamSinkCallback(AittStreamID handle);
+ AittStream *CreateStream(AittStreamProtocol type, const std::string &topic,
+ AittStreamRole role);
+ void DestroyStream(AittStream *aitt_stream);
private:
using Blob = std::pair<const void *, int>;
void *SubscribeTCP(SubscribeInfo *, const std::string &topic, const SubscribeCallback &cb,
void *cbdata, AittQoS qos);
- void *SubscribeWebRtc(SubscribeInfo *, const std::string &topic, const SubscribeCallback &cb,
- void *cbdata, AittQoS qos);
void HandleTimeout(int timeout_ms, unsigned int &timeout_id, aitt::MainLoopHandler &sync_loop,
bool &is_timeout);
- void PublishWebRtc(const std::string &topic, const void *data, const size_t datalen,
- AittQoS qos, bool retain);
void UnsubscribeAll();
- void DestroyAllStream(void);
- void StreamInfoExist(StreamTag *tag);
void ThreadMain(void);
AITT &public_api;
std::unique_ptr<MQ> mq;
std::vector<SubscribeInfo *> subscribed_list;
std::mutex subscribed_list_mutex_;
- std::vector<StreamTag *> stream_list_;
- std::mutex stream_list_mutex_;
+ std::vector<AittStreamModule *> in_use_streams;
std::string id_;
std::string mqtt_broker_ip_;
transport_handles.push_back(ModuleHandle(nullptr, nullptr));
LoadTransport(static_cast<TransportType>(i));
}
+
+ for (int i = AITT_STREAM_TYPE_WEBRTC; i < AITT_STREAM_TYPE_MAX; ++i) {
+ stream_handles.push_back(ModuleHandle(nullptr, nullptr));
+ }
}
AittTransport &ModuleManager::Get(AittProtocol protocol)
return TYPE_TCP;
case AITT_TYPE_TCP_SECURE:
return TYPE_TCP_SECURE;
- case AITT_TYPE_WEBRTC:
- return TYPE_WEBRTC;
case AITT_TYPE_MQTT:
default:
return TYPE_TRANSPORT_MAX;
}
-std::string ModuleManager::GetTransportFileName(TransportType type)
+const char *ModuleManager::GetTransportFileName(TransportType type)
{
switch (type) {
case TYPE_TCP:
case TYPE_TCP_SECURE:
return "libaitt-transport-tcp.so";
- case TYPE_WEBRTC:
- return "libaitt-transport-webrtc.so";
default:
ERR("Unknown Type(%d)", type);
break;
}
- return std::string("Unknown");
+ return "Unknown";
+}
+
+const char *ModuleManager::GetStreamFileName(AittStreamProtocol type)
+{
+ switch (type) {
+ case AITT_STREAM_TYPE_WEBRTC:
+ return "libaitt-stream-webrtc.so";
+ case AITT_STREAM_TYPE_RTSP:
+ return "libaitt-stream-rtsp.so";
+ default:
+ ERR("Unknown Type(%d)", type);
+ break;
+ }
+
+ return "Unknown";
}
ModuleManager::ModuleHandle ModuleManager::OpenModule(const char *file)
if (dlclose(handle))
ERR("dlclose: %s", dlerror());
});
- if (handle == nullptr)
+ if (handle == nullptr) {
ERR("dlopen(%s): %s", file, dlerror());
+ throw AittException(AittException::SYSTEM_ERR);
+ }
return handle;
}
if (TYPE_TCP_SECURE == type)
type = TYPE_TCP;
- std::string filename = GetTransportFileName(type);
- ModuleHandle handle = OpenModule(filename.c_str());
+ ModuleHandle handle = OpenModule(GetTransportFileName(type));
return handle;
}
void ModuleManager::LoadTransport(TransportType type)
{
- transport_handles[type] = OpenTransport(type);
- if (transport_handles[type] == nullptr) {
- ERR("OpenTransport(%d) Fail", type);
+ try {
+ transport_handles[type] = OpenTransport(type);
+ } catch (AittException &e) {
+ ERR("OpenTransport(%d) Fail(%s)", type, e.what());
return;
}
}
}
+AittStreamModule *ModuleManager::NewStreamModule(AittStreamProtocol type, const std::string &topic,
+ AittStreamRole role)
+{
+ if (nullptr == stream_handles[type])
+ stream_handles[type] = OpenModule(GetStreamFileName(type));
+
+ AittStreamModule::ModuleEntry get_instance_fn = reinterpret_cast<AittStreamModule::ModuleEntry>(
+ dlsym(stream_handles[type].get(), AittStreamModule::MODULE_ENTRY_NAME));
+ if (get_instance_fn == nullptr) {
+ ERR("dlsym: %s", dlerror());
+ throw AittException(AittException::SYSTEM_ERR);
+ }
+
+ AittStreamModule *instance(
+ static_cast<AittStreamModule *>(get_instance_fn(discovery, topic, role)));
+ if (instance == nullptr) {
+ ERR("get_instance_fn(AittStreamModule) Fail");
+ throw AittException(AittException::SYSTEM_ERR);
+ }
+
+ return instance;
+}
+
std::unique_ptr<MQ> ModuleManager::NewCustomMQ(const std::string &id, const AittOption &option)
{
- custom_mqtt_handle = OpenModule("libaitt-st-broker.so");
+ if (nullptr == custom_mqtt_handle)
+ custom_mqtt_handle = OpenModule("libaitt-st-broker.so");
MQ::ModuleEntry get_instance_fn =
reinterpret_cast<MQ::ModuleEntry>(dlsym(custom_mqtt_handle.get(), MQ::MODULE_ENTRY_NAME));
#include <vector>
#include "AittDiscovery.h"
+#include "AittStreamModule.h"
#include "AittTransport.h"
#include "MQ.h"
#include "NullTransport.h"
virtual ~ModuleManager() = default;
AittTransport &Get(AittProtocol type);
+ AittStreamModule *NewStreamModule(AittStreamProtocol type, const std::string &topic,
+ AittStreamRole role);
std::unique_ptr<MQ> NewCustomMQ(const std::string &id, const AittOption &option);
private:
enum TransportType {
TYPE_TCP, //(0x1 << 1)
TYPE_TCP_SECURE, //(0x1 << 2)
- TYPE_WEBRTC, //(0x1 << 3)
TYPE_TRANSPORT_MAX,
};
TransportType Convert(AittProtocol type);
- std::string GetTransportFileName(TransportType type);
+ const char *GetTransportFileName(TransportType type);
+ const char *GetStreamFileName(AittStreamProtocol type);
ModuleHandle OpenModule(const char *file);
ModuleHandle OpenTransport(TransportType type);
void LoadTransport(TransportType type);
AittDiscovery &discovery;
std::vector<ModuleHandle> transport_handles;
std::unique_ptr<AittTransport> transports[TYPE_TRANSPORT_MAX];
+ std::vector<ModuleHandle> stream_handles;
ModuleHandle custom_mqtt_handle;
NullTransport null_transport;
};
+++ /dev/null
-/*
- * Copyright (c) 2021-2022 Samsung Electronics Co., Ltd All Rights Reserved
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <gtest/gtest.h>
-#include <sys/random.h>
-
-#include <thread>
-
-#include "AITT.h"
-#include "AITT_Stream_tests.h"
-
-using AITT = aitt::AITT;
-
-class AITTStreamTest : public testing::Test, public AittStreamTests {
- protected:
- void SetUp() override { Init(); }
- void TearDown() override { Deinit(); }
-};
-
-TEST_F(AITTStreamTest, Positive_Create_Pubilsh_Stream_WebRTC_Anytime)
-{
- try {
- AITT aitt(stream_src_id_, LOCAL_IP, AittOption(true, false));
- aitt.Connect();
- publish_stream_handle_ =
- aitt.CreatePublishStream(TEST_STREAM_TOPIC, AITT_TYPE_WEBRTC);
- EXPECT_NE(publish_stream_handle_, nullptr);
- } catch (std::exception &e) {
- FAIL() << "Unexpected exception: " << e.what();
- }
-}
-
-TEST_F(AITTStreamTest, Positive_Create_Subscribe_Stream_WebRTC_Anytime)
-{
- try {
- AITT aitt(stream_src_id_, LOCAL_IP, AittOption(true, false));
- aitt.Connect();
- subscribe_stream_handle_ =
- aitt.CreateSubscribeStream(TEST_STREAM_TOPIC, AITT_TYPE_WEBRTC);
- EXPECT_NE(subscribe_stream_handle_, nullptr);
- } catch (std::exception &e) {
- FAIL() << "Unexpected exception: " << e.what();
- }
-}
-
-TEST_F(AITTStreamTest, Positive_Destroy_Pubilsh_Stream_WebRTC_Anytime)
-{
- try {
- AITT aitt(stream_src_id_, LOCAL_IP, AittOption(true, false));
- aitt.Connect();
- publish_stream_handle_ =
- aitt.CreatePublishStream(TEST_STREAM_TOPIC, AITT_TYPE_WEBRTC);
- aitt.DestroyStream(publish_stream_handle_);
- } catch (std::exception &e) {
- FAIL() << "Unexpected exception: " << e.what();
- }
-}
-
-TEST_F(AITTStreamTest, Positive_Destroy_Subscribe_Stream_WebRTC_Anytime)
-{
- try {
- AITT aitt(stream_src_id_, LOCAL_IP, AittOption(true, false));
- aitt.Connect();
- subscribe_stream_handle_ =
- aitt.CreateSubscribeStream(TEST_STREAM_TOPIC, AITT_TYPE_WEBRTC);
- aitt.DestroyStream(subscribe_stream_handle_);
- } catch (std::exception &e) {
- FAIL() << "Unexpected exception: " << e.what();
- }
-}
-
-TEST_F(AITTStreamTest, Negative_Destroy_Stream_Anytime)
-{
- EXPECT_THROW(
- {
- try {
- AITT aitt(stream_src_id_, LOCAL_IP, AittOption(true, false));
- aitt.Connect();
- aitt.DestroyStream(nullptr);
- } catch (const std::runtime_error &e) {
- // and this tests that it has the correct message
- throw;
- }
- },
- std::runtime_error);
-}
+++ /dev/null
-/*
- * Copyright (c) 2022 Samsung Electronics Co., Ltd All Rights Reserved
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#pragma once
-
-#include <glib.h>
-#include <sys/time.h>
-
-#include "aitt_internal.h"
-
-#define LOCAL_IP "127.0.0.1"
-#define TEST_STREAM_TOPIC "test/stream_topic"
-#define TEST_CONFIG_KEY "test_config_key"
-#define TEST_CONFIG_VALUE "test_config_value"
-
-class AittStreamTests {
- public:
- void Init()
- {
- timeval tv;
- char buffer[256];
- gettimeofday(&tv, nullptr);
- snprintf(buffer, sizeof(buffer), "UniqueStreamSrcID.%lX%lX", tv.tv_sec, tv.tv_usec);
- stream_src_id_ = buffer;
- snprintf(buffer, sizeof(buffer), "UniqueStreamSinkID.%lX%lX", tv.tv_sec, tv.tv_usec);
- stream_sink_id_ = buffer;
- stream_topic_ = "StreamTopic";
- mainLoop_ = g_main_loop_new(nullptr, FALSE);
- }
-
- void Deinit() { g_main_loop_unref(mainLoop_); }
-
- void IterateEventLoop(void)
- {
- g_main_loop_run(mainLoop_);
- DBG("Go forward");
- }
-
- AittStreamID publish_stream_handle_;
- AittStreamID subscribe_stream_handle_;
-
- std::string stream_src_id_;
- std::string stream_sink_id_;
- std::string stream_topic_;
- GMainLoop *mainLoop_;
-};
--- /dev/null
+/*
+ * Copyright (c) 2022 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "AittStream.h"
+
+#include <gtest/gtest.h>
+
+#include "AITT.h"
+#include "AittTests.h"
+
+using namespace aitt;
+
+TEST(AittStreamTest, Full_P)
+{
+ try {
+ AITT aitt("streamClientId", LOCAL_IP, AittOption(true, false));
+
+ aitt.Connect();
+
+ AittStream *publisher =
+ aitt.CreateStream(AITT_STREAM_TYPE_WEBRTC, "topic", AITT_STREAM_ROLE_PUBLISHER);
+ ASSERT_TRUE(publisher) << "CreateStream() Fail";
+
+ AittStream *subscriber =
+ aitt.CreateStream(AITT_STREAM_TYPE_WEBRTC, "topic", AITT_STREAM_ROLE_SUBSCRIBER);
+ ASSERT_TRUE(subscriber) << "CreateStream() Fail";
+
+ publisher->SetConfig("key", "value");
+ publisher->Start();
+
+ subscriber->SetConfig("key", "value");
+ subscriber->SetStateCallback([](AittStream *stream, int state, void *user_data) {},
+ (void *)"user_data");
+ subscriber->SetReceiveCallback([](AittStream *stream, void *obj, void *user_data) {},
+ (void *)"user-data");
+ subscriber->Start();
+
+ aitt.DestroyStream(publisher);
+ aitt.DestroyStream(subscriber);
+ } catch (std::exception &e) {
+ FAIL() << "Unexpected exception: " << e.what();
+ }
+}
LINK_DIRECTORIES(${UT_NEEDS_LIBRARY_DIRS})
###########################################################################
-SET(AITT_UT_SRC AITT_test.cc RequestResponse_test.cc MainLoopHandler_test.cc aitt_c_test.cc AITT_TCP_test.cc)
+SET(AITT_UT_SRC AITT_test.cc RequestResponse_test.cc MainLoopHandler_test.cc aitt_c_test.cc
+ AITT_TCP_test.cc AittStream_test.cc)
ADD_EXECUTABLE(${AITT_UT} ${AITT_UT_SRC})
TARGET_LINK_LIBRARIES(${AITT_UT} Threads::Threads ${UT_NEEDS_LIBRARIES} ${PROJECT_NAME})
LD_LIBRARY_PATH=../modules/tcp/:../modules/webrtc/:../:../common/:$ENV{LD_LIBRARY_PATH}
${CMAKE_CURRENT_BINARY_DIR}/${AITT_UT}_local --gtest_filter=*_Anytime
)
-###########################################################################
-ADD_EXECUTABLE(${AITT_UT}_stream AITT_Stream_test.cc)
-TARGET_LINK_LIBRARIES(${AITT_UT}_stream Threads::Threads ${UT_NEEDS_LIBRARIES} ${PROJECT_NAME})
-
-INSTALL(TARGETS ${AITT_UT}_stream DESTINATION ${AITT_TEST_BINDIR})
-
-ADD_TEST(
- NAME
- ${AITT_UT}_stream
- COMMAND
- ${CMAKE_COMMAND} -E env
- LD_LIBRARY_PATH=../modules/tcp/:../modules/webrtc/:../:../common/:$ENV{LD_LIBRARY_PATH}
- ${CMAKE_CURRENT_BINARY_DIR}/${AITT_UT}_stream --gtest_filter=*_Anytime
-)