Add discovery message logic in RTSP
authorJihoon Jung <jh8801.jung@samsung.com>
Mon, 17 Oct 2022 07:39:34 +0000 (16:39 +0900)
committerYoungjae Shin <yj99.shin@samsung.com>
Wed, 9 Nov 2022 08:18:06 +0000 (17:18 +0900)
include/AittStream.h
modules/rtsp/Module.cc
modules/rtsp/Module.h
modules/rtsp/RTSPClient.cc
modules/rtsp/RTSPClient.h
modules/rtsp/RTSPInfo.cc
modules/rtsp/RTSPInfo.h
modules/webrtc/Module.h
tests/AittStream_test.cc
tests/CMakeLists.txt

index 859d23b..3483edc 100644 (file)
@@ -35,6 +35,7 @@ class AittStream {
     virtual void SetConfig(const std::string &key, const std::string &value) = 0;
     virtual void SetConfig(const std::string &key, void *obj) = 0;
     virtual void Start(void) = 0;
+    virtual void Stop(void) = 0;
     virtual void SetStateCallback(StateCallback cb, void *user_data) = 0;
 
     // Subscriber ONLY
index 182188c..bd22f0f 100644 (file)
 
 #include "aitt_internal.h"
 
+#define RTSP_DBG(fmt, ...)                                       \
+    do {                                                         \
+        if (role_ == AittStreamRole::AITT_STREAM_ROLE_PUBLISHER) \
+            DBG("[RTSP_SERVER] " fmt, ##__VA_ARGS__);            \
+        else                                                     \
+            DBG("[RTSP_CLIENT] " fmt, ##__VA_ARGS__);            \
+    } while (0)
+
+#define RTSP_ERR(fmt, ...)                                       \
+    do {                                                         \
+        if (role_ == AittStreamRole::AITT_STREAM_ROLE_PUBLISHER) \
+            ERR("[RTSP_SERVER] " fmt, ##__VA_ARGS__);            \
+        else                                                     \
+            ERR("[RTSP_CLIENT] " fmt, ##__VA_ARGS__);            \
+    } while (0)
+
 namespace AittRTSPNamespace {
 
 Module::Module(AittDiscovery &discovery, const std::string &topic, AittStreamRole role)
-      : discovery_(discovery), topic_(topic), role_(role), info(nullptr), client(nullptr), current_state(0)
+      : discovery_(discovery),
+        topic_(topic),
+        role_(role),
+        server_state(AittStreamState::AITT_STREAM_STATE_INIT),
+        client_state(AittStreamState::AITT_STREAM_STATE_INIT)
 {
-    DBG("RTSP Module constructor : %s, role : %d", topic_.c_str(), role_);
+    RTSP_DBG("RTSP Module constructor : %s", topic_.c_str());
 
     discovery_cb_ = discovery_.AddDiscoveryCB(topic,
           std::bind(&Module::DiscoveryMessageCallback, this, std::placeholders::_1,
@@ -34,32 +54,164 @@ Module::Module(AittDiscovery &discovery, const std::string &topic, AittStreamRol
 
 Module::~Module(void)
 {
-    DBG("RTSP Module destroyer : %s, role : %d", topic_.c_str(), role_);
+    RTSP_DBG("RTSP Module destroyer : %s", topic_.c_str());
+
+    discovery_.RemoveDiscoveryCB(discovery_cb_);
 }
 
 void Module::SetConfig(const std::string &key, const std::string &value)
 {
     if (role_ == AittStreamRole::AITT_STREAM_ROLE_PUBLISHER) {
-        /* Add rtsp server table */
+        if (key == "url") {
+            if (value.find("rtsp://") != 0) {
+                RTSP_ERR("rtsp url validation check failed");
+                return;
+            }
+
+            info.SetUrl(value);
+        } else if (key == "id") {
+            info.SetID(value);
+        } else if (key == "password") {
+            info.SetPassword(value);
+        }
     }
 }
 
 void Module::SetConfig(const std::string &key, void *obj)
 {
     if (role_ == AittStreamRole::AITT_STREAM_ROLE_SUBSCRIBER) {
-        /* Set evas object */
+        if (key == "display") {
+            RTSP_DBG("Set Evas object for display");
+        }
     }
 }
 
 void Module::Start(void)
 {
+    RTSP_DBG("Start");
+
+    if (role_ == AittStreamRole::AITT_STREAM_ROLE_PUBLISHER) {
+        UpdateState(role_, AittStreamState::AITT_STREAM_STATE_READY);
+        UpdateDiscoveryMsg();
+    } else {
+        if (server_state == AittStreamState::AITT_STREAM_STATE_READY) {
+            RTSP_DBG("Playing Pipeline in Start() method");
+            client.Start();
+            UpdateState(role_, AittStreamState::AITT_STREAM_STATE_PLAYING);
+        } else {
+            RTSP_DBG("The pipeline not yet created. Wait..");
+            UpdateState(role_, AittStreamState::AITT_STREAM_STATE_READY);
+        }
+    }
+}
+
+void Module::Stop(void)
+{
+    RTSP_DBG("Stop");
+
     if (role_ == AittStreamRole::AITT_STREAM_ROLE_PUBLISHER) {
-        /* Update Rtsp Server table with retained message of MQTT broker */
+        UpdateState(role_, AittStreamState::AITT_STREAM_STATE_INIT);
+        UpdateDiscoveryMsg();
+    } else {
+        if (client_state == AittStreamState::AITT_STREAM_STATE_PLAYING) {
+            client.Stop();
+        }
+
+        UpdateState(role_, AittStreamState::AITT_STREAM_STATE_INIT);
+    }
+}
+
+void Module::UpdateDiscoveryMsg()
+{
+    if (role_ == AittStreamRole::AITT_STREAM_ROLE_SUBSCRIBER)
+        return;
+
+    flexbuffers::Builder fbb;
+    fbb.Map([this, &fbb]() {
+        fbb.Int(RTSP_INFO_SERVER_STATE, static_cast<int>(server_state));
+        fbb.String(RTSP_INFO_URL, info.GetUrl());
+        fbb.String(RTSP_INFO_ID, info.GetID());
+        fbb.String(RTSP_INFO_PASSWORD, info.GetPassword());
+    });
+    fbb.Finish();
+
+    auto buf = fbb.GetBuffer();
+    discovery_.UpdateDiscoveryMsg(topic_, buf.data(), buf.size());
+}
+
+void Module::UpdateState(AittStreamRole role, AittStreamState state)
+{
+    if (role == AittStreamRole::AITT_STREAM_ROLE_PUBLISHER)
+        server_state = state;
+    else
+        client_state = state;
+
+    if (role == role_ && state_cb.first != nullptr) {
+        state_cb.first(this, state, state_cb.second);
+    }
+}
+
+void Module::DiscoveryMessageCallback(const std::string &clientId, const std::string &status,
+      const void *msg, const int szmsg)
+{
+    if (role_ == AittStreamRole::AITT_STREAM_ROLE_PUBLISHER)
+        return;
+
+    RTSP_DBG("DiscoveryMessageCallback called");
+
+    if (!status.compare(AittDiscovery::WILL_LEAVE_NETWORK)) {
+        AittStreamState next_state = AittStreamState::AITT_STREAM_STATE_INIT;
+
+        if (client_state == AittStreamState::AITT_STREAM_STATE_PLAYING) {
+            client.Stop();
+            next_state = AittStreamState::AITT_STREAM_STATE_READY;
+        }
+
+        client.DestroyPipeline();
+        UpdateState(role_, next_state);
+        return;
+    }
+
+    auto map = flexbuffers::GetRoot(static_cast<const uint8_t *>(msg), szmsg).AsMap();
+    if (map.size() != 4) {
+        RTSP_ERR("RTSP Info validation check failed");
+        return;
     }
-    else {
-        /* Check if the topic exists in server_table to find */
-        /* if exists, then create pipeline using that information */
-        /* if not, wait until discovery message from Publisher */
+
+    std::lock_guard<std::mutex> auto_lock(pipeline_lock);
+
+    UpdateState(AittStreamRole::AITT_STREAM_ROLE_PUBLISHER,
+          static_cast<AittStreamState>(map[RTSP_INFO_SERVER_STATE].AsInt64()));
+    info.SetUrl(map[RTSP_INFO_URL].AsString().c_str());
+    info.SetID(map[RTSP_INFO_ID].AsString().c_str());
+    info.SetPassword(map[RTSP_INFO_PASSWORD].AsString().c_str());
+
+    RTSP_DBG("server_state : %d, url : %s, id : %s, passwd : %s", server_state,
+          info.GetUrl().c_str(),
+          info.GetID().c_str(), info.GetPassword().c_str());
+
+    if (server_state == AittStreamState::AITT_STREAM_STATE_READY) {
+        client.CreatePipeline(info.GetCompleteUrl());
+
+        if (client_state == AittStreamState::AITT_STREAM_STATE_READY) {
+            RTSP_DBG("Playing pipeline in DiscoveryMessage Callback");
+
+            client.Start();
+
+            UpdateState(AittStreamRole::AITT_STREAM_ROLE_SUBSCRIBER,
+                  AittStreamState::AITT_STREAM_STATE_PLAYING);
+        }
+    } else if (server_state == AittStreamState::AITT_STREAM_STATE_INIT) {
+        if (client_state == AittStreamState::AITT_STREAM_STATE_PLAYING) {
+            RTSP_DBG("Stop pipeline in DiscoveryMessage Callback");
+
+            client.Stop();
+
+            UpdateState(AittStreamRole::AITT_STREAM_ROLE_SUBSCRIBER,
+                  AittStreamState::AITT_STREAM_STATE_READY);
+        }
+
+        client.DestroyPipeline();
     }
 }
 
@@ -68,12 +220,15 @@ void Module::SetStateCallback(StateCallback cb, void *user_data)
     if (role_ == AittStreamRole::AITT_STREAM_ROLE_PUBLISHER)
         return;
 
+    state_cb = std::make_pair(cb, user_data);
 }
 
 void Module::SetReceiveCallback(ReceiveCallback cb, void *user_data)
 {
     if (role_ == AittStreamRole::AITT_STREAM_ROLE_PUBLISHER)
         return;
+
+    receive_cb = std::make_pair(cb, user_data);
 }
 
 }  // namespace AittRTSPNamespace
index 0a16eba..8849f33 100644 (file)
@@ -36,11 +36,12 @@ class Module : public AittStreamModule {
     void SetConfig(const std::string &key, const std::string &value) override;
     void SetConfig(const std::string &key, void *obj) override;
     void Start(void) override;
-
+    void Stop(void) override;
     void SetStateCallback(StateCallback cb, void *user_data) override;
     void SetReceiveCallback(ReceiveCallback cb, void *user_data) override;
 
   private:
+    void UpdateState(AittStreamRole role, AittStreamState state);
     void UpdateDiscoveryMsg();
     void DiscoveryMessageCallback(const std::string &clientId, const std::string &status,
           const void *msg, const int szmsg);
@@ -50,8 +51,14 @@ class Module : public AittStreamModule {
     AittStreamRole role_;
 
     int discovery_cb_;
-    RTSPInfo *info;
-    RTSPClient *client;
-    int current_state;
+    RTSPInfo info;
+    RTSPClient client;
+
+    AittStreamState server_state;
+    AittStreamState client_state;
+
+    std::mutex pipeline_lock;
+    std::pair<StateCallback, void *> state_cb;
+    std::pair<ReceiveCallback, void *> receive_cb;
 };
 }  // namespace AittRTSPNamespace
index c83f838..53f13f0 100644 (file)
 
 #include "aitt_internal.h"
 
-RTSPClient::RTSPClient(const std::string &_url)
-      : url(_url),
-        pipeline(nullptr),
-        state_cb_user_data(nullptr),
-        data_cb_user_data(nullptr),
-        state(0)
+RTSPClient::RTSPClient() : pipeline(nullptr), state(0)
 {
-    DBG("RTSPClient constructor");
 }
 
 RTSPClient::~RTSPClient()
 {
-    DBG("RTSPClient destructor");
 }
 
 void RTSPClient::OnPadAddedCB(GstElement *element, GstPad *pad, gpointer data)
@@ -78,8 +71,8 @@ void RTSPClient::VideoStreamDecodedCB(GstElement *object, GstBuffer *buffer, Gst
 
     /* need queueing and delete old frame */
     std::lock_guard<std::mutex> auto_lock(client->data_cb_lock);
-    if (client->data_cb != nullptr)
-        client->data_cb(frame, client->data_cb_user_data);
+    if (client->data_cb.first != nullptr)
+        client->data_cb.first(frame, client->data_cb.second);
 }
 
 gboolean RTSPClient::MessageReceived(GstBus *bus, GstMessage *message, gpointer data)
@@ -106,8 +99,18 @@ gboolean RTSPClient::MessageReceived(GstBus *bus, GstMessage *message, gpointer
     return TRUE;
 }
 
-void RTSPClient::CreatePipeline()
+void RTSPClient::CreatePipeline(const std::string &url)
 {
+    if (url.empty() == true) {
+        ERR("RTSP Server url is empty");
+        return;
+    }
+
+    if (pipeline != nullptr) {
+        ERR("pipeline already exists");
+        return;
+    }
+
     DBG("Create Pipeline with url : %s", url.c_str());
 
     GstBus *bus;
@@ -180,29 +183,29 @@ void RTSPClient::DestroyPipeline(void)
 void RTSPClient::SetStateCallback(const StateCallback &cb, void *user_data)
 {
     std::lock_guard<std::mutex> auto_lock(state_cb_lock);
-    state_cb = cb;
-    state_cb_user_data = user_data;
+
+    state_cb = std::make_pair(cb, user_data);
 }
 
 void RTSPClient::SetDataCallback(const DataCallback &cb, void *user_data)
 {
     std::lock_guard<std::mutex> auto_lock(data_cb_lock);
-    data_cb = cb;
-    data_cb_user_data = user_data;
+
+    data_cb = std::make_pair(cb, user_data);
 }
 
 void RTSPClient::UnsetStateCallback()
 {
     std::lock_guard<std::mutex> auto_lock(state_cb_lock);
-    state_cb = nullptr;
-    state_cb_user_data = nullptr;
+
+    state_cb = std::make_pair(nullptr, nullptr);
 }
 
 void RTSPClient::UnsetClientCallback()
 {
     std::lock_guard<std::mutex> auto_lock(data_cb_lock);
-    data_cb = nullptr;
-    data_cb_user_data = nullptr;
+
+    data_cb = std::make_pair(nullptr, nullptr);
 }
 
 int RTSPClient::GetState()
index 1e428e3..e0d17a1 100644 (file)
@@ -26,7 +26,7 @@
 
 class RTSPClient {
   public:
-    explicit RTSPClient(const std::string &url);
+    explicit RTSPClient();
     ~RTSPClient(void);
 
     using StateCallback = std::function<void(void *user_data)>;
@@ -41,7 +41,7 @@ class RTSPClient {
     void Start();
     void Stop();
 
-    void CreatePipeline();
+    void CreatePipeline(const std::string &url);
     void DestroyPipeline(void);
 
   private:
@@ -50,14 +50,10 @@ class RTSPClient {
           gpointer data);
     static gboolean MessageReceived(GstBus *bus, GstMessage *message, gpointer data);
 
-    std::string url;
     GstElement *pipeline;
 
-    StateCallback state_cb;
-    void *state_cb_user_data;
-
-    DataCallback data_cb;
-    void *data_cb_user_data;
+    std::pair<StateCallback, void *> state_cb;
+    std::pair<DataCallback, void *> data_cb;
 
     std::mutex state_cb_lock;
     std::mutex data_cb_lock;
index 91f8b8f..7c029a3 100644 (file)
 
 #include "RTSPInfo.h"
 
-RTSPInfo::RTSPInfo(const std::string &_url, const std::string &_id, const std::string &_password)
-      : url(_url), id(_id), password(_password)
+#include "aitt_internal.h"
+
+RTSPInfo::RTSPInfo() : url_(""), id_(""), password_("")
 {
-    // encoding secure id and password
 }
 
 RTSPInfo::~RTSPInfo()
 {
 }
 
+void RTSPInfo::SetUrl(const std::string &url)
+{
+    url_ = url;
+}
+
 std::string RTSPInfo::GetUrl()
 {
-    return url;
+    return url_;
+}
+
+void RTSPInfo::SetID(const std::string &id)
+{
+    id_ = id;
 }
 
 std::string RTSPInfo::GetID()
 {
-    // decoding secure id
-    return id;
+    return id_;
+}
+
+void RTSPInfo::SetPassword(const std::string &password)
+{
+    password_ = password;
 }
 
 std::string RTSPInfo::GetPassword()
 {
-    // decoding secure password
-    return password;
+    return password_;
+}
+
+std::string RTSPInfo::GetCompleteUrl()
+{
+    std::string complete_url = url_;
+
+    if (id_.empty() != true && password_.empty() != true) {
+        complete_url.insert(7, id_ + ":" + password_ + "@");
+    }
+
+    return complete_url;
 }
index 01da70a..ed79787 100644 (file)
 
 #include <string>
 
+#define RTSP_INFO_SERVER_STATE "server_state"
+#define RTSP_INFO_URL "url"
+#define RTSP_INFO_ID "id"
+#define RTSP_INFO_PASSWORD "password"
+
 class RTSPInfo {
   public:
-    RTSPInfo(const std::string &url, const std::string &id, const std::string &password);
+    RTSPInfo(void);
     ~RTSPInfo(void);
 
+    void SetUrl(const std::string &url);
     std::string GetUrl();
+    void SetID(const std::string &id);
     std::string GetID();
+    void SetPassword(const std::string &password);
     std::string GetPassword();
 
+    std::string GetCompleteUrl();
+
   private:
-    std::string url;
-    std::string id;
-    std::string password;
+    std::string url_;
+    std::string id_;
+    std::string password_;
 };
index 84896ab..0e00e31 100644 (file)
@@ -37,7 +37,7 @@ class Module : public AittStreamModule {
     void SetConfig(const std::string &key, const std::string &value) override;
     void SetConfig(const std::string &key, void *obj) override;
     void Start(void) override;
-
+    void Stop(void) override;
     void SetStateCallback(StateCallback cb, void *user_data) override;
     void SetReceiveCallback(ReceiveCallback cb, void *user_data) override;
 
index f37f726..eed1d96 100644 (file)
@@ -54,33 +54,88 @@ TEST(AittStreamTest, Webrtc_Full_P)
     }
 }
 
-TEST(AittStreamTest, RTSP_Full_P)
-{
-    try {
-        AITT aitt("streamClientId", LOCAL_IP, AittOption(true, false));
-
-        aitt.Connect();
-
-        AittStream *publisher =
-              aitt.CreateStream(AITT_STREAM_TYPE_RTSP, "topic", AITT_STREAM_ROLE_PUBLISHER);
+class AITTRTSPTest : public testing::Test {
+  protected:
+    void SetUp() override
+    {
+        aitt = new AITT("streamClientId", LOCAL_IP, AittOption(true, false));
+        aitt->Connect();
+        main_loop = g_main_loop_new(nullptr, FALSE);
+
+        publisher = aitt->CreateStream(AITT_STREAM_TYPE_RTSP, "topic", AITT_STREAM_ROLE_PUBLISHER);
         ASSERT_TRUE(publisher) << "CreateStream() Fail";
 
-        AittStream *subscriber =
-              aitt.CreateStream(AITT_STREAM_TYPE_RTSP, "topic", AITT_STREAM_ROLE_SUBSCRIBER);
+        subscriber =
+              aitt->CreateStream(AITT_STREAM_TYPE_RTSP, "topic", AITT_STREAM_ROLE_SUBSCRIBER);
         ASSERT_TRUE(subscriber) << "CreateStream() Fail";
+    }
+    void TearDown() override
+    {
+        g_main_loop_unref(main_loop);
+        aitt->DestroyStream(publisher);
+        aitt->DestroyStream(subscriber);
+        aitt->Disconnect();
+        delete aitt;
+    }
 
-        publisher->SetConfig("key", "value");
+    AITT *aitt;
+    AittStream *publisher;
+    AittStream *subscriber;
+    GMainLoop *main_loop;
+};
+
+static gint SubscriberStart(gpointer argv)
+{
+    AittStream *subscriber = static_cast<AittStream *>(argv);
+
+    subscriber->Start();
+
+    return false;
+}
+
+TEST_F(AITTRTSPTest, Publisher_First_P)
+{
+    try {
+        publisher->SetConfig("url",
+              "rtsp://192.168.1.52:554/cam/realmonitor?channel=1&subtype=0&authbasic=64");
+        publisher->SetConfig("id", "admin");
+        publisher->SetConfig("password", "admin");
         publisher->Start();
 
-        subscriber->SetConfig("key", "value");
-        subscriber->SetStateCallback([](AittStream *stream, int state, void *user_data) {},
-              (void *)"user_data");
-        subscriber->SetReceiveCallback([](AittStream *stream, void *obj, void *user_data) {},
-              (void *)"user-data");
+        subscriber->SetReceiveCallback(
+              [&](AittStream *stream, void *obj, void *user_data) {
+                  DBG("ReceiveCallback Called");
+                  if (g_main_loop_is_running(main_loop))
+                      g_main_loop_quit(main_loop);
+              },
+              nullptr);
+
+        g_timeout_add(3000, SubscriberStart, subscriber);
+        g_main_loop_run(main_loop);
+    } catch (std::exception &e) {
+        FAIL() << "Unexpected exception: " << e.what();
+    }
+}
+
+TEST_F(AITTRTSPTest, Subscriber_First_P)
+{
+    try {
+        subscriber->SetReceiveCallback(
+              [&](AittStream *stream, void *obj, void *user_data) {
+                  DBG("ReceiveCallback Called");
+                  if (g_main_loop_is_running(main_loop))
+                      g_main_loop_quit(main_loop);
+              },
+              nullptr);
         subscriber->Start();
 
-        aitt.DestroyStream(publisher);
-        aitt.DestroyStream(subscriber);
+        publisher->SetConfig("url",
+              "rtsp://192.168.1.52:554/cam/realmonitor?channel=1&subtype=0&authbasic=64");
+        publisher->SetConfig("id", "admin");
+        publisher->SetConfig("password", "admin");
+        publisher->Start();
+
+        g_main_loop_run(main_loop);
     } catch (std::exception &e) {
         FAIL() << "Unexpected exception: " << e.what();
     }
index 4c8dab7..acb2f39 100644 (file)
@@ -19,7 +19,7 @@ ADD_TEST(
         ${AITT_UT}
     COMMAND
         ${CMAKE_COMMAND} -E env
-        LD_LIBRARY_PATH=../modules/tcp/:../modules/webrtc/:../:../common/:$ENV{LD_LIBRARY_PATH}
+        LD_LIBRARY_PATH=../modules/tcp/:../modules/webrtc/:../modules/rtsp/:../:../common/:$ENV{LD_LIBRARY_PATH}
         ${CMAKE_CURRENT_BINARY_DIR}/${AITT_UT} --gtest_filter=*_Anytime
 )
 
@@ -57,6 +57,6 @@ ADD_TEST(
         ${AITT_UT}_local
     COMMAND
         ${CMAKE_COMMAND} -E env
-        LD_LIBRARY_PATH=../modules/tcp/:../modules/webrtc/:../:../common/:$ENV{LD_LIBRARY_PATH}
+        LD_LIBRARY_PATH=../modules/tcp/:../modules/webrtc/:../modules/rtsp/:../:../common/:$ENV{LD_LIBRARY_PATH}
         ${CMAKE_CURRENT_BINARY_DIR}/${AITT_UT}_local --gtest_filter=*_Anytime
 )