propose revised stream API
authorYoungjae Shin <yj99.shin@samsung.com>
Thu, 29 Sep 2022 22:56:37 +0000 (07:56 +0900)
committerYoungjae Shin <yj99.shin@samsung.com>
Wed, 9 Nov 2022 08:14:49 +0000 (17:14 +0900)
18 files changed:
common/AittDiscovery.cc
common/AittDiscovery.h
common/AittStreamModule.h [moved from common/AittStreamTag.h with 52% similarity]
include/AITT.h
include/AittStream.h [new file with mode: 0644]
include/AittTypes.h
modules/tcp/Module.cc
modules/tcp/Module.h
modules/tcp/TCP.cc
src/AITT.cc
src/AITTImpl.cc
src/AITTImpl.h
src/ModuleManager.cc
src/ModuleManager.h
tests/AITT_Stream_test.cc [deleted file]
tests/AITT_Stream_tests.h [deleted file]
tests/AittStream_test.cc [new file with mode: 0644]
tests/CMakeLists.txt

index 4c6ed42..0b65af9 100644 (file)
@@ -45,6 +45,14 @@ void AittDiscovery::Start(const std::string &host, int port, const std::string &
           static_cast<void *>(this), AITT_QOS_EXACTLY_ONCE);
 }
 
+void AittDiscovery::Restart()
+{
+    RET_IF(callback_handle);
+    discovery_mq->Unsubscribe(callback_handle);
+    callback_handle = discovery_mq->Subscribe(DISCOVERY_TOPIC_BASE + "+", DiscoveryMessageCallback,
+          static_cast<void *>(this), AITT_QOS_EXACTLY_ONCE);
+}
+
 void AittDiscovery::Stop()
 {
     discovery_mq->Unsubscribe(callback_handle);
@@ -54,7 +62,7 @@ void AittDiscovery::Stop()
     discovery_mq->Disconnect();
 }
 
-void AittDiscovery::UpdateDiscoveryMsg(AittProtocol protocol, const void *msg, size_t length)
+void AittDiscovery::UpdateDiscoveryMsg(const std::string &protocol, const void *msg, size_t length)
 {
     auto it = discovery_map.find(protocol);
     if (it == discovery_map.end())
@@ -65,7 +73,7 @@ void AittDiscovery::UpdateDiscoveryMsg(AittProtocol protocol, const void *msg, s
     PublishDiscoveryMsg();
 }
 
-int AittDiscovery::AddDiscoveryCB(AittProtocol protocol, const DiscoveryCallback &cb)
+int AittDiscovery::AddDiscoveryCB(const std::string &protocol, const DiscoveryCallback &cb)
 {
     static std::atomic_int id(0);
     id++;
@@ -105,7 +113,7 @@ void AittDiscovery::DiscoveryMessageCallback(MSG *mq, const std::string &topic,
 
     if (msg == nullptr) {
         for (const auto &node : discovery->callbacks) {
-            std::pair<AittProtocol, DiscoveryCallback> cb_info = node.second;
+            std::pair<std::string, DiscoveryCallback> cb_info = node.second;
             cb_info.second(clientId, WILL_LEAVE_NETWORK, nullptr, 0);
         }
         return;
@@ -123,8 +131,8 @@ void AittDiscovery::DiscoveryMessageCallback(MSG *mq, const std::string &topic,
 
         auto blob = map[key].AsBlob();
         for (const auto &node : discovery->callbacks) {
-            std::pair<AittProtocol, DiscoveryCallback> cb_info = node.second;
-            if (cb_info.first == discovery->GetProtocol(key)) {
+            std::pair<std::string, DiscoveryCallback> cb_info = node.second;
+            if (cb_info.first == key) {
                 cb_info.second(clientId, status, blob.data(), blob.size());
             }
         }
@@ -138,8 +146,8 @@ void AittDiscovery::PublishDiscoveryMsg()
     fbb.Map([this, &fbb]() {
         fbb.String("status", JOIN_NETWORK);
 
-        for (const std::pair<const AittProtocol, const DiscoveryBlob &> &node : discovery_map) {
-            fbb.Key(GetProtocolStr(node.first));
+        for (const std::pair<const std::string &, const DiscoveryBlob &> &node : discovery_map) {
+            fbb.Key(node.first);
             fbb.Blob(node.second.data.get(), node.second.len);
         }
     });
@@ -151,41 +159,6 @@ void AittDiscovery::PublishDiscoveryMsg()
           true);
 }
 
-const char *AittDiscovery::GetProtocolStr(AittProtocol protocol)
-{
-    switch (protocol) {
-    case AITT_TYPE_MQTT:
-        return "mqtt";
-    case AITT_TYPE_TCP:
-        return "tcp";
-    case AITT_TYPE_TCP_SECURE:
-        return "tcp_secure";
-    case AITT_TYPE_WEBRTC:
-        return "webrtc";
-    default:
-        ERR("Unknown protocol(%d)", protocol);
-    }
-
-    return nullptr;
-}
-
-AittProtocol AittDiscovery::GetProtocol(const std::string &protocol_str)
-{
-    if (STR_EQ == protocol_str.compare(GetProtocolStr(AITT_TYPE_MQTT)))
-        return AITT_TYPE_MQTT;
-
-    if (STR_EQ == protocol_str.compare(GetProtocolStr(AITT_TYPE_TCP)))
-        return AITT_TYPE_TCP;
-
-    if (STR_EQ == protocol_str.compare(GetProtocolStr(AITT_TYPE_TCP_SECURE)))
-        return AITT_TYPE_TCP_SECURE;
-
-    if (STR_EQ == protocol_str.compare(GetProtocolStr(AITT_TYPE_WEBRTC)))
-        return AITT_TYPE_WEBRTC;
-
-    return AITT_TYPE_UNKNOWN;
-}
-
 AittDiscovery::DiscoveryBlob::DiscoveryBlob(const void *msg, size_t length)
       : len(length), data(new char[len])
 {
index 6725e6f..29c3302 100644 (file)
@@ -35,9 +35,10 @@ class AittDiscovery {
     void SetMQ(std::unique_ptr<MQ> mq);
     void Start(const std::string &host, int port, const std::string &username,
           const std::string &password);
+    void Restart();
     void Stop();
-    void UpdateDiscoveryMsg(AittProtocol protocol, const void *msg, size_t length);
-    int AddDiscoveryCB(AittProtocol protocol, const DiscoveryCallback &cb);
+    void UpdateDiscoveryMsg(const std::string &protocol, const void *msg, size_t length);
+    int AddDiscoveryCB(const std::string &protocol, const DiscoveryCallback &cb);
     void RemoveDiscoveryCB(int callback_id);
     bool CompareTopic(const std::string &left, const std::string &right);
 
@@ -55,14 +56,12 @@ class AittDiscovery {
     static void DiscoveryMessageCallback(MSG *mq, const std::string &topic, const void *msg,
           const int szmsg, void *user_data);
     void PublishDiscoveryMsg();
-    const char *GetProtocolStr(AittProtocol protocol);
-    AittProtocol GetProtocol(const std::string &protocol_str);
 
     std::string id_;
     std::unique_ptr<MQ> discovery_mq;
     void *callback_handle;
-    std::map<AittProtocol, DiscoveryBlob> discovery_map;
-    std::map<int, std::pair<AittProtocol, DiscoveryCallback>> callbacks;
+    std::map<std::string, DiscoveryBlob> discovery_map;
+    std::map<int, std::pair<std::string, DiscoveryCallback>> callbacks;
 };
 
 // Discovery Message (flexbuffers)
similarity index 52%
rename from common/AittStreamTag.h
rename to common/AittStreamModule.h
index 9c4b7bf..a48c74b 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2021-2022 Samsung Electronics Co., Ltd All Rights Reserved
+ * Copyright (c) 2022 Samsung Electronics Co., Ltd All Rights Reserved
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  */
 #pragma once
 
+#include <AittDiscovery.h>
+#include <AittStream.h>
 #include <AittTypes.h>
 
+#include <functional>
 #include <string>
+
+#define AITT_STREAM_NEW aitt_stream_new
+#define TO_STR(s) #s
+#define DEFINE_TO_STR(x) TO_STR(x)
+
 namespace aitt {
 
-class StreamTag {
+class AittStreamModule : public AittStream {
   public:
-    explicit StreamTag(const std::string &topic, AittProtocol protocol, AittStreamRole role,
-          void *handle)
-          : topic_(topic), protocol_(protocol), role_(role), handle_(handle){};
-    std::string topic_;
-    AittProtocol protocol_;
-    AittStreamRole role_;
-    void *handle_;
+    typedef void *(
+          *ModuleEntry)(AittDiscovery &discovery, const std::string &topic, AittStreamRole role);
+
+    static constexpr const char *const MODULE_ENTRY_NAME = DEFINE_TO_STR(AITT_STREAM_NEW);
+
+    AittStreamModule() = default;
+    virtual ~AittStreamModule(void) = default;
 };
 
 }  // namespace aitt
+
+#undef TO_STR
+#undef DEFINE_TO_STR
index 140c8a8..eac7556 100644 (file)
@@ -17,6 +17,7 @@
 
 #include <AittException.h>
 #include <AittOption.h>
+#include <AittStream.h>
 #include <AittTypes.h>
 #include <MSG.h>
 
@@ -35,11 +36,6 @@ class API AITT {
           std::function<void(MSG *msg, const void *data, const size_t datalen, void *user_data)>;
     using ConnectionCallback = std::function<void(AITT &, int, void *user_data)>;
 
-    using StreamStateCallback =
-          std::function<void(AittStreamState state, void *user_data)>;
-    using StreamSinkCallback = std::function<void(const void *data,
-          const size_t datalen, void *user_data)>;
-
     explicit AITT(const std::string &id, const std::string &ip_addr,
           AittOption option = AittOption(false, false));
     virtual ~AITT(void);
@@ -69,19 +65,9 @@ class API AITT {
 
     void SendReply(MSG *msg, const void *data, const size_t datalen, bool end = true);
 
-    AittStreamID CreatePublishStream(const std::string &topic, AittProtocol protocol);
-    AittStreamID CreateSubscribeStream(const std::string &topic, AittProtocol protocol);
-    void DestroyStream(AittStreamID handle);
-    void SetStreamConfig(AittStreamID handle, const std::string &key, const std::string &value);
-    std::string GetStreamConfig(AittStreamID handle, const std::string &key);
-    void StartStream(AittStreamID handle);
-    void StopStream(AittStreamID handle);
-    void SetStreamStateCallback(AittStreamID handle, StreamStateCallback cb,
-          void *user_data = nullptr);
-    void UnsetStreamStateCallback(AittStreamID handle);
-    void SetStreamSinkCallback(AittStreamID handle, StreamSinkCallback cb,
-          void *user_data = nullptr);
-    void UnsetStreamSinkCallback(AittStreamID handle);
+    AittStream *CreateStream(AittStreamProtocol type, const std::string &topic,
+          AittStreamRole role);
+    void DestroyStream(AittStream *aitt_stream);
 
   private:
     class Impl;
diff --git a/include/AittStream.h b/include/AittStream.h
new file mode 100644 (file)
index 0000000..1261a50
--- /dev/null
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2022 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <AittTypes.h>
+
+#include <functional>
+#include <memory>
+#include <string>
+
+namespace aitt {
+
+class AittStream {
+  public:
+    using StateCallback =
+          std::function<void(AittStream *stream, AittStreamState state, void *user_data)>;
+    using ReceiveCallback = std::function<void(AittStream *stream, void *data, void *user_data)>;
+
+    AittStream() = default;
+    virtual ~AittStream() = default;
+
+    virtual void SetConfig(const std::string &key, const std::string &value) = 0;
+    virtual void SetConfig(const std::string &key, void *obj) = 0;
+    virtual void Start() = 0;
+    virtual void SetStateCallback(StateCallback cb, void *user_data) = 0;
+
+    // Subscriber ONLY
+    virtual void SetReceiveCallback(ReceiveCallback cb, void *user_data) = 0;
+    // virtual void SetMediaInfoCallback(???);
+};
+}  // namespace aitt
index 1c959e5..88a1d3e 100644 (file)
 #define API __attribute__((visibility("default")))
 
 typedef void* AittSubscribeID;
-typedef void* AittStreamID;
 
 enum AittProtocol {
     AITT_TYPE_UNKNOWN = 0,
     AITT_TYPE_MQTT = (0x1 << 0),        // Publish message through the MQTT
     AITT_TYPE_TCP = (0x1 << 1),         // Publish message to peers using the TCP
     AITT_TYPE_TCP_SECURE = (0x1 << 2),  // Publish message to peers using the Secure TCP
-    AITT_TYPE_WEBRTC = (0x1 << 3),      // Publish message to peers using the WEBRTC
+};
+
+enum AittStreamProtocol {
+    AITT_STREAM_TYPE_WEBRTC,  // Publish message to peers using the WEBRTC
+    AITT_STREAM_TYPE_RTSP,    // Publish message to peers using the RTSP
+    AITT_STREAM_TYPE_MAX
+};
+
+enum AittStreamState {
+    AITT_STREAM_STATE_INIT = 0,
+    AITT_STREAM_STATE_READY = 1,
+    AITT_STREAM_STATE_PLAYING = 2,
+};
+
+enum AittStreamRole {
+    AITT_STREAM_ROLE_PUBLISHER = 0,   // Role of source media
+    AITT_STREAM_ROLE_SUBSCRIBER = 1,  // Role of destination(receiver)
 };
 
 // AittQoS only works with the AITT_TYPE_MQTT
@@ -41,18 +56,6 @@ enum AittConnectionState {
     AITT_CONNECT_FAILED = 2,  // Failed to connect to the mqtt broker.
 };
 
-enum AittStreamState {
-    AITT_STREAM_STATE_INIT = 0,
-    AITT_STREAM_STATE_READY = 1,
-    AITT_STREAM_STATE_PLAYING = 2,
-};
-
-enum AittStreamRole {
-    AITT_STREAM_ROLE_NONE = 0,
-    AITT_STREAM_ROLE_SRC = 1,
-    AITT_STREAM_ROLE_SINK = 2,
-};
-
 // The maximum size in bytes of a message. It follows MQTT
 #define AITT_MESSAGE_MAX 268435455
 
@@ -63,10 +66,10 @@ enum AittStreamRole {
 #include <errno.h>
 
 #define TIZEN_ERROR_NONE 0
-#define TIZEN_ERROR_INVALID_PARAMETER -EINVAL
-#define TIZEN_ERROR_PERMISSION_DENIED -EACCES
 #define TIZEN_ERROR_OUT_OF_MEMORY -ENOMEM
+#define TIZEN_ERROR_PERMISSION_DENIED -EACCES
 #define TIZEN_ERROR_RESOURCE_BUSY -EBUSY
+#define TIZEN_ERROR_INVALID_PARAMETER -EINVAL
 #define TIZEN_ERROR_TIMED_OUT (-1073741824LL + 1)
 #define TIZEN_ERROR_NOT_SUPPORTED (-1073741824LL + 2)
 #define TIZEN_ERROR_AITT -0x04020000
@@ -74,13 +77,13 @@ enum AittStreamRole {
 
 enum AittError {
     AITT_ERROR_NONE = TIZEN_ERROR_NONE,                           /**< On Success */
-    AITT_ERROR_INVALID_PARAMETER = TIZEN_ERROR_INVALID_PARAMETER, /**< Invalid parameter */
-    AITT_ERROR_PERMISSION_DENIED = TIZEN_ERROR_PERMISSION_DENIED, /**< Permission denied */
     AITT_ERROR_OUT_OF_MEMORY = TIZEN_ERROR_OUT_OF_MEMORY,         /**< Out of memory */
-    AITT_ERROR_RESOURCE_BUSY = TIZEN_ERROR_RESOURCE_BUSY,         /**< Resource Busy */
+    AITT_ERROR_PERMISSION_DENIED = TIZEN_ERROR_PERMISSION_DENIED, /**< Permission denied */
+    AITT_ERROR_RESOURCE_BUSY = TIZEN_ERROR_RESOURCE_BUSY,         /**< Device or resource busy */
+    AITT_ERROR_INVALID_PARAMETER = TIZEN_ERROR_INVALID_PARAMETER, /**< Invalid parameter */
     AITT_ERROR_TIMED_OUT = TIZEN_ERROR_TIMED_OUT,                 /**< Time out */
     AITT_ERROR_NOT_SUPPORTED = TIZEN_ERROR_NOT_SUPPORTED,         /**< Not supported */
     AITT_ERROR_UNKNOWN = TIZEN_ERROR_AITT | 0x01,                 /**< Unknown Error */
     AITT_ERROR_SYSTEM = TIZEN_ERROR_AITT | 0x02,                  /**< System errors */
-    AITT_ERROR_NOT_READY = TIZEN_ERROR_AITT | 0x03,               /**< System errors */
+    AITT_ERROR_NOT_READY = TIZEN_ERROR_AITT | 0x03,               /**< Not available */
 };
index 81717c5..3246ff7 100644 (file)
@@ -29,7 +29,7 @@ Module::Module(AittProtocol type, AittDiscovery &discovery, const std::string &m
 {
     aittThread = std::thread(&Module::ThreadMain, this);
 
-    discovery_cb = discovery.AddDiscoveryCB(type,
+    discovery_cb = discovery.AddDiscoveryCB(NAME[secure],
           std::bind(&Module::DiscoveryMessageCallback, this, std::placeholders::_1,
                 std::placeholders::_2, std::placeholders::_3, std::placeholders::_4));
     DBG("Discovery Callback : %p, %d", this, discovery_cb);
@@ -306,8 +306,7 @@ void Module::UpdateDiscoveryMsg()
     fbb.Finish();
 
     auto buf = fbb.GetBuffer();
-    discovery.UpdateDiscoveryMsg(secure ? AITT_TYPE_TCP_SECURE : AITT_TYPE_TCP, buf.data(),
-          buf.size());
+    discovery.UpdateDiscoveryMsg(NAME[secure], buf.data(), buf.size());
 }
 
 void Module::ReceiveData(MainLoopHandler::MainLoopResult result, int handle,
index ed748a1..7a53edd 100644 (file)
@@ -122,6 +122,7 @@ class Module : public AittTransport {
     void UpdatePublishTable(const std::string &topic, const std::string &host,
           const TCP::ConnectInfo &info);
 
+    const char *const NAME[2] = {"TCP", "SECURE_TCP"};
     MainLoopHandler main_loop;
     std::thread aittThread;
     int discovery_cb;
index 4815e20..d291b39 100644 (file)
@@ -234,6 +234,10 @@ int TCP::RecvSizedDataNormal(void **data, size_t &data_size)
     if (data_len == UINT32_MAX)
         return HandleZeroMsg(data, data_size);
 
+    if (AITT_MESSAGE_MAX < data_len) {
+        ERR("Invalid Size(%zu)", data_len);
+        return -1;
+    }
     void *data_buf = malloc(data_len);
     Recv(data_buf, data_len);
     data_size = data_len;
index a4625af..36d0437 100644 (file)
@@ -129,61 +129,15 @@ void AITT::SendReply(MSG *msg, const void *data, size_t datalen, bool end)
     return pImpl->SendReply(msg, data, datalen, end);
 }
 
-AittStreamID AITT::CreatePublishStream(const std::string &topic, AittProtocol protocol)
+AittStream *AITT::CreateStream(AittStreamProtocol type, const std::string &topic,
+      AittStreamRole role)
 {
-    return pImpl->CreatePublishStream(topic, protocol);
+    return pImpl->CreateStream(type, topic, role);
 }
 
-AittStreamID AITT::CreateSubscribeStream(const std::string &topic, AittProtocol protocol)
+void AITT::DestroyStream(AittStream *aitt_stream)
 {
-    return pImpl->CreateSubscribeStream(topic, protocol);
-}
-
-void AITT::DestroyStream(AittStreamID handle)
-{
-    return pImpl->DestroyStream(handle);
-}
-
-void AITT::SetStreamConfig(AittStreamID handle, const std::string &key, const std::string &value)
-{
-    return pImpl->SetStreamConfig(handle, key, value);
-}
-
-std::string AITT::GetStreamConfig(AittStreamID handle, const std::string &key)
-{
-    return pImpl->GetStreamConfig(handle, key);
-}
-
-void AITT::StartStream(AittStreamID handle)
-{
-    return pImpl->StartStream(handle);
-}
-
-void AITT::StopStream(AittStreamID handle)
-{
-    return pImpl->StopStream(handle);
-}
-
-void AITT::SetStreamStateCallback(AittStreamID handle, StreamStateCallback cb,
-      void *user_data)
-{
-    return pImpl->SetStreamStateCallback(handle, cb, user_data);
-}
-
-void AITT::UnsetStreamStateCallback(AittStreamID handle)
-{
-    return pImpl->UnsetStreamStateCallback(handle);
-}
-
-void AITT::SetStreamSinkCallback(AittStreamID handle, StreamSinkCallback cb,
-      void *user_data)
-{
-    return pImpl->SetStreamSinkCallback(handle, cb, user_data);
-}
-
-void AITT::UnsetStreamSinkCallback(AittStreamID handle)
-{
-    return pImpl->UnsetStreamSinkCallback(handle);
+    return pImpl->DestroyStream(aitt_stream);
 }
 
 }  // namespace aitt
index eaecaf4..821c339 100644 (file)
@@ -15,8 +15,7 @@
  */
 #include "AITTImpl.h"
 
-#include <flatbuffers/flexbuffers.h>
-
+#include <algorithm>
 #include <cerrno>
 #include <cstring>
 #include <functional>
@@ -111,7 +110,10 @@ void AITT::Impl::Connect(const std::string &host, int port, const std::string &u
 void AITT::Impl::Disconnect(void)
 {
     UnsubscribeAll();
-    DestroyAllStream();
+
+    for (auto stream : in_use_streams)
+        delete stream;
+    in_use_streams.clear();
 
     mqtt_broker_ip_.clear();
     mqtt_broker_port_ = -1;
@@ -143,26 +145,6 @@ void AITT::Impl::UnsubscribeAll()
     }
     subscribed_list.clear();
 }
-void AITT::Impl::DestroyAllStream(void)
-{
-    std::unique_lock<std::mutex> lock(stream_list_mutex_);
-
-    for (auto tag : stream_list_) {
-        if (!tag)
-            continue;
-        //TODO call stream transport
-        delete tag;
-    }
-    stream_list_.clear();
-}
-
-void AITT::Impl::StreamInfoExist(StreamTag *tag)
-{
-    if (std::find(stream_list_.begin(), stream_list_.end(), tag) == stream_list_.end()) {
-        ERR("Unknown stream id(%p)", tag);
-        throw std::runtime_error("stream id");
-    }
-}
 
 void AITT::Impl::ConfigureTransportModule(const std::string &key, const std::string &value,
       AittProtocol protocols)
@@ -253,7 +235,7 @@ void *AITT::Impl::Unsubscribe(AittSubscribeID subscribe_id)
     auto it = std::find(subscribed_list.begin(), subscribed_list.end(), info);
     if (it == subscribed_list.end()) {
         ERR("Unknown subscribe_id(%p)", subscribe_id);
-        throw std::runtime_error("subscribe_id");
+        throw AittException(AittException::NO_DATA_ERR);
     }
 
     void *user_data = nullptr;
@@ -294,7 +276,7 @@ int AITT::Impl::PublishWithReply(const std::string &topic, const void *data, con
               if (sub_msg->IsEndSequence()) {
                   try {
                       Unsubscribe(sub_msg->GetID());
-                  } catch (std::runtime_error &e) {
+                  } catch (AittException &e) {
                       ERR("Unsubscribe() Fail(%s)", e.what());
                   }
               }
@@ -329,7 +311,7 @@ int AITT::Impl::PublishWithReplySync(const std::string &topic, const void *data,
               if (sub_msg->IsEndSequence()) {
                   try {
                       Unsubscribe(sub_msg->GetID());
-                  } catch (std::runtime_error &e) {
+                  } catch (AittException &e) {
                       ERR("Unsubscribe() Fail(%s)", e.what());
                   }
                   sync_loop.Quit();
@@ -406,153 +388,30 @@ void *AITT::Impl::SubscribeTCP(SubscribeInfo *handle, const std::string &topic,
                 user_data, qos);
 }
 
-AittStreamID AITT::Impl::CreatePublishStream(const std::string &topic, AittProtocol protocol)
+AittStream *AITT::Impl::CreateStream(AittStreamProtocol type, const std::string &topic,
+      AittStreamRole role)
 {
-   //TODO call stream transport
-    auto info = new StreamTag(topic, protocol, AITT_STREAM_ROLE_SRC, nullptr);
-    // TODO: be prepared when there's memory failure
-    {
-        std::unique_lock<std::mutex> lock(stream_list_mutex_);
-        stream_list_.push_back(info);
+    AittStreamModule *stream = nullptr;
+    try {
+        stream = modules.NewStreamModule(type, topic, role);
+        in_use_streams.push_back(stream);
+    } catch (std::exception &e) {
+        ERR("StreamHandler() Fail(%s)", e.what());
     }
+    discovery.Restart();
 
-    INFO("Stream topic(%s) : %p", topic.c_str(), info);
-    return reinterpret_cast<AittStreamID>(info);
+    return stream;
 }
 
-AittStreamID AITT::Impl::CreateSubscribeStream(const std::string &topic,
-      AittProtocol protocol)
+void AITT::Impl::DestroyStream(AittStream *aitt_stream)
 {
-    //TODO call stream transport
-    auto info = new StreamTag(topic, protocol, AITT_STREAM_ROLE_SINK, nullptr);
-    // TODO: be prepared when there's memory failure
-    {
-        std::unique_lock<std::mutex> lock(stream_list_mutex_);
-        stream_list_.push_back(info);
+    auto it = std::find(in_use_streams.begin(), in_use_streams.end(), aitt_stream);
+    if (it == in_use_streams.end()) {
+        ERR("Unknown Stream(%p)", aitt_stream);
+        return;
     }
-
-    INFO("Stream topic(%s) : %p", topic.c_str(), info);
-    return reinterpret_cast<AittStreamID>(info);
-}
-
-void AITT::Impl::DestroyStream(AittStreamID handle)
-{
-    INFO("stream id : %p", handle);
-    StreamTag *info = reinterpret_cast<StreamTag *>(handle);
-
-    std::unique_lock<std::mutex> lock(stream_list_mutex_);
-
-    auto it = std::find(stream_list_.begin(), stream_list_.end(), info);
-    if (it == stream_list_.end()) {
-        ERR("Unknown stream id(%p)", handle);
-        throw std::runtime_error("stream ID");
-    }
-
-    //TODO call stream transport
-
-    stream_list_.erase(it);
-    delete *it;
-}
-
-void AITT::Impl::SetStreamConfig(AittStreamID handle, const std::string &key,
-      const std::string &value)
-{
-    INFO("stream id : %p", handle);
-    StreamTag *tag = reinterpret_cast<StreamTag *>(handle);
-
-    std::unique_lock<std::mutex> lock(stream_list_mutex_);
-
-    StreamInfoExist(tag);
-
-    //TODO call stream transport
-}
-
-std::string AITT::Impl::GetStreamConfig(AittStreamID handle, const std::string &key)
-{
-    std::string value;
-    INFO("stream id : %p", handle);
-    StreamTag *tag = reinterpret_cast<StreamTag *>(handle);
-
-    std::unique_lock<std::mutex> lock(stream_list_mutex_);
-
-    StreamInfoExist(tag);
-
-    //TODO call stream transport
-
-    return value;
-}
-
-void AITT::Impl::StartStream(AittStreamID handle)
-{
-    INFO("stream id : %p", handle);
-    StreamTag *tag = reinterpret_cast<StreamTag *>(handle);
-
-    std::unique_lock<std::mutex> lock(stream_list_mutex_);
-
-    StreamInfoExist(tag);
-
-    //TODO call stream transport
-}
-
-void AITT::Impl::StopStream(AittStreamID handle)
-{
-    INFO("stream id : %p", handle);
-    StreamTag *tag = reinterpret_cast<StreamTag *>(handle);
-
-    std::unique_lock<std::mutex> lock(stream_list_mutex_);
-
-    StreamInfoExist(tag);
-
-    //TODO call stream transport
-}
-
-void AITT::Impl::SetStreamStateCallback(AittStreamID handle, StreamStateCallback cb,
-      void *user_data)
-{
-    INFO("stream id : %p", handle);
-    StreamTag *tag = reinterpret_cast<StreamTag *>(handle);
-
-    std::unique_lock<std::mutex> lock(stream_list_mutex_);
-
-    StreamInfoExist(tag);
-
-    //TODO call stream transport
-}
-
-void AITT::Impl::UnsetStreamStateCallback(AittStreamID handle)
-{
-    INFO("stream id : %p", handle);
-    StreamTag *tag = reinterpret_cast<StreamTag *>(handle);
-
-    std::unique_lock<std::mutex> lock(stream_list_mutex_);
-
-    StreamInfoExist(tag);
-
-    //TODO call stream transport
-}
-
-void AITT::Impl::SetStreamSinkCallback(AittStreamID handle, StreamSinkCallback cb, void *user_data)
-{
-    INFO("stream id : %p", handle);
-    StreamTag *tag = reinterpret_cast<StreamTag *>(handle);
-
-    std::unique_lock<std::mutex> lock(stream_list_mutex_);
-
-    StreamInfoExist(tag);
-
-    //TODO call stream transport
-}
-
-void AITT::Impl::UnsetStreamSinkCallback(AittStreamID handle)
-{
-    INFO("stream id : %p", handle);
-    StreamTag *tag = reinterpret_cast<StreamTag *>(handle);
-
-    std::unique_lock<std::mutex> lock(stream_list_mutex_);
-
-    StreamInfoExist(tag);
-
-    //TODO call stream transport
+    in_use_streams.erase(it);
+    delete aitt_stream;
 }
 
 }  // namespace aitt
index 097e749..7279687 100644 (file)
@@ -15,8 +15,6 @@
  */
 #pragma once
 
-#include <flatbuffers/flexbuffers.h>
-
 #include <map>
 #include <memory>
 #include <mutex>
 
 #include "AITT.h"
 #include "AittDiscovery.h"
-#include "AittStreamTag.h"
+#include "AittStream.h"
 #include "MQ.h"
 #include "MainLoopHandler.h"
 #include "ModuleManager.h"
 
 namespace aitt {
-
 class AITT::Impl {
   public:
     explicit Impl(AITT &parent, const std::string &id, const std::string &my_ip,
@@ -64,17 +61,9 @@ class AITT::Impl {
 
     void SendReply(MSG *msg, const void *data, const int datalen, bool end);
 
-    AittStreamID CreatePublishStream(const std::string &topic, AittProtocol protocol);
-    AittStreamID CreateSubscribeStream(const std::string &topic, AittProtocol protocol);
-    void DestroyStream(AittStreamID handle);
-    void SetStreamConfig(AittStreamID handle, const std::string &key, const std::string &value);
-    std::string GetStreamConfig(AittStreamID handle, const std::string &key);
-    void StartStream(AittStreamID handle);
-    void StopStream(AittStreamID handle);
-    void SetStreamStateCallback(AittStreamID handle, StreamStateCallback cb, void *user_data);
-    void UnsetStreamStateCallback(AittStreamID handle);
-    void SetStreamSinkCallback(AittStreamID handle, StreamSinkCallback cb, void *user_data);
-    void UnsetStreamSinkCallback(AittStreamID handle);
+    AittStream *CreateStream(AittStreamProtocol type, const std::string &topic,
+          AittStreamRole role);
+    void DestroyStream(AittStream *aitt_stream);
 
   private:
     using Blob = std::pair<const void *, int>;
@@ -89,15 +78,9 @@ class AITT::Impl {
     void *SubscribeTCP(SubscribeInfo *, const std::string &topic, const SubscribeCallback &cb,
           void *cbdata, AittQoS qos);
 
-    void *SubscribeWebRtc(SubscribeInfo *, const std::string &topic, const SubscribeCallback &cb,
-          void *cbdata, AittQoS qos);
     void HandleTimeout(int timeout_ms, unsigned int &timeout_id, aitt::MainLoopHandler &sync_loop,
           bool &is_timeout);
-    void PublishWebRtc(const std::string &topic, const void *data, const size_t datalen,
-          AittQoS qos, bool retain);
     void UnsubscribeAll();
-    void DestroyAllStream(void);
-    void StreamInfoExist(StreamTag *tag);
     void ThreadMain(void);
 
     AITT &public_api;
@@ -108,8 +91,7 @@ class AITT::Impl {
     std::unique_ptr<MQ> mq;
     std::vector<SubscribeInfo *> subscribed_list;
     std::mutex subscribed_list_mutex_;
-    std::vector<StreamTag *> stream_list_;
-    std::mutex stream_list_mutex_;
+    std::vector<AittStreamModule *> in_use_streams;
 
     std::string id_;
     std::string mqtt_broker_ip_;
index 6950c9a..00acedb 100644 (file)
@@ -30,6 +30,10 @@ ModuleManager::ModuleManager(const std::string &my_ip, AittDiscovery &d)
         transport_handles.push_back(ModuleHandle(nullptr, nullptr));
         LoadTransport(static_cast<TransportType>(i));
     }
+
+    for (int i = AITT_STREAM_TYPE_WEBRTC; i < AITT_STREAM_TYPE_MAX; ++i) {
+        stream_handles.push_back(ModuleHandle(nullptr, nullptr));
+    }
 }
 
 AittTransport &ModuleManager::Get(AittProtocol protocol)
@@ -49,8 +53,6 @@ ModuleManager::TransportType ModuleManager::Convert(AittProtocol type)
         return TYPE_TCP;
     case AITT_TYPE_TCP_SECURE:
         return TYPE_TCP_SECURE;
-    case AITT_TYPE_WEBRTC:
-        return TYPE_WEBRTC;
 
     case AITT_TYPE_MQTT:
     default:
@@ -60,20 +62,33 @@ ModuleManager::TransportType ModuleManager::Convert(AittProtocol type)
     return TYPE_TRANSPORT_MAX;
 }
 
-std::string ModuleManager::GetTransportFileName(TransportType type)
+const char *ModuleManager::GetTransportFileName(TransportType type)
 {
     switch (type) {
     case TYPE_TCP:
     case TYPE_TCP_SECURE:
         return "libaitt-transport-tcp.so";
-    case TYPE_WEBRTC:
-        return "libaitt-transport-webrtc.so";
     default:
         ERR("Unknown Type(%d)", type);
         break;
     }
 
-    return std::string("Unknown");
+    return "Unknown";
+}
+
+const char *ModuleManager::GetStreamFileName(AittStreamProtocol type)
+{
+    switch (type) {
+    case AITT_STREAM_TYPE_WEBRTC:
+        return "libaitt-stream-webrtc.so";
+    case AITT_STREAM_TYPE_RTSP:
+        return "libaitt-stream-rtsp.so";
+    default:
+        ERR("Unknown Type(%d)", type);
+        break;
+    }
+
+    return "Unknown";
 }
 
 ModuleManager::ModuleHandle ModuleManager::OpenModule(const char *file)
@@ -82,8 +97,10 @@ ModuleManager::ModuleHandle ModuleManager::OpenModule(const char *file)
         if (dlclose(handle))
             ERR("dlclose: %s", dlerror());
     });
-    if (handle == nullptr)
+    if (handle == nullptr) {
         ERR("dlopen(%s): %s", file, dlerror());
+        throw AittException(AittException::SYSTEM_ERR);
+    }
 
     return handle;
 }
@@ -93,17 +110,17 @@ ModuleManager::ModuleHandle ModuleManager::OpenTransport(TransportType type)
     if (TYPE_TCP_SECURE == type)
         type = TYPE_TCP;
 
-    std::string filename = GetTransportFileName(type);
-    ModuleHandle handle = OpenModule(filename.c_str());
+    ModuleHandle handle = OpenModule(GetTransportFileName(type));
 
     return handle;
 }
 
 void ModuleManager::LoadTransport(TransportType type)
 {
-    transport_handles[type] = OpenTransport(type);
-    if (transport_handles[type] == nullptr) {
-        ERR("OpenTransport(%d) Fail", type);
+    try {
+        transport_handles[type] = OpenTransport(type);
+    } catch (AittException &e) {
+        ERR("OpenTransport(%d) Fail(%s)", type, e.what());
         return;
     }
 
@@ -122,9 +139,33 @@ void ModuleManager::LoadTransport(TransportType type)
     }
 }
 
+AittStreamModule *ModuleManager::NewStreamModule(AittStreamProtocol type, const std::string &topic,
+      AittStreamRole role)
+{
+    if (nullptr == stream_handles[type])
+        stream_handles[type] = OpenModule(GetStreamFileName(type));
+
+    AittStreamModule::ModuleEntry get_instance_fn = reinterpret_cast<AittStreamModule::ModuleEntry>(
+          dlsym(stream_handles[type].get(), AittStreamModule::MODULE_ENTRY_NAME));
+    if (get_instance_fn == nullptr) {
+        ERR("dlsym: %s", dlerror());
+        throw AittException(AittException::SYSTEM_ERR);
+    }
+
+    AittStreamModule *instance(
+          static_cast<AittStreamModule *>(get_instance_fn(discovery, topic, role)));
+    if (instance == nullptr) {
+        ERR("get_instance_fn(AittStreamModule) Fail");
+        throw AittException(AittException::SYSTEM_ERR);
+    }
+
+    return instance;
+}
+
 std::unique_ptr<MQ> ModuleManager::NewCustomMQ(const std::string &id, const AittOption &option)
 {
-    custom_mqtt_handle = OpenModule("libaitt-st-broker.so");
+    if (nullptr == custom_mqtt_handle)
+        custom_mqtt_handle = OpenModule("libaitt-st-broker.so");
 
     MQ::ModuleEntry get_instance_fn =
           reinterpret_cast<MQ::ModuleEntry>(dlsym(custom_mqtt_handle.get(), MQ::MODULE_ENTRY_NAME));
index d71872d..fbbd2a2 100644 (file)
@@ -20,6 +20,7 @@
 #include <vector>
 
 #include "AittDiscovery.h"
+#include "AittStreamModule.h"
 #include "AittTransport.h"
 #include "MQ.h"
 #include "NullTransport.h"
@@ -32,6 +33,8 @@ class ModuleManager {
     virtual ~ModuleManager() = default;
 
     AittTransport &Get(AittProtocol type);
+    AittStreamModule *NewStreamModule(AittStreamProtocol type, const std::string &topic,
+          AittStreamRole role);
     std::unique_ptr<MQ> NewCustomMQ(const std::string &id, const AittOption &option);
 
   private:
@@ -41,12 +44,12 @@ class ModuleManager {
     enum TransportType {
         TYPE_TCP,         //(0x1 << 1)
         TYPE_TCP_SECURE,  //(0x1 << 2)
-        TYPE_WEBRTC,      //(0x1 << 3)
         TYPE_TRANSPORT_MAX,
     };
 
     TransportType Convert(AittProtocol type);
-    std::string GetTransportFileName(TransportType type);
+    const char *GetTransportFileName(TransportType type);
+    const char *GetStreamFileName(AittStreamProtocol type);
     ModuleHandle OpenModule(const char *file);
     ModuleHandle OpenTransport(TransportType type);
     void LoadTransport(TransportType type);
@@ -55,6 +58,7 @@ class ModuleManager {
     AittDiscovery &discovery;
     std::vector<ModuleHandle> transport_handles;
     std::unique_ptr<AittTransport> transports[TYPE_TRANSPORT_MAX];
+    std::vector<ModuleHandle> stream_handles;
     ModuleHandle custom_mqtt_handle;
     NullTransport null_transport;
 };
diff --git a/tests/AITT_Stream_test.cc b/tests/AITT_Stream_test.cc
deleted file mode 100644 (file)
index aa33cc3..0000000
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Copyright (c) 2021-2022 Samsung Electronics Co., Ltd All Rights Reserved
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <gtest/gtest.h>
-#include <sys/random.h>
-
-#include <thread>
-
-#include "AITT.h"
-#include "AITT_Stream_tests.h"
-
-using AITT = aitt::AITT;
-
-class AITTStreamTest : public testing::Test, public AittStreamTests {
-  protected:
-    void SetUp() override { Init(); }
-    void TearDown() override { Deinit(); }
-};
-
-TEST_F(AITTStreamTest, Positive_Create_Pubilsh_Stream_WebRTC_Anytime)
-{
-    try {
-        AITT aitt(stream_src_id_, LOCAL_IP, AittOption(true, false));
-        aitt.Connect();
-        publish_stream_handle_ =
-              aitt.CreatePublishStream(TEST_STREAM_TOPIC, AITT_TYPE_WEBRTC);
-        EXPECT_NE(publish_stream_handle_, nullptr);
-    } catch (std::exception &e) {
-        FAIL() << "Unexpected exception: " << e.what();
-    }
-}
-
-TEST_F(AITTStreamTest, Positive_Create_Subscribe_Stream_WebRTC_Anytime)
-{
-    try {
-        AITT aitt(stream_src_id_, LOCAL_IP, AittOption(true, false));
-        aitt.Connect();
-        subscribe_stream_handle_ =
-              aitt.CreateSubscribeStream(TEST_STREAM_TOPIC, AITT_TYPE_WEBRTC);
-        EXPECT_NE(subscribe_stream_handle_, nullptr);
-    } catch (std::exception &e) {
-        FAIL() << "Unexpected exception: " << e.what();
-    }
-}
-
-TEST_F(AITTStreamTest, Positive_Destroy_Pubilsh_Stream_WebRTC_Anytime)
-{
-    try {
-        AITT aitt(stream_src_id_, LOCAL_IP, AittOption(true, false));
-        aitt.Connect();
-        publish_stream_handle_ =
-              aitt.CreatePublishStream(TEST_STREAM_TOPIC, AITT_TYPE_WEBRTC);
-        aitt.DestroyStream(publish_stream_handle_);
-    } catch (std::exception &e) {
-        FAIL() << "Unexpected exception: " << e.what();
-    }
-}
-
-TEST_F(AITTStreamTest, Positive_Destroy_Subscribe_Stream_WebRTC_Anytime)
-{
-    try {
-        AITT aitt(stream_src_id_, LOCAL_IP, AittOption(true, false));
-        aitt.Connect();
-        subscribe_stream_handle_ =
-              aitt.CreateSubscribeStream(TEST_STREAM_TOPIC, AITT_TYPE_WEBRTC);
-        aitt.DestroyStream(subscribe_stream_handle_);
-    } catch (std::exception &e) {
-        FAIL() << "Unexpected exception: " << e.what();
-    }
-}
-
-TEST_F(AITTStreamTest, Negative_Destroy_Stream_Anytime)
-{
-    EXPECT_THROW(
-          {
-              try {
-                  AITT aitt(stream_src_id_, LOCAL_IP, AittOption(true, false));
-                  aitt.Connect();
-                  aitt.DestroyStream(nullptr);
-              } catch (const std::runtime_error &e) {
-                  // and this tests that it has the correct message
-                  throw;
-              }
-          },
-          std::runtime_error);
-}
diff --git a/tests/AITT_Stream_tests.h b/tests/AITT_Stream_tests.h
deleted file mode 100644 (file)
index 7fc1b7a..0000000
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Copyright (c) 2022 Samsung Electronics Co., Ltd All Rights Reserved
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#pragma once
-
-#include <glib.h>
-#include <sys/time.h>
-
-#include "aitt_internal.h"
-
-#define LOCAL_IP "127.0.0.1"
-#define TEST_STREAM_TOPIC "test/stream_topic"
-#define TEST_CONFIG_KEY "test_config_key"
-#define TEST_CONFIG_VALUE "test_config_value"
-
-class AittStreamTests {
-  public:
-    void Init()
-    {
-        timeval tv;
-        char buffer[256];
-        gettimeofday(&tv, nullptr);
-        snprintf(buffer, sizeof(buffer), "UniqueStreamSrcID.%lX%lX", tv.tv_sec, tv.tv_usec);
-        stream_src_id_ = buffer;
-        snprintf(buffer, sizeof(buffer), "UniqueStreamSinkID.%lX%lX", tv.tv_sec, tv.tv_usec);
-        stream_sink_id_ = buffer;
-        stream_topic_ = "StreamTopic";
-        mainLoop_ = g_main_loop_new(nullptr, FALSE);
-    }
-
-    void Deinit() { g_main_loop_unref(mainLoop_); }
-
-    void IterateEventLoop(void)
-    {
-        g_main_loop_run(mainLoop_);
-        DBG("Go forward");
-    }
-
-    AittStreamID publish_stream_handle_;
-    AittStreamID subscribe_stream_handle_;
-
-    std::string stream_src_id_;
-    std::string stream_sink_id_;
-    std::string stream_topic_;
-    GMainLoop *mainLoop_;
-};
diff --git a/tests/AittStream_test.cc b/tests/AittStream_test.cc
new file mode 100644 (file)
index 0000000..e9a22cb
--- /dev/null
@@ -0,0 +1,55 @@
+/*
+ * Copyright (c) 2022 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "AittStream.h"
+
+#include <gtest/gtest.h>
+
+#include "AITT.h"
+#include "AittTests.h"
+
+using namespace aitt;
+
+TEST(AittStreamTest, Full_P)
+{
+    try {
+        AITT aitt("streamClientId", LOCAL_IP, AittOption(true, false));
+
+        aitt.Connect();
+
+        AittStream *publisher =
+              aitt.CreateStream(AITT_STREAM_TYPE_WEBRTC, "topic", AITT_STREAM_ROLE_PUBLISHER);
+        ASSERT_TRUE(publisher) << "CreateStream() Fail";
+
+        AittStream *subscriber =
+              aitt.CreateStream(AITT_STREAM_TYPE_WEBRTC, "topic", AITT_STREAM_ROLE_SUBSCRIBER);
+        ASSERT_TRUE(subscriber) << "CreateStream() Fail";
+
+        publisher->SetConfig("key", "value");
+        publisher->Start();
+
+        subscriber->SetConfig("key", "value");
+        subscriber->SetStateCallback([](AittStream *stream, int state, void *user_data) {},
+              (void *)"user_data");
+        subscriber->SetReceiveCallback([](AittStream *stream, void *obj, void *user_data) {},
+              (void *)"user-data");
+        subscriber->Start();
+
+        aitt.DestroyStream(publisher);
+        aitt.DestroyStream(subscriber);
+    } catch (std::exception &e) {
+        FAIL() << "Unexpected exception: " << e.what();
+    }
+}
index 1febe7d..4c8dab7 100644 (file)
@@ -7,7 +7,8 @@ INCLUDE_DIRECTORIES(${UT_NEEDS_INCLUDE_DIRS} ../src)
 LINK_DIRECTORIES(${UT_NEEDS_LIBRARY_DIRS})
 
 ###########################################################################
-SET(AITT_UT_SRC AITT_test.cc RequestResponse_test.cc MainLoopHandler_test.cc aitt_c_test.cc AITT_TCP_test.cc)
+SET(AITT_UT_SRC AITT_test.cc RequestResponse_test.cc MainLoopHandler_test.cc aitt_c_test.cc
+    AITT_TCP_test.cc AittStream_test.cc)
 ADD_EXECUTABLE(${AITT_UT} ${AITT_UT_SRC})
 TARGET_LINK_LIBRARIES(${AITT_UT} Threads::Threads ${UT_NEEDS_LIBRARIES} ${PROJECT_NAME})
 
@@ -59,17 +60,3 @@ ADD_TEST(
         LD_LIBRARY_PATH=../modules/tcp/:../modules/webrtc/:../:../common/:$ENV{LD_LIBRARY_PATH}
         ${CMAKE_CURRENT_BINARY_DIR}/${AITT_UT}_local --gtest_filter=*_Anytime
 )
-###########################################################################
-ADD_EXECUTABLE(${AITT_UT}_stream AITT_Stream_test.cc)
-TARGET_LINK_LIBRARIES(${AITT_UT}_stream Threads::Threads ${UT_NEEDS_LIBRARIES} ${PROJECT_NAME})
-
-INSTALL(TARGETS ${AITT_UT}_stream DESTINATION ${AITT_TEST_BINDIR})
-
-ADD_TEST(
-    NAME
-        ${AITT_UT}_stream
-    COMMAND
-        ${CMAKE_COMMAND} -E env
-        LD_LIBRARY_PATH=../modules/tcp/:../modules/webrtc/:../:../common/:$ENV{LD_LIBRARY_PATH}
-        ${CMAKE_CURRENT_BINARY_DIR}/${AITT_UT}_stream --gtest_filter=*_Anytime
-)