Add methods for stream transport
authorJiung Yu <jiung.yu@samsung.com>
Mon, 26 Sep 2022 07:05:55 +0000 (16:05 +0900)
committerYoungjae Shin <yj99.shin@samsung.com>
Wed, 9 Nov 2022 08:11:09 +0000 (17:11 +0900)
12 files changed:
common/AittException.cc
common/AittStreamTag.h [new file with mode: 0644]
include/AITT.h
include/AittException.h
include/AittTypes.h
src/AITT.cc
src/AITTImpl.cc
src/AITTImpl.h
tests/AITT_Stream_test.cc [new file with mode: 0644]
tests/AITT_Stream_tests.h [new file with mode: 0644]
tests/AITT_test.cc
tests/CMakeLists.txt

index c1d061a..6c51e85 100644 (file)
@@ -47,6 +47,8 @@ std::string AittException::getErrString() const
         return "MQTT failure";
     case NO_DATA_ERR:
         return "No data found";
+    case RESOURCE_BUSY_ERR:
+        return "Resource busy";
     default:
         return "Unknown Error";
     }
diff --git a/common/AittStreamTag.h b/common/AittStreamTag.h
new file mode 100644 (file)
index 0000000..9c4b7bf
--- /dev/null
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2021-2022 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <AittTypes.h>
+
+#include <string>
+namespace aitt {
+
+class StreamTag {
+  public:
+    explicit StreamTag(const std::string &topic, AittProtocol protocol, AittStreamRole role,
+          void *handle)
+          : topic_(topic), protocol_(protocol), role_(role), handle_(handle){};
+    std::string topic_;
+    AittProtocol protocol_;
+    AittStreamRole role_;
+    void *handle_;
+};
+
+}  // namespace aitt
index 42ee6e2..140c8a8 100644 (file)
@@ -35,6 +35,11 @@ class API AITT {
           std::function<void(MSG *msg, const void *data, const size_t datalen, void *user_data)>;
     using ConnectionCallback = std::function<void(AITT &, int, void *user_data)>;
 
+    using StreamStateCallback =
+          std::function<void(AittStreamState state, void *user_data)>;
+    using StreamSinkCallback = std::function<void(const void *data,
+          const size_t datalen, void *user_data)>;
+
     explicit AITT(const std::string &id, const std::string &ip_addr,
           AittOption option = AittOption(false, false));
     virtual ~AITT(void);
@@ -64,6 +69,20 @@ class API AITT {
 
     void SendReply(MSG *msg, const void *data, const size_t datalen, bool end = true);
 
+    AittStreamID CreatePublishStream(const std::string &topic, AittProtocol protocol);
+    AittStreamID CreateSubscribeStream(const std::string &topic, AittProtocol protocol);
+    void DestroyStream(AittStreamID handle);
+    void SetStreamConfig(AittStreamID handle, const std::string &key, const std::string &value);
+    std::string GetStreamConfig(AittStreamID handle, const std::string &key);
+    void StartStream(AittStreamID handle);
+    void StopStream(AittStreamID handle);
+    void SetStreamStateCallback(AittStreamID handle, StreamStateCallback cb,
+          void *user_data = nullptr);
+    void UnsetStreamStateCallback(AittStreamID handle);
+    void SetStreamSinkCallback(AittStreamID handle, StreamSinkCallback cb,
+          void *user_data = nullptr);
+    void UnsetStreamSinkCallback(AittStreamID handle);
+
   private:
     class Impl;
     std::unique_ptr<Impl> pImpl;
index 6d9d302..688ac27 100644 (file)
@@ -30,6 +30,7 @@ class AittException : public std::exception {
         SYSTEM_ERR,
         MQTT_ERR,
         NO_DATA_ERR,
+        RESOURCE_BUSY_ERR,
     };
 
     AittException(ErrCode err_code);
index b9d22c8..1c959e5 100644 (file)
@@ -18,6 +18,7 @@
 #define API __attribute__((visibility("default")))
 
 typedef void* AittSubscribeID;
+typedef void* AittStreamID;
 
 enum AittProtocol {
     AITT_TYPE_UNKNOWN = 0,
@@ -40,6 +41,18 @@ enum AittConnectionState {
     AITT_CONNECT_FAILED = 2,  // Failed to connect to the mqtt broker.
 };
 
+enum AittStreamState {
+    AITT_STREAM_STATE_INIT = 0,
+    AITT_STREAM_STATE_READY = 1,
+    AITT_STREAM_STATE_PLAYING = 2,
+};
+
+enum AittStreamRole {
+    AITT_STREAM_ROLE_NONE = 0,
+    AITT_STREAM_ROLE_SRC = 1,
+    AITT_STREAM_ROLE_SINK = 2,
+};
+
 // The maximum size in bytes of a message. It follows MQTT
 #define AITT_MESSAGE_MAX 268435455
 
@@ -53,6 +66,7 @@ enum AittConnectionState {
 #define TIZEN_ERROR_INVALID_PARAMETER -EINVAL
 #define TIZEN_ERROR_PERMISSION_DENIED -EACCES
 #define TIZEN_ERROR_OUT_OF_MEMORY -ENOMEM
+#define TIZEN_ERROR_RESOURCE_BUSY -EBUSY
 #define TIZEN_ERROR_TIMED_OUT (-1073741824LL + 1)
 #define TIZEN_ERROR_NOT_SUPPORTED (-1073741824LL + 2)
 #define TIZEN_ERROR_AITT -0x04020000
@@ -63,6 +77,7 @@ enum AittError {
     AITT_ERROR_INVALID_PARAMETER = TIZEN_ERROR_INVALID_PARAMETER, /**< Invalid parameter */
     AITT_ERROR_PERMISSION_DENIED = TIZEN_ERROR_PERMISSION_DENIED, /**< Permission denied */
     AITT_ERROR_OUT_OF_MEMORY = TIZEN_ERROR_OUT_OF_MEMORY,         /**< Out of memory */
+    AITT_ERROR_RESOURCE_BUSY = TIZEN_ERROR_RESOURCE_BUSY,         /**< Resource Busy */
     AITT_ERROR_TIMED_OUT = TIZEN_ERROR_TIMED_OUT,                 /**< Time out */
     AITT_ERROR_NOT_SUPPORTED = TIZEN_ERROR_NOT_SUPPORTED,         /**< Not supported */
     AITT_ERROR_UNKNOWN = TIZEN_ERROR_AITT | 0x01,                 /**< Unknown Error */
index 0c2af62..a4625af 100644 (file)
@@ -129,4 +129,61 @@ void AITT::SendReply(MSG *msg, const void *data, size_t datalen, bool end)
     return pImpl->SendReply(msg, data, datalen, end);
 }
 
+AittStreamID AITT::CreatePublishStream(const std::string &topic, AittProtocol protocol)
+{
+    return pImpl->CreatePublishStream(topic, protocol);
+}
+
+AittStreamID AITT::CreateSubscribeStream(const std::string &topic, AittProtocol protocol)
+{
+    return pImpl->CreateSubscribeStream(topic, protocol);
+}
+
+void AITT::DestroyStream(AittStreamID handle)
+{
+    return pImpl->DestroyStream(handle);
+}
+
+void AITT::SetStreamConfig(AittStreamID handle, const std::string &key, const std::string &value)
+{
+    return pImpl->SetStreamConfig(handle, key, value);
+}
+
+std::string AITT::GetStreamConfig(AittStreamID handle, const std::string &key)
+{
+    return pImpl->GetStreamConfig(handle, key);
+}
+
+void AITT::StartStream(AittStreamID handle)
+{
+    return pImpl->StartStream(handle);
+}
+
+void AITT::StopStream(AittStreamID handle)
+{
+    return pImpl->StopStream(handle);
+}
+
+void AITT::SetStreamStateCallback(AittStreamID handle, StreamStateCallback cb,
+      void *user_data)
+{
+    return pImpl->SetStreamStateCallback(handle, cb, user_data);
+}
+
+void AITT::UnsetStreamStateCallback(AittStreamID handle)
+{
+    return pImpl->UnsetStreamStateCallback(handle);
+}
+
+void AITT::SetStreamSinkCallback(AittStreamID handle, StreamSinkCallback cb,
+      void *user_data)
+{
+    return pImpl->SetStreamSinkCallback(handle, cb, user_data);
+}
+
+void AITT::UnsetStreamSinkCallback(AittStreamID handle)
+{
+    return pImpl->UnsetStreamSinkCallback(handle);
+}
+
 }  // namespace aitt
index db91328..9a77e68 100644 (file)
@@ -26,9 +26,6 @@
 #include "MosquittoMQ.h"
 #include "aitt_internal.h"
 
-#define WEBRTC_ROOM_ID_PREFIX std::string(AITT_MANAGED_TOPIC_PREFIX "webrtc/room/Room.webrtc")
-#define WEBRTC_ID_POSTFIX std::string("_for_webrtc")
-
 namespace aitt {
 
 AITT::Impl::Impl(AITT &parent, const std::string &id, const std::string &my_ip,
@@ -111,6 +108,7 @@ void AITT::Impl::Connect(const std::string &host, int port, const std::string &u
 void AITT::Impl::Disconnect(void)
 {
     UnsubscribeAll();
+    DestroyAllStream();
 
     mqtt_broker_ip_.clear();
     mqtt_broker_port_ = -1;
@@ -130,7 +128,6 @@ void AITT::Impl::UnsubscribeAll()
             break;
         case AITT_TYPE_TCP:
         case AITT_TYPE_TCP_SECURE:
-        case AITT_TYPE_WEBRTC:
             modules.Get(subscribe_info->first).Unsubscribe(subscribe_info->second);
             break;
 
@@ -143,6 +140,26 @@ void AITT::Impl::UnsubscribeAll()
     }
     subscribed_list.clear();
 }
+void AITT::Impl::DestroyAllStream(void)
+{
+    std::unique_lock<std::mutex> lock(stream_list_mutex_);
+
+    for (auto tag : stream_list_) {
+        if (!tag)
+            continue;
+        //TODO call stream transport
+        delete tag;
+    }
+    stream_list_.clear();
+}
+
+void AITT::Impl::StreamInfoExist(StreamTag *tag)
+{
+    if (std::find(stream_list_.begin(), stream_list_.end(), tag) == stream_list_.end()) {
+        ERR("Unknown stream id(%p)", tag);
+        throw std::runtime_error("stream id");
+    }
+}
 
 void AITT::Impl::ConfigureTransportModule(const std::string &key, const std::string &value,
       AittProtocol protocols)
@@ -160,27 +177,6 @@ void AITT::Impl::Publish(const std::string &topic, const void *data, const size_
 
     if ((protocols & AITT_TYPE_TCP_SECURE) == AITT_TYPE_TCP_SECURE)
         modules.Get(AITT_TYPE_TCP_SECURE).Publish(topic, data, datalen, qos, retain);
-
-    if ((protocols & AITT_TYPE_WEBRTC) == AITT_TYPE_WEBRTC)
-        PublishWebRtc(topic, data, datalen, qos, retain);
-}
-
-void AITT::Impl::PublishWebRtc(const std::string &topic, const void *data, const size_t datalen,
-      AittQoS qos, bool retain)
-{
-    flexbuffers::Builder fbb;
-    fbb.Map([=, &fbb]() {
-        fbb.String("Id", id_ + WEBRTC_ID_POSTFIX);
-        fbb.String("BrokerIp", mqtt_broker_ip_);
-        fbb.Int("BrokerPort", mqtt_broker_port_);
-        fbb.String("RoomId", WEBRTC_ROOM_ID_PREFIX + topic);
-        fbb.String("SourceId", id_ + WEBRTC_ID_POSTFIX);
-        // TODO pass user data to WEBRTC module
-        fbb.UInt("UserDataLength", datalen);
-    });
-    fbb.Finish();
-    auto buf = fbb.GetBuffer();
-    modules.Get(AITT_TYPE_WEBRTC).Publish(topic, buf.data(), buf.size(), qos, retain);
 }
 
 AittSubscribeID AITT::Impl::Subscribe(const std::string &topic, const AITT::SubscribeCallback &cb,
@@ -198,9 +194,6 @@ AittSubscribeID AITT::Impl::Subscribe(const std::string &topic, const AITT::Subs
     case AITT_TYPE_TCP_SECURE:
         subscribe_handle = SubscribeTCP(info, topic, cb, user_data, qos);
         break;
-    case AITT_TYPE_WEBRTC:
-        subscribe_handle = SubscribeWebRtc(info, topic, cb, user_data, qos);
-        break;
     default:
         ERR("Unknown AittProtocol(%d)", protocol);
         delete info;
@@ -268,7 +261,6 @@ void *AITT::Impl::Unsubscribe(AittSubscribeID subscribe_id)
         break;
     case AITT_TYPE_TCP:
     case AITT_TYPE_TCP_SECURE:
-    case AITT_TYPE_WEBRTC:
         user_data = modules.Get(found_info->first).Unsubscribe(found_info->second);
         break;
 
@@ -411,32 +403,153 @@ void *AITT::Impl::SubscribeTCP(SubscribeInfo *handle, const std::string &topic,
                 user_data, qos);
 }
 
-void *AITT::Impl::SubscribeWebRtc(SubscribeInfo *handle, const std::string &topic,
-      const SubscribeCallback &cb, void *user_data, AittQoS qos)
+AittStreamID AITT::Impl::CreatePublishStream(const std::string &topic, AittProtocol protocol)
 {
-    flexbuffers::Builder fbb;
-    fbb.Map([=, &fbb]() {
-        fbb.String("Id", id_ + WEBRTC_ID_POSTFIX);
-        fbb.String("BrokerIp", mqtt_broker_ip_);
-        fbb.String("RoomId", WEBRTC_ROOM_ID_PREFIX + topic);
-        fbb.Int("BrokerPort", mqtt_broker_port_);
-    });
-    fbb.Finish();
-    auto buf = fbb.GetBuffer();
-
-    return modules.Get(AITT_TYPE_WEBRTC)
-          .Subscribe(
-                topic,
-                [handle, cb](const std::string &topic, const void *data, const size_t datalen,
-                      void *user_data, const std::string &correlation) -> void {
-                    MSG msg;
-                    msg.SetID(handle);
-                    msg.SetTopic(topic);
-                    msg.SetCorrelation(correlation);
-                    msg.SetProtocols(AITT_TYPE_WEBRTC);
+   //TODO call stream transport
+    auto info = new StreamTag(topic, protocol, AITT_STREAM_ROLE_SRC, nullptr);
+    // TODO: be prepared when there's memory failure
+    {
+        std::unique_lock<std::mutex> lock(stream_list_mutex_);
+        stream_list_.push_back(info);
+    }
 
-                    return cb(&msg, data, datalen, user_data);
-                },
-                buf.data(), buf.size(), user_data, qos);
+    INFO("Stream topic(%s) : %p", topic.c_str(), info);
+    return reinterpret_cast<AittStreamID>(info);
+}
+
+AittStreamID AITT::Impl::CreateSubscribeStream(const std::string &topic,
+      AittProtocol protocol)
+{
+    //TODO call stream transport
+    auto info = new StreamTag(topic, protocol, AITT_STREAM_ROLE_SINK, nullptr);
+    // TODO: be prepared when there's memory failure
+    {
+        std::unique_lock<std::mutex> lock(stream_list_mutex_);
+        stream_list_.push_back(info);
+    }
+
+    INFO("Stream topic(%s) : %p", topic.c_str(), info);
+    return reinterpret_cast<AittStreamID>(info);
+}
+
+void AITT::Impl::DestroyStream(AittStreamID handle)
+{
+    INFO("stream id : %p", handle);
+    StreamTag *info = reinterpret_cast<StreamTag *>(handle);
+
+    std::unique_lock<std::mutex> lock(stream_list_mutex_);
+
+    auto it = std::find(stream_list_.begin(), stream_list_.end(), info);
+    if (it == stream_list_.end()) {
+        ERR("Unknown stream id(%p)", handle);
+        throw std::runtime_error("stream ID");
+    }
+
+    //TODO call stream transport
+
+    stream_list_.erase(it);
+    delete *it;
+}
+
+void AITT::Impl::SetStreamConfig(AittStreamID handle, const std::string &key,
+      const std::string &value)
+{
+    INFO("stream id : %p", handle);
+    StreamTag *tag = reinterpret_cast<StreamTag *>(handle);
+
+    std::unique_lock<std::mutex> lock(stream_list_mutex_);
+
+    StreamInfoExist(tag);
+
+    //TODO call stream transport
+}
+
+std::string AITT::Impl::GetStreamConfig(AittStreamID handle, const std::string &key)
+{
+    std::string value;
+    INFO("stream id : %p", handle);
+    StreamTag *tag = reinterpret_cast<StreamTag *>(handle);
+
+    std::unique_lock<std::mutex> lock(stream_list_mutex_);
+
+    StreamInfoExist(tag);
+
+    //TODO call stream transport
+
+    return value;
+}
+
+void AITT::Impl::StartStream(AittStreamID handle)
+{
+    INFO("stream id : %p", handle);
+    StreamTag *tag = reinterpret_cast<StreamTag *>(handle);
+
+    std::unique_lock<std::mutex> lock(stream_list_mutex_);
+
+    StreamInfoExist(tag);
+
+    //TODO call stream transport
+}
+
+void AITT::Impl::StopStream(AittStreamID handle)
+{
+    INFO("stream id : %p", handle);
+    StreamTag *tag = reinterpret_cast<StreamTag *>(handle);
+
+    std::unique_lock<std::mutex> lock(stream_list_mutex_);
+
+    StreamInfoExist(tag);
+
+    //TODO call stream transport
+}
+
+void AITT::Impl::SetStreamStateCallback(AittStreamID handle, StreamStateCallback cb,
+      void *user_data)
+{
+    INFO("stream id : %p", handle);
+    StreamTag *tag = reinterpret_cast<StreamTag *>(handle);
+
+    std::unique_lock<std::mutex> lock(stream_list_mutex_);
+
+    StreamInfoExist(tag);
+
+    //TODO call stream transport
+}
+
+void AITT::Impl::UnsetStreamStateCallback(AittStreamID handle)
+{
+    INFO("stream id : %p", handle);
+    StreamTag *tag = reinterpret_cast<StreamTag *>(handle);
+
+    std::unique_lock<std::mutex> lock(stream_list_mutex_);
+
+    StreamInfoExist(tag);
+
+    //TODO call stream transport
 }
+
+void AITT::Impl::SetStreamSinkCallback(AittStreamID handle, StreamSinkCallback cb, void *user_data)
+{
+    INFO("stream id : %p", handle);
+    StreamTag *tag = reinterpret_cast<StreamTag *>(handle);
+
+    std::unique_lock<std::mutex> lock(stream_list_mutex_);
+
+    StreamInfoExist(tag);
+
+    //TODO call stream transport
+}
+
+void AITT::Impl::UnsetStreamSinkCallback(AittStreamID handle)
+{
+    INFO("stream id : %p", handle);
+    StreamTag *tag = reinterpret_cast<StreamTag *>(handle);
+
+    std::unique_lock<std::mutex> lock(stream_list_mutex_);
+
+    StreamInfoExist(tag);
+
+    //TODO call stream transport
+}
+
 }  // namespace aitt
index fcd7f08..097e749 100644 (file)
@@ -26,6 +26,7 @@
 
 #include "AITT.h"
 #include "AittDiscovery.h"
+#include "AittStreamTag.h"
 #include "MQ.h"
 #include "MainLoopHandler.h"
 #include "ModuleManager.h"
@@ -63,6 +64,18 @@ class AITT::Impl {
 
     void SendReply(MSG *msg, const void *data, const int datalen, bool end);
 
+    AittStreamID CreatePublishStream(const std::string &topic, AittProtocol protocol);
+    AittStreamID CreateSubscribeStream(const std::string &topic, AittProtocol protocol);
+    void DestroyStream(AittStreamID handle);
+    void SetStreamConfig(AittStreamID handle, const std::string &key, const std::string &value);
+    std::string GetStreamConfig(AittStreamID handle, const std::string &key);
+    void StartStream(AittStreamID handle);
+    void StopStream(AittStreamID handle);
+    void SetStreamStateCallback(AittStreamID handle, StreamStateCallback cb, void *user_data);
+    void UnsetStreamStateCallback(AittStreamID handle);
+    void SetStreamSinkCallback(AittStreamID handle, StreamSinkCallback cb, void *user_data);
+    void UnsetStreamSinkCallback(AittStreamID handle);
+
   private:
     using Blob = std::pair<const void *, int>;
     using SubscribeInfo = std::pair<AittProtocol, void *>;
@@ -83,6 +96,8 @@ class AITT::Impl {
     void PublishWebRtc(const std::string &topic, const void *data, const size_t datalen,
           AittQoS qos, bool retain);
     void UnsubscribeAll();
+    void DestroyAllStream(void);
+    void StreamInfoExist(StreamTag *tag);
     void ThreadMain(void);
 
     AITT &public_api;
@@ -93,6 +108,8 @@ class AITT::Impl {
     std::unique_ptr<MQ> mq;
     std::vector<SubscribeInfo *> subscribed_list;
     std::mutex subscribed_list_mutex_;
+    std::vector<StreamTag *> stream_list_;
+    std::mutex stream_list_mutex_;
 
     std::string id_;
     std::string mqtt_broker_ip_;
diff --git a/tests/AITT_Stream_test.cc b/tests/AITT_Stream_test.cc
new file mode 100644 (file)
index 0000000..aa33cc3
--- /dev/null
@@ -0,0 +1,98 @@
+/*
+ * Copyright (c) 2021-2022 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <gtest/gtest.h>
+#include <sys/random.h>
+
+#include <thread>
+
+#include "AITT.h"
+#include "AITT_Stream_tests.h"
+
+using AITT = aitt::AITT;
+
+class AITTStreamTest : public testing::Test, public AittStreamTests {
+  protected:
+    void SetUp() override { Init(); }
+    void TearDown() override { Deinit(); }
+};
+
+TEST_F(AITTStreamTest, Positive_Create_Pubilsh_Stream_WebRTC_Anytime)
+{
+    try {
+        AITT aitt(stream_src_id_, LOCAL_IP, AittOption(true, false));
+        aitt.Connect();
+        publish_stream_handle_ =
+              aitt.CreatePublishStream(TEST_STREAM_TOPIC, AITT_TYPE_WEBRTC);
+        EXPECT_NE(publish_stream_handle_, nullptr);
+    } catch (std::exception &e) {
+        FAIL() << "Unexpected exception: " << e.what();
+    }
+}
+
+TEST_F(AITTStreamTest, Positive_Create_Subscribe_Stream_WebRTC_Anytime)
+{
+    try {
+        AITT aitt(stream_src_id_, LOCAL_IP, AittOption(true, false));
+        aitt.Connect();
+        subscribe_stream_handle_ =
+              aitt.CreateSubscribeStream(TEST_STREAM_TOPIC, AITT_TYPE_WEBRTC);
+        EXPECT_NE(subscribe_stream_handle_, nullptr);
+    } catch (std::exception &e) {
+        FAIL() << "Unexpected exception: " << e.what();
+    }
+}
+
+TEST_F(AITTStreamTest, Positive_Destroy_Pubilsh_Stream_WebRTC_Anytime)
+{
+    try {
+        AITT aitt(stream_src_id_, LOCAL_IP, AittOption(true, false));
+        aitt.Connect();
+        publish_stream_handle_ =
+              aitt.CreatePublishStream(TEST_STREAM_TOPIC, AITT_TYPE_WEBRTC);
+        aitt.DestroyStream(publish_stream_handle_);
+    } catch (std::exception &e) {
+        FAIL() << "Unexpected exception: " << e.what();
+    }
+}
+
+TEST_F(AITTStreamTest, Positive_Destroy_Subscribe_Stream_WebRTC_Anytime)
+{
+    try {
+        AITT aitt(stream_src_id_, LOCAL_IP, AittOption(true, false));
+        aitt.Connect();
+        subscribe_stream_handle_ =
+              aitt.CreateSubscribeStream(TEST_STREAM_TOPIC, AITT_TYPE_WEBRTC);
+        aitt.DestroyStream(subscribe_stream_handle_);
+    } catch (std::exception &e) {
+        FAIL() << "Unexpected exception: " << e.what();
+    }
+}
+
+TEST_F(AITTStreamTest, Negative_Destroy_Stream_Anytime)
+{
+    EXPECT_THROW(
+          {
+              try {
+                  AITT aitt(stream_src_id_, LOCAL_IP, AittOption(true, false));
+                  aitt.Connect();
+                  aitt.DestroyStream(nullptr);
+              } catch (const std::runtime_error &e) {
+                  // and this tests that it has the correct message
+                  throw;
+              }
+          },
+          std::runtime_error);
+}
diff --git a/tests/AITT_Stream_tests.h b/tests/AITT_Stream_tests.h
new file mode 100644 (file)
index 0000000..7fc1b7a
--- /dev/null
@@ -0,0 +1,58 @@
+/*
+ * Copyright (c) 2022 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <glib.h>
+#include <sys/time.h>
+
+#include "aitt_internal.h"
+
+#define LOCAL_IP "127.0.0.1"
+#define TEST_STREAM_TOPIC "test/stream_topic"
+#define TEST_CONFIG_KEY "test_config_key"
+#define TEST_CONFIG_VALUE "test_config_value"
+
+class AittStreamTests {
+  public:
+    void Init()
+    {
+        timeval tv;
+        char buffer[256];
+        gettimeofday(&tv, nullptr);
+        snprintf(buffer, sizeof(buffer), "UniqueStreamSrcID.%lX%lX", tv.tv_sec, tv.tv_usec);
+        stream_src_id_ = buffer;
+        snprintf(buffer, sizeof(buffer), "UniqueStreamSinkID.%lX%lX", tv.tv_sec, tv.tv_usec);
+        stream_sink_id_ = buffer;
+        stream_topic_ = "StreamTopic";
+        mainLoop_ = g_main_loop_new(nullptr, FALSE);
+    }
+
+    void Deinit() { g_main_loop_unref(mainLoop_); }
+
+    void IterateEventLoop(void)
+    {
+        g_main_loop_run(mainLoop_);
+        DBG("Go forward");
+    }
+
+    AittStreamID publish_stream_handle_;
+    AittStreamID subscribe_stream_handle_;
+
+    std::string stream_src_id_;
+    std::string stream_sink_id_;
+    std::string stream_topic_;
+    GMainLoop *mainLoop_;
+};
index 7befc01..3135ef7 100644 (file)
@@ -379,22 +379,6 @@ TEST_F(AITTTest, Publish_Multiple_Protocols_P_Anytime)
         aitt.Connect();
         aitt.Publish(testTopic, TEST_MSG, sizeof(TEST_MSG),
               (AittProtocol)(AITT_TYPE_MQTT | AITT_TYPE_TCP));
-        aitt.Publish(testTopic, TEST_MSG, sizeof(TEST_MSG),
-              (AittProtocol)(AITT_TYPE_MQTT | AITT_TYPE_TCP | AITT_TYPE_WEBRTC));
-    } catch (std::exception &e) {
-        FAIL() << "Unexpected exception: " << e.what();
-    }
-}
-
-TEST_F(AITTTest, Subscribe_WebRTC_P_Anytime)
-{
-    try {
-        AITT aitt(clientId, LOCAL_IP, AittOption(true, false));
-        aitt.Connect();
-        aitt.Subscribe(
-              testTopic,
-              [](aitt::MSG *handle, const void *msg, const size_t szmsg, void *cbdata) -> void {},
-              nullptr, AITT_TYPE_WEBRTC);
     } catch (std::exception &e) {
         FAIL() << "Unexpected exception: " << e.what();
     }
index 43f4d40..436bfaa 100644 (file)
@@ -18,7 +18,7 @@ ADD_TEST(
         ${AITT_UT}
     COMMAND
         ${CMAKE_COMMAND} -E env
-        LD_LIBRARY_PATH=../modules/tcp/:../:../common/:$ENV{LD_LIBRARY_PATH}
+        LD_LIBRARY_PATH=../modules/tcp/:../modules/webrtc/:../:../common/:$ENV{LD_LIBRARY_PATH}
         ${CMAKE_CURRENT_BINARY_DIR}/${AITT_UT} --gtest_filter=*_Anytime
 )
 
@@ -57,6 +57,20 @@ ADD_TEST(
         ${AITT_UT}_module
     COMMAND
         ${CMAKE_COMMAND} -E env
-        LD_LIBRARY_PATH=../modules/tcp/:../:../common/:$ENV{LD_LIBRARY_PATH}
+        LD_LIBRARY_PATH=../modules/webrtc/:../modules/tcp/:../:../common/:$ENV{LD_LIBRARY_PATH}
         ${CMAKE_CURRENT_BINARY_DIR}/${AITT_UT}_module --gtest_filter=*_Anytime
 )
+###########################################################################
+ADD_EXECUTABLE(${AITT_UT}_stream AITT_Stream_test.cc)
+TARGET_LINK_LIBRARIES(${AITT_UT}_stream Threads::Threads ${UT_NEEDS_LIBRARIES} ${PROJECT_NAME})
+
+INSTALL(TARGETS ${AITT_UT}_stream DESTINATION ${AITT_TEST_BINDIR})
+
+ADD_TEST(
+    NAME
+        ${AITT_UT}_stream
+    COMMAND
+        ${CMAKE_COMMAND} -E env
+        LD_LIBRARY_PATH=../modules/tcp/:../modules/webrtc/:../:../common/:$ENV{LD_LIBRARY_PATH}
+        ${CMAKE_CURRENT_BINARY_DIR}/${AITT_UT}_stream --gtest_filter=*_Anytime
+)