virtual void SetConfig(const std::string &key, const std::string &value) = 0;
virtual void SetConfig(const std::string &key, void *obj) = 0;
virtual void Start(void) = 0;
+ virtual void Stop(void) = 0;
virtual void SetStateCallback(StateCallback cb, void *user_data) = 0;
// Subscriber ONLY
#include "aitt_internal.h"
+#define RTSP_DBG(fmt, ...) \
+ do { \
+ if (role_ == AittStreamRole::AITT_STREAM_ROLE_PUBLISHER) \
+ DBG("[RTSP_SERVER] " fmt, ##__VA_ARGS__); \
+ else \
+ DBG("[RTSP_CLIENT] " fmt, ##__VA_ARGS__); \
+ } while (0)
+
+#define RTSP_ERR(fmt, ...) \
+ do { \
+ if (role_ == AittStreamRole::AITT_STREAM_ROLE_PUBLISHER) \
+ ERR("[RTSP_SERVER] " fmt, ##__VA_ARGS__); \
+ else \
+ ERR("[RTSP_CLIENT] " fmt, ##__VA_ARGS__); \
+ } while (0)
+
namespace AittRTSPNamespace {
Module::Module(AittDiscovery &discovery, const std::string &topic, AittStreamRole role)
- : discovery_(discovery), topic_(topic), role_(role), info(nullptr), client(nullptr), current_state(0)
+ : discovery_(discovery),
+ topic_(topic),
+ role_(role),
+ server_state(AittStreamState::AITT_STREAM_STATE_INIT),
+ client_state(AittStreamState::AITT_STREAM_STATE_INIT)
{
- DBG("RTSP Module constructor : %s, role : %d", topic_.c_str(), role_);
+ RTSP_DBG("RTSP Module constructor : %s", topic_.c_str());
discovery_cb_ = discovery_.AddDiscoveryCB(topic,
std::bind(&Module::DiscoveryMessageCallback, this, std::placeholders::_1,
Module::~Module(void)
{
- DBG("RTSP Module destroyer : %s, role : %d", topic_.c_str(), role_);
+ RTSP_DBG("RTSP Module destroyer : %s", topic_.c_str());
+
+ discovery_.RemoveDiscoveryCB(discovery_cb_);
}
void Module::SetConfig(const std::string &key, const std::string &value)
{
if (role_ == AittStreamRole::AITT_STREAM_ROLE_PUBLISHER) {
- /* Add rtsp server table */
+ if (key == "url") {
+ if (value.find("rtsp://") != 0) {
+ RTSP_ERR("rtsp url validation check failed");
+ return;
+ }
+
+ info.SetUrl(value);
+ } else if (key == "id") {
+ info.SetID(value);
+ } else if (key == "password") {
+ info.SetPassword(value);
+ }
}
}
void Module::SetConfig(const std::string &key, void *obj)
{
if (role_ == AittStreamRole::AITT_STREAM_ROLE_SUBSCRIBER) {
- /* Set evas object */
+ if (key == "display") {
+ RTSP_DBG("Set Evas object for display");
+ }
}
}
void Module::Start(void)
{
+ RTSP_DBG("Start");
+
+ if (role_ == AittStreamRole::AITT_STREAM_ROLE_PUBLISHER) {
+ UpdateState(role_, AittStreamState::AITT_STREAM_STATE_READY);
+ UpdateDiscoveryMsg();
+ } else {
+ if (server_state == AittStreamState::AITT_STREAM_STATE_READY) {
+ RTSP_DBG("Playing Pipeline in Start() method");
+ client.Start();
+ UpdateState(role_, AittStreamState::AITT_STREAM_STATE_PLAYING);
+ } else {
+ RTSP_DBG("The pipeline not yet created. Wait..");
+ UpdateState(role_, AittStreamState::AITT_STREAM_STATE_READY);
+ }
+ }
+}
+
+void Module::Stop(void)
+{
+ RTSP_DBG("Stop");
+
if (role_ == AittStreamRole::AITT_STREAM_ROLE_PUBLISHER) {
- /* Update Rtsp Server table with retained message of MQTT broker */
+ UpdateState(role_, AittStreamState::AITT_STREAM_STATE_INIT);
+ UpdateDiscoveryMsg();
+ } else {
+ if (client_state == AittStreamState::AITT_STREAM_STATE_PLAYING) {
+ client.Stop();
+ }
+
+ UpdateState(role_, AittStreamState::AITT_STREAM_STATE_INIT);
+ }
+}
+
+void Module::UpdateDiscoveryMsg()
+{
+ if (role_ == AittStreamRole::AITT_STREAM_ROLE_SUBSCRIBER)
+ return;
+
+ flexbuffers::Builder fbb;
+ fbb.Map([this, &fbb]() {
+ fbb.Int(RTSP_INFO_SERVER_STATE, static_cast<int>(server_state));
+ fbb.String(RTSP_INFO_URL, info.GetUrl());
+ fbb.String(RTSP_INFO_ID, info.GetID());
+ fbb.String(RTSP_INFO_PASSWORD, info.GetPassword());
+ });
+ fbb.Finish();
+
+ auto buf = fbb.GetBuffer();
+ discovery_.UpdateDiscoveryMsg(topic_, buf.data(), buf.size());
+}
+
+void Module::UpdateState(AittStreamRole role, AittStreamState state)
+{
+ if (role == AittStreamRole::AITT_STREAM_ROLE_PUBLISHER)
+ server_state = state;
+ else
+ client_state = state;
+
+ if (role == role_ && state_cb.first != nullptr) {
+ state_cb.first(this, state, state_cb.second);
+ }
+}
+
+void Module::DiscoveryMessageCallback(const std::string &clientId, const std::string &status,
+ const void *msg, const int szmsg)
+{
+ if (role_ == AittStreamRole::AITT_STREAM_ROLE_PUBLISHER)
+ return;
+
+ RTSP_DBG("DiscoveryMessageCallback called");
+
+ if (!status.compare(AittDiscovery::WILL_LEAVE_NETWORK)) {
+ AittStreamState next_state = AittStreamState::AITT_STREAM_STATE_INIT;
+
+ if (client_state == AittStreamState::AITT_STREAM_STATE_PLAYING) {
+ client.Stop();
+ next_state = AittStreamState::AITT_STREAM_STATE_READY;
+ }
+
+ client.DestroyPipeline();
+ UpdateState(role_, next_state);
+ return;
+ }
+
+ auto map = flexbuffers::GetRoot(static_cast<const uint8_t *>(msg), szmsg).AsMap();
+ if (map.size() != 4) {
+ RTSP_ERR("RTSP Info validation check failed");
+ return;
}
- else {
- /* Check if the topic exists in server_table to find */
- /* if exists, then create pipeline using that information */
- /* if not, wait until discovery message from Publisher */
+
+ std::lock_guard<std::mutex> auto_lock(pipeline_lock);
+
+ UpdateState(AittStreamRole::AITT_STREAM_ROLE_PUBLISHER,
+ static_cast<AittStreamState>(map[RTSP_INFO_SERVER_STATE].AsInt64()));
+ info.SetUrl(map[RTSP_INFO_URL].AsString().c_str());
+ info.SetID(map[RTSP_INFO_ID].AsString().c_str());
+ info.SetPassword(map[RTSP_INFO_PASSWORD].AsString().c_str());
+
+ RTSP_DBG("server_state : %d, url : %s, id : %s, passwd : %s", server_state,
+ info.GetUrl().c_str(),
+ info.GetID().c_str(), info.GetPassword().c_str());
+
+ if (server_state == AittStreamState::AITT_STREAM_STATE_READY) {
+ client.CreatePipeline(info.GetCompleteUrl());
+
+ if (client_state == AittStreamState::AITT_STREAM_STATE_READY) {
+ RTSP_DBG("Playing pipeline in DiscoveryMessage Callback");
+
+ client.Start();
+
+ UpdateState(AittStreamRole::AITT_STREAM_ROLE_SUBSCRIBER,
+ AittStreamState::AITT_STREAM_STATE_PLAYING);
+ }
+ } else if (server_state == AittStreamState::AITT_STREAM_STATE_INIT) {
+ if (client_state == AittStreamState::AITT_STREAM_STATE_PLAYING) {
+ RTSP_DBG("Stop pipeline in DiscoveryMessage Callback");
+
+ client.Stop();
+
+ UpdateState(AittStreamRole::AITT_STREAM_ROLE_SUBSCRIBER,
+ AittStreamState::AITT_STREAM_STATE_READY);
+ }
+
+ client.DestroyPipeline();
}
}
if (role_ == AittStreamRole::AITT_STREAM_ROLE_PUBLISHER)
return;
+ state_cb = std::make_pair(cb, user_data);
}
void Module::SetReceiveCallback(ReceiveCallback cb, void *user_data)
{
if (role_ == AittStreamRole::AITT_STREAM_ROLE_PUBLISHER)
return;
+
+ receive_cb = std::make_pair(cb, user_data);
}
} // namespace AittRTSPNamespace
void SetConfig(const std::string &key, const std::string &value) override;
void SetConfig(const std::string &key, void *obj) override;
void Start(void) override;
-
+ void Stop(void) override;
void SetStateCallback(StateCallback cb, void *user_data) override;
void SetReceiveCallback(ReceiveCallback cb, void *user_data) override;
private:
+ void UpdateState(AittStreamRole role, AittStreamState state);
void UpdateDiscoveryMsg();
void DiscoveryMessageCallback(const std::string &clientId, const std::string &status,
const void *msg, const int szmsg);
AittStreamRole role_;
int discovery_cb_;
- RTSPInfo *info;
- RTSPClient *client;
- int current_state;
+ RTSPInfo info;
+ RTSPClient client;
+
+ AittStreamState server_state;
+ AittStreamState client_state;
+
+ std::mutex pipeline_lock;
+ std::pair<StateCallback, void *> state_cb;
+ std::pair<ReceiveCallback, void *> receive_cb;
};
} // namespace AittRTSPNamespace
#include "aitt_internal.h"
-RTSPClient::RTSPClient(const std::string &_url)
- : url(_url),
- pipeline(nullptr),
- state_cb_user_data(nullptr),
- data_cb_user_data(nullptr),
- state(0)
+RTSPClient::RTSPClient() : pipeline(nullptr), state(0)
{
- DBG("RTSPClient constructor");
}
RTSPClient::~RTSPClient()
{
- DBG("RTSPClient destructor");
}
void RTSPClient::OnPadAddedCB(GstElement *element, GstPad *pad, gpointer data)
/* need queueing and delete old frame */
std::lock_guard<std::mutex> auto_lock(client->data_cb_lock);
- if (client->data_cb != nullptr)
- client->data_cb(frame, client->data_cb_user_data);
+ if (client->data_cb.first != nullptr)
+ client->data_cb.first(frame, client->data_cb.second);
}
gboolean RTSPClient::MessageReceived(GstBus *bus, GstMessage *message, gpointer data)
return TRUE;
}
-void RTSPClient::CreatePipeline()
+void RTSPClient::CreatePipeline(const std::string &url)
{
+ if (url.empty() == true) {
+ ERR("RTSP Server url is empty");
+ return;
+ }
+
+ if (pipeline != nullptr) {
+ ERR("pipeline already exists");
+ return;
+ }
+
DBG("Create Pipeline with url : %s", url.c_str());
GstBus *bus;
void RTSPClient::SetStateCallback(const StateCallback &cb, void *user_data)
{
std::lock_guard<std::mutex> auto_lock(state_cb_lock);
- state_cb = cb;
- state_cb_user_data = user_data;
+
+ state_cb = std::make_pair(cb, user_data);
}
void RTSPClient::SetDataCallback(const DataCallback &cb, void *user_data)
{
std::lock_guard<std::mutex> auto_lock(data_cb_lock);
- data_cb = cb;
- data_cb_user_data = user_data;
+
+ data_cb = std::make_pair(cb, user_data);
}
void RTSPClient::UnsetStateCallback()
{
std::lock_guard<std::mutex> auto_lock(state_cb_lock);
- state_cb = nullptr;
- state_cb_user_data = nullptr;
+
+ state_cb = std::make_pair(nullptr, nullptr);
}
void RTSPClient::UnsetClientCallback()
{
std::lock_guard<std::mutex> auto_lock(data_cb_lock);
- data_cb = nullptr;
- data_cb_user_data = nullptr;
+
+ data_cb = std::make_pair(nullptr, nullptr);
}
int RTSPClient::GetState()
class RTSPClient {
public:
- explicit RTSPClient(const std::string &url);
+ explicit RTSPClient();
~RTSPClient(void);
using StateCallback = std::function<void(void *user_data)>;
void Start();
void Stop();
- void CreatePipeline();
+ void CreatePipeline(const std::string &url);
void DestroyPipeline(void);
private:
gpointer data);
static gboolean MessageReceived(GstBus *bus, GstMessage *message, gpointer data);
- std::string url;
GstElement *pipeline;
- StateCallback state_cb;
- void *state_cb_user_data;
-
- DataCallback data_cb;
- void *data_cb_user_data;
+ std::pair<StateCallback, void *> state_cb;
+ std::pair<DataCallback, void *> data_cb;
std::mutex state_cb_lock;
std::mutex data_cb_lock;
#include "RTSPInfo.h"
-RTSPInfo::RTSPInfo(const std::string &_url, const std::string &_id, const std::string &_password)
- : url(_url), id(_id), password(_password)
+#include "aitt_internal.h"
+
+RTSPInfo::RTSPInfo() : url_(""), id_(""), password_("")
{
- // encoding secure id and password
}
RTSPInfo::~RTSPInfo()
{
}
+void RTSPInfo::SetUrl(const std::string &url)
+{
+ url_ = url;
+}
+
std::string RTSPInfo::GetUrl()
{
- return url;
+ return url_;
+}
+
+void RTSPInfo::SetID(const std::string &id)
+{
+ id_ = id;
}
std::string RTSPInfo::GetID()
{
- // decoding secure id
- return id;
+ return id_;
+}
+
+void RTSPInfo::SetPassword(const std::string &password)
+{
+ password_ = password;
}
std::string RTSPInfo::GetPassword()
{
- // decoding secure password
- return password;
+ return password_;
+}
+
+std::string RTSPInfo::GetCompleteUrl()
+{
+ std::string complete_url = url_;
+
+ if (id_.empty() != true && password_.empty() != true) {
+ complete_url.insert(7, id_ + ":" + password_ + "@");
+ }
+
+ return complete_url;
}
#include <string>
+#define RTSP_INFO_SERVER_STATE "server_state"
+#define RTSP_INFO_URL "url"
+#define RTSP_INFO_ID "id"
+#define RTSP_INFO_PASSWORD "password"
+
class RTSPInfo {
public:
- RTSPInfo(const std::string &url, const std::string &id, const std::string &password);
+ RTSPInfo(void);
~RTSPInfo(void);
+ void SetUrl(const std::string &url);
std::string GetUrl();
+ void SetID(const std::string &id);
std::string GetID();
+ void SetPassword(const std::string &password);
std::string GetPassword();
+ std::string GetCompleteUrl();
+
private:
- std::string url;
- std::string id;
- std::string password;
+ std::string url_;
+ std::string id_;
+ std::string password_;
};
void SetConfig(const std::string &key, const std::string &value) override;
void SetConfig(const std::string &key, void *obj) override;
void Start(void) override;
-
+ void Stop(void) override;
void SetStateCallback(StateCallback cb, void *user_data) override;
void SetReceiveCallback(ReceiveCallback cb, void *user_data) override;
}
}
-TEST(AittStreamTest, RTSP_Full_P)
-{
- try {
- AITT aitt("streamClientId", LOCAL_IP, AittOption(true, false));
-
- aitt.Connect();
-
- AittStream *publisher =
- aitt.CreateStream(AITT_STREAM_TYPE_RTSP, "topic", AITT_STREAM_ROLE_PUBLISHER);
+class AITTRTSPTest : public testing::Test {
+ protected:
+ void SetUp() override
+ {
+ aitt = new AITT("streamClientId", LOCAL_IP, AittOption(true, false));
+ aitt->Connect();
+ main_loop = g_main_loop_new(nullptr, FALSE);
+
+ publisher = aitt->CreateStream(AITT_STREAM_TYPE_RTSP, "topic", AITT_STREAM_ROLE_PUBLISHER);
ASSERT_TRUE(publisher) << "CreateStream() Fail";
- AittStream *subscriber =
- aitt.CreateStream(AITT_STREAM_TYPE_RTSP, "topic", AITT_STREAM_ROLE_SUBSCRIBER);
+ subscriber =
+ aitt->CreateStream(AITT_STREAM_TYPE_RTSP, "topic", AITT_STREAM_ROLE_SUBSCRIBER);
ASSERT_TRUE(subscriber) << "CreateStream() Fail";
+ }
+ void TearDown() override
+ {
+ g_main_loop_unref(main_loop);
+ aitt->DestroyStream(publisher);
+ aitt->DestroyStream(subscriber);
+ aitt->Disconnect();
+ delete aitt;
+ }
- publisher->SetConfig("key", "value");
+ AITT *aitt;
+ AittStream *publisher;
+ AittStream *subscriber;
+ GMainLoop *main_loop;
+};
+
+static gint SubscriberStart(gpointer argv)
+{
+ AittStream *subscriber = static_cast<AittStream *>(argv);
+
+ subscriber->Start();
+
+ return false;
+}
+
+TEST_F(AITTRTSPTest, Publisher_First_P)
+{
+ try {
+ publisher->SetConfig("url",
+ "rtsp://192.168.1.52:554/cam/realmonitor?channel=1&subtype=0&authbasic=64");
+ publisher->SetConfig("id", "admin");
+ publisher->SetConfig("password", "admin");
publisher->Start();
- subscriber->SetConfig("key", "value");
- subscriber->SetStateCallback([](AittStream *stream, int state, void *user_data) {},
- (void *)"user_data");
- subscriber->SetReceiveCallback([](AittStream *stream, void *obj, void *user_data) {},
- (void *)"user-data");
+ subscriber->SetReceiveCallback(
+ [&](AittStream *stream, void *obj, void *user_data) {
+ DBG("ReceiveCallback Called");
+ if (g_main_loop_is_running(main_loop))
+ g_main_loop_quit(main_loop);
+ },
+ nullptr);
+
+ g_timeout_add(3000, SubscriberStart, subscriber);
+ g_main_loop_run(main_loop);
+ } catch (std::exception &e) {
+ FAIL() << "Unexpected exception: " << e.what();
+ }
+}
+
+TEST_F(AITTRTSPTest, Subscriber_First_P)
+{
+ try {
+ subscriber->SetReceiveCallback(
+ [&](AittStream *stream, void *obj, void *user_data) {
+ DBG("ReceiveCallback Called");
+ if (g_main_loop_is_running(main_loop))
+ g_main_loop_quit(main_loop);
+ },
+ nullptr);
subscriber->Start();
- aitt.DestroyStream(publisher);
- aitt.DestroyStream(subscriber);
+ publisher->SetConfig("url",
+ "rtsp://192.168.1.52:554/cam/realmonitor?channel=1&subtype=0&authbasic=64");
+ publisher->SetConfig("id", "admin");
+ publisher->SetConfig("password", "admin");
+ publisher->Start();
+
+ g_main_loop_run(main_loop);
} catch (std::exception &e) {
FAIL() << "Unexpected exception: " << e.what();
}
${AITT_UT}
COMMAND
${CMAKE_COMMAND} -E env
- LD_LIBRARY_PATH=../modules/tcp/:../modules/webrtc/:../:../common/:$ENV{LD_LIBRARY_PATH}
+ LD_LIBRARY_PATH=../modules/tcp/:../modules/webrtc/:../modules/rtsp/:../:../common/:$ENV{LD_LIBRARY_PATH}
${CMAKE_CURRENT_BINARY_DIR}/${AITT_UT} --gtest_filter=*_Anytime
)
${AITT_UT}_local
COMMAND
${CMAKE_COMMAND} -E env
- LD_LIBRARY_PATH=../modules/tcp/:../modules/webrtc/:../:../common/:$ENV{LD_LIBRARY_PATH}
+ LD_LIBRARY_PATH=../modules/tcp/:../modules/webrtc/:../modules/rtsp/:../:../common/:$ENV{LD_LIBRARY_PATH}
${CMAKE_CURRENT_BINARY_DIR}/${AITT_UT}_local --gtest_filter=*_Anytime
)