Allocate only one stream for an aitt handle
authorJiung Yu <jiung.yu@samsung.com>
Thu, 17 Nov 2022 07:45:00 +0000 (16:45 +0900)
committerJiung Yu <jiung.yu@samsung.com>
Mon, 12 Dec 2022 09:18:07 +0000 (18:18 +0900)
modules/webrtc/SinkStreamManager.cc
modules/webrtc/SrcStreamManager.cc
modules/webrtc/StreamManager.cc
modules/webrtc/StreamManager.h
modules/webrtc/WebRtcMessage.cc

index f7a9258..389b9a7 100644 (file)
@@ -30,9 +30,6 @@ SinkStreamManager::SinkStreamManager(const std::string &topic, const std::string
 
 SinkStreamManager::~SinkStreamManager()
 {
-    for (auto itr = streams_.begin(); itr != streams_.end(); ++itr)
-        itr->second->Destroy();
-    streams_.clear();
 }
 
 void SinkStreamManager::SetOnEncodedFrameCallback(EncodedFrameCallabck cb)
@@ -48,7 +45,8 @@ void SinkStreamManager::SetWebRtcStreamCallbacks(WebRtcStream &stream)
     stream.GetEventHandler().SetOnIceCandidateCb(
           std::bind(&SinkStreamManager::OnIceCandidate, this));
 
-    stream.GetEventHandler().SetOnEncodedFrameCb(std::bind(&SinkStreamManager::OnEncodedFrame, this));
+    stream.GetEventHandler().SetOnEncodedFrameCb(
+          std::bind(&SinkStreamManager::OnEncodedFrame, this));
 }
 
 void SinkStreamManager::OnStreamStateChanged(WebRtcState::Stream state, WebRtcStream &stream)
@@ -90,15 +88,14 @@ void SinkStreamManager::HandleStreamState(const std::string &discovery_id,
     else if (src_state.compare("STOP") == 0)
         HandleRemovedClient(discovery_id);
     else
-        DBG("Invalid message %s", src_state);
+        ERR("Invalid message %s", src_state);
 }
 
 void SinkStreamManager::HandleStartStream(const std::string &discovery_id)
 {
     DBG("%s", __func__);
-    auto src_stream = streams_.find(discovery_id);
-    if (src_stream != streams_.end()) {
-        DBG("There's stream already");
+    if (!peer_aitt_id_.empty()) {
+        ERR("There's stream already");
         return;
     }
 
@@ -108,62 +105,58 @@ void SinkStreamManager::HandleStartStream(const std::string &discovery_id)
 
 void SinkStreamManager::AddStream(const std::string &discovery_id)
 {
-    auto stream = new WebRtcStream();
-    SetWebRtcStreamCallbacks(*stream);
-    stream->Create(false, false);
-    stream->Start();
+    SetWebRtcStreamCallbacks(stream_);
+    stream_.Create(false, false);
+    stream_.Start();
 
     std::stringstream s_stream;
-    s_stream << static_cast<void *>(stream);
+    s_stream << static_cast<void *>(&stream_);
 
-    stream->SetStreamId(std::string(thread_id_ + s_stream.str()));
-    streams_[discovery_id] = stream;
+    stream_.SetStreamId(std::string(thread_id_ + s_stream.str()));
+    peer_aitt_id_ = discovery_id;
 }
 
 void SinkStreamManager::HandleStreamInfo(const std::string &discovery_id,
       const std::vector<uint8_t> &message)
 {
     if (!WebRtcMessage::IsValidStreamInfo(message)) {
-        DBG("Invalid streams info");
+        ERR("Invalid streams info");
         return;
     }
 
-    // sink_streams have a stream at normal situation
-    auto src_streams = flexbuffers::GetRoot(message).AsVector();
-    for (size_t stream_idx = 0; stream_idx < src_streams.size(); ++stream_idx) {
-        auto stream = src_streams[stream_idx].AsMap();
-        auto id = stream["id"].AsString().str();
-        auto peer_id = stream["peer_id"].AsString().str();
-        auto sdp = stream["sdp"].AsString().str();
-        std::vector<std::string> ice_candidates;
-        auto ice_info = stream["ice_candidates"].AsVector();
-        for (size_t ice_idx = 0; ice_idx < ice_info.size(); ++ice_idx)
-            ice_candidates.push_back(ice_info[ice_idx].AsString().str());
-        UpdateStreamInfo(discovery_id, id, peer_id, sdp, ice_candidates);
-    }
+    // have a stream at Current status
+    auto stream = flexbuffers::GetRoot(message).AsMap();
+    auto id = stream["id"].AsString().str();
+    auto peer_id = stream["peer_id"].AsString().str();
+    auto sdp = stream["sdp"].AsString().str();
+    std::vector<std::string> ice_candidates;
+    auto ice_info = stream["ice_candidates"].AsVector();
+    for (size_t ice_idx = 0; ice_idx < ice_info.size(); ++ice_idx)
+        ice_candidates.push_back(ice_info[ice_idx].AsString().str());
+    UpdateStreamInfo(discovery_id, id, peer_id, sdp, ice_candidates);
 }
 
 void SinkStreamManager::UpdateStreamInfo(const std::string &discovery_id, const std::string &id,
       const std::string &peer_id, const std::string &sdp,
       const std::vector<std::string> &ice_candidates)
 {
-    auto src_stream = streams_.find(discovery_id);
-    if (src_stream == streams_.end()) {
-        DBG("No matching stream");
+    //There's only one stream for a aitt ID
+    if (peer_aitt_id_ != discovery_id) {
+        ERR("No matching stream");
         return;
     }
 
-    if (peer_id.compare(src_stream->second->GetStreamId()) != 0) {
-        DBG("Different ID");
+    if (peer_id.compare(stream_.GetStreamId()) != 0) {
+        ERR("Different ID");
         return;
     }
 
-    if (src_stream->second->GetPeerId().size() == 0) {
+    if (stream_.GetPeerId().size() == 0) {
         DBG("first update");
-        src_stream->second->SetPeerId(id);
-        src_stream->second->AddPeerInformation(sdp, ice_candidates);
+        stream_.SetPeerId(id);
+        stream_.AddPeerInformation(sdp, ice_candidates);
     } else
-        src_stream->second->UpdatePeerInformation(ice_candidates);
+        stream_.UpdatePeerInformation(ice_candidates);
 }
 
 std::vector<uint8_t> SinkStreamManager::GetDiscoveryMessage(void)
@@ -171,19 +164,15 @@ std::vector<uint8_t> SinkStreamManager::GetDiscoveryMessage(void)
     std::vector<uint8_t> message;
 
     flexbuffers::Builder fbb;
-    fbb.Vector([&] {
-        for (auto itr = streams_.begin(); itr != streams_.end(); ++itr) {
-            fbb.Map([&] {
-                fbb.String("id", itr->second->GetStreamId());
-                fbb.String("peer_id", itr->second->GetPeerId());
-                fbb.String("sdp", itr->second->GetLocalDescription());
-                fbb.Vector("ice_candidates", [&]() {
-                    for (const auto &candidate : itr->second->GetIceCandidates()) {
-                        fbb.String(candidate);
-                    }
-                });
-            });
-        }
+    fbb.Map([&] {
+        fbb.String("id", stream_.GetStreamId());
+        fbb.String("peer_id", stream_.GetPeerId());
+        fbb.String("sdp", stream_.GetLocalDescription());
+        fbb.Vector("ice_candidates", [&]() {
+            for (const auto &candidate : stream_.GetIceCandidates()) {
+                fbb.String(candidate);
+            }
+        });
     });
     fbb.Finish();
 
index 574d9c7..ab77402 100644 (file)
@@ -32,10 +32,6 @@ SrcStreamManager::SrcStreamManager(const std::string &topic, const std::string &
 
 SrcStreamManager::~SrcStreamManager()
 {
-    // TODO: You should take care about stream resource
-    for (auto itr = streams_.begin(); itr != streams_.end(); ++itr)
-        itr->second->Destroy();
-    streams_.clear();
 }
 
 void SrcStreamManager::SetWebRtcStreamCallbacks(WebRtcStream &stream)
@@ -87,7 +83,7 @@ void SrcStreamManager::HandleStreamState(const std::string &discovery_id,
     if (sink_state.compare("STOP") == 0)
         HandleRemovedClient(discovery_id);
     else
-        DBG("Invalid message %s", sink_state);
+        ERR("Invalid message %s", sink_state);
 }
 
 void SrcStreamManager::HandleStreamInfo(const std::string &discovery_id,
@@ -95,53 +91,50 @@ void SrcStreamManager::HandleStreamInfo(const std::string &discovery_id,
 {
     DBG("%s", __func__);
     if (!WebRtcMessage::IsValidStreamInfo(message)) {
-        DBG("Invalid streams info");
+        ERR("Invalid streams info");
         return;
     }
 
-    // sink_streams have a stream at normal situation
-    auto sink_streams = flexbuffers::GetRoot(message).AsVector();
-    for (size_t stream_idx = 0; stream_idx < sink_streams.size(); ++stream_idx) {
-        auto stream = sink_streams[stream_idx].AsMap();
-        auto id = stream["id"].AsString().str();
-        auto peer_id = stream["peer_id"].AsString().str();
-        auto sdp = stream["sdp"].AsString().str();
-        std::vector<std::string> ice_candidates;
-        auto ice_info = stream["ice_candidates"].AsVector();
-        for (size_t ice_idx = 0; ice_idx < ice_info.size(); ++ice_idx)
-            ice_candidates.push_back(ice_info[ice_idx].AsString().str());
-        UpdateStreamInfo(discovery_id, id, peer_id, sdp, ice_candidates);
-    }
+    // have a stream at Current status
+    auto stream = flexbuffers::GetRoot(message).AsMap();
+    auto id = stream["id"].AsString().str();
+    auto peer_id = stream["peer_id"].AsString().str();
+    auto sdp = stream["sdp"].AsString().str();
+    std::vector<std::string> ice_candidates;
+    auto ice_info = stream["ice_candidates"].AsVector();
+    for (size_t ice_idx = 0; ice_idx < ice_info.size(); ++ice_idx)
+        ice_candidates.push_back(ice_info[ice_idx].AsString().str());
+    UpdateStreamInfo(discovery_id, id, peer_id, sdp, ice_candidates);
 }
 
 void SrcStreamManager::UpdateStreamInfo(const std::string &discovery_id, const std::string &id,
       const std::string &peer_id, const std::string &sdp,
       const std::vector<std::string> &ice_candidates)
 {
-    auto sink_stream = streams_.find(discovery_id);
-    if (sink_stream == streams_.end())
+    // There's only one stream for a aitt ID
+    if (peer_aitt_id_.empty())
         AddStream(discovery_id, id, sdp, ice_candidates);
+    else if (peer_aitt_id_ == discovery_id && peer_id.compare(stream_.GetStreamId()) != 0)
+        stream_.UpdatePeerInformation(ice_candidates);
     else
-        sink_stream->second->UpdatePeerInformation(ice_candidates);
+        ERR("Invalid peer ID");
 }
 
 void SrcStreamManager::AddStream(const std::string &discovery_id, const std::string &id,
       const std::string &sdp, const std::vector<std::string> &ice_candidates)
 {
-    auto stream = new WebRtcStream();
-    SetWebRtcStreamCallbacks(*stream);
-    stream->Create(true, false);
-    stream->AttachCameraSource();
-    stream->Start();
+    SetWebRtcStreamCallbacks(stream_);
+    stream_.Create(true, false);
+    stream_.AttachCameraSource();
+    stream_.Start();
 
     std::stringstream s_stream;
-    s_stream << static_cast<void *>(stream);
-
-    stream->SetStreamId(std::string(thread_id_ + s_stream.str()));
-    stream->SetPeerId(id);
-    stream->AddPeerInformation(sdp, ice_candidates);
+    s_stream << static_cast<void *>(&stream_);
 
-    streams_[discovery_id] = stream;
+    stream_.SetStreamId(std::string(thread_id_ + s_stream.str()));
+    stream_.SetPeerId(id);
+    peer_aitt_id_ = discovery_id;
+    stream_.AddPeerInformation(sdp, ice_candidates);
 
     return;
 }
@@ -151,19 +144,15 @@ std::vector<uint8_t> SrcStreamManager::GetDiscoveryMessage(void)
     std::vector<uint8_t> message;
 
     flexbuffers::Builder fbb;
-    fbb.Vector([&] {
-        for (auto itr = streams_.begin(); itr != streams_.end(); ++itr) {
-            fbb.Map([&] {
-                fbb.String("id", itr->second->GetStreamId());
-                fbb.String("peer_id", itr->second->GetPeerId());
-                fbb.String("sdp", itr->second->GetLocalDescription());
-                fbb.Vector("ice_candidates", [&]() {
-                    for (const auto &candidate : itr->second->GetIceCandidates()) {
-                        fbb.String(candidate);
-                    }
-                });
-            });
-        }
+    fbb.Map([&] {
+        fbb.String("id", stream_.GetStreamId());
+        fbb.String("peer_id", stream_.GetPeerId());
+        fbb.String("sdp", stream_.GetLocalDescription());
+        fbb.Vector("ice_candidates", [&]() {
+            for (const auto &candidate : stream_.GetIceCandidates()) {
+                fbb.String(candidate);
+            }
+        });
     });
     fbb.Finish();
 
index 3e81a9d..a4bded0 100644 (file)
@@ -36,10 +36,8 @@ void StreamManager::Start(void)
 void StreamManager::Stop(void)
 {
     DBG("%s %s", __func__, GetTopic().c_str());
-    // TODO: You should take care about stream resource
-    for (auto itr = streams_.begin(); itr != streams_.end(); ++itr)
-        itr->second->Destroy();
-    streams_.clear();
+    peer_aitt_id_.clear();
+    stream_.Destroy();
 
     if (stream_stop_cb_)
         stream_stop_cb_();
@@ -47,15 +45,12 @@ void StreamManager::Stop(void)
 
 void StreamManager::HandleRemovedClient(const std::string &discovery_id)
 {
-    auto stream_itr = streams_.find(discovery_id);
-    if (stream_itr == streams_.end()) {
-        DBG("There's no stream %s", discovery_id.c_str());
+    if (peer_aitt_id_ != discovery_id){
+        ERR("There's no stream %s", discovery_id.c_str());
         return;
     }
 
-    // TODO: You should take care about stream resource
-    stream_itr->second->Destroy();
-    streams_.erase(stream_itr);
+    stream_.Destroy();
 
     return;
 }
index 98677b7..70cfd69 100644 (file)
@@ -22,6 +22,7 @@
 #include <string>
 #include <thread>
 #include <vector>
+#include <memory>
 
 namespace AittWebRTCNamespace {
 
@@ -51,9 +52,9 @@ class StreamManager {
     // TODO: why dont' we remove below
     std::string aitt_id_;
     std::string thread_id_;
-    // TODO: What if user copies the module?
-    // Think about that case with destructor
-    std::map<std::string /* Peer Aitt Discovery ID */, WebRtcStream *> streams_;
+    //We assume Module class can't be copyable
+    std::string peer_aitt_id_;
+    WebRtcStream stream_;
     StreamStartCallback stream_start_cb_;
     StreamStopCallback stream_stop_cb_;
     IceCandidateAddedCallback ice_candidate_added_cb_;
index 422bd55..62f3162 100644 (file)
@@ -56,17 +56,12 @@ WebRtcMessage::Type WebRtcMessage::getMessageType(const std::string &message)
 
 bool WebRtcMessage::IsValidStreamInfo(const std::vector<uint8_t> &message)
 {
-    if (!flexbuffers::GetRoot(message).IsVector())
+    if (!flexbuffers::GetRoot(message).IsMap())
         return false;
 
-    auto streams = flexbuffers::GetRoot(message).AsVector();
-    for (size_t idx = 0; idx < streams.size(); ++idx) {
-        auto stream = streams[idx].AsMap();
-        if (!IsValidStream(stream))
-            return false;
-    }
+    auto stream_info = flexbuffers::GetRoot(message).AsMap();
 
-    return true;
+    return IsValidStream(stream_info);
 }
 
 bool WebRtcMessage::IsValidStream(const flexbuffers::Map &info)