From 0ca2b1559e2108cf807a3e295b48c48ef2c48573 Mon Sep 17 00:00:00 2001 From: Jiung Yu Date: Thu, 17 Nov 2022 16:45:00 +0900 Subject: [PATCH] Allocate only one stream for an aitt handle --- modules/webrtc/SinkStreamManager.cc | 91 ++++++++++++++++--------------------- modules/webrtc/SrcStreamManager.cc | 81 ++++++++++++++------------------- modules/webrtc/StreamManager.cc | 15 ++---- modules/webrtc/StreamManager.h | 7 +-- modules/webrtc/WebRtcMessage.cc | 11 ++--- 5 files changed, 87 insertions(+), 118 deletions(-) diff --git a/modules/webrtc/SinkStreamManager.cc b/modules/webrtc/SinkStreamManager.cc index f7a9258..389b9a7 100644 --- a/modules/webrtc/SinkStreamManager.cc +++ b/modules/webrtc/SinkStreamManager.cc @@ -30,9 +30,6 @@ SinkStreamManager::SinkStreamManager(const std::string &topic, const std::string SinkStreamManager::~SinkStreamManager() { - for (auto itr = streams_.begin(); itr != streams_.end(); ++itr) - itr->second->Destroy(); - streams_.clear(); } void SinkStreamManager::SetOnEncodedFrameCallback(EncodedFrameCallabck cb) @@ -48,7 +45,8 @@ void SinkStreamManager::SetWebRtcStreamCallbacks(WebRtcStream &stream) stream.GetEventHandler().SetOnIceCandidateCb( std::bind(&SinkStreamManager::OnIceCandidate, this)); - stream.GetEventHandler().SetOnEncodedFrameCb(std::bind(&SinkStreamManager::OnEncodedFrame, this)); + stream.GetEventHandler().SetOnEncodedFrameCb( + std::bind(&SinkStreamManager::OnEncodedFrame, this)); } void SinkStreamManager::OnStreamStateChanged(WebRtcState::Stream state, WebRtcStream &stream) @@ -90,15 +88,14 @@ void SinkStreamManager::HandleStreamState(const std::string &discovery_id, else if (src_state.compare("STOP") == 0) HandleRemovedClient(discovery_id); else - DBG("Invalid message %s", src_state); + ERR("Invalid message %s", src_state); } void SinkStreamManager::HandleStartStream(const std::string &discovery_id) { DBG("%s", __func__); - auto src_stream = streams_.find(discovery_id); - if (src_stream != streams_.end()) { - DBG("There's stream already"); + if (!peer_aitt_id_.empty()) { + ERR("There's stream already"); return; } @@ -108,62 +105,58 @@ void SinkStreamManager::HandleStartStream(const std::string &discovery_id) void SinkStreamManager::AddStream(const std::string &discovery_id) { - auto stream = new WebRtcStream(); - SetWebRtcStreamCallbacks(*stream); - stream->Create(false, false); - stream->Start(); + SetWebRtcStreamCallbacks(stream_); + stream_.Create(false, false); + stream_.Start(); std::stringstream s_stream; - s_stream << static_cast(stream); + s_stream << static_cast(&stream_); - stream->SetStreamId(std::string(thread_id_ + s_stream.str())); - streams_[discovery_id] = stream; + stream_.SetStreamId(std::string(thread_id_ + s_stream.str())); + peer_aitt_id_ = discovery_id; } void SinkStreamManager::HandleStreamInfo(const std::string &discovery_id, const std::vector &message) { if (!WebRtcMessage::IsValidStreamInfo(message)) { - DBG("Invalid streams info"); + ERR("Invalid streams info"); return; } - // sink_streams have a stream at normal situation - auto src_streams = flexbuffers::GetRoot(message).AsVector(); - for (size_t stream_idx = 0; stream_idx < src_streams.size(); ++stream_idx) { - auto stream = src_streams[stream_idx].AsMap(); - auto id = stream["id"].AsString().str(); - auto peer_id = stream["peer_id"].AsString().str(); - auto sdp = stream["sdp"].AsString().str(); - std::vector ice_candidates; - auto ice_info = stream["ice_candidates"].AsVector(); - for (size_t ice_idx = 0; ice_idx < ice_info.size(); ++ice_idx) - ice_candidates.push_back(ice_info[ice_idx].AsString().str()); - UpdateStreamInfo(discovery_id, id, peer_id, sdp, ice_candidates); - } + // have a stream at Current status + auto stream = flexbuffers::GetRoot(message).AsMap(); + auto id = stream["id"].AsString().str(); + auto peer_id = stream["peer_id"].AsString().str(); + auto sdp = stream["sdp"].AsString().str(); + std::vector ice_candidates; + auto ice_info = stream["ice_candidates"].AsVector(); + for (size_t ice_idx = 0; ice_idx < ice_info.size(); ++ice_idx) + ice_candidates.push_back(ice_info[ice_idx].AsString().str()); + UpdateStreamInfo(discovery_id, id, peer_id, sdp, ice_candidates); } void SinkStreamManager::UpdateStreamInfo(const std::string &discovery_id, const std::string &id, const std::string &peer_id, const std::string &sdp, const std::vector &ice_candidates) { - auto src_stream = streams_.find(discovery_id); - if (src_stream == streams_.end()) { - DBG("No matching stream"); + //There's only one stream for a aitt ID + if (peer_aitt_id_ != discovery_id) { + ERR("No matching stream"); return; } - if (peer_id.compare(src_stream->second->GetStreamId()) != 0) { - DBG("Different ID"); + if (peer_id.compare(stream_.GetStreamId()) != 0) { + ERR("Different ID"); return; } - if (src_stream->second->GetPeerId().size() == 0) { + if (stream_.GetPeerId().size() == 0) { DBG("first update"); - src_stream->second->SetPeerId(id); - src_stream->second->AddPeerInformation(sdp, ice_candidates); + stream_.SetPeerId(id); + stream_.AddPeerInformation(sdp, ice_candidates); } else - src_stream->second->UpdatePeerInformation(ice_candidates); + stream_.UpdatePeerInformation(ice_candidates); } std::vector SinkStreamManager::GetDiscoveryMessage(void) @@ -171,19 +164,15 @@ std::vector SinkStreamManager::GetDiscoveryMessage(void) std::vector message; flexbuffers::Builder fbb; - fbb.Vector([&] { - for (auto itr = streams_.begin(); itr != streams_.end(); ++itr) { - fbb.Map([&] { - fbb.String("id", itr->second->GetStreamId()); - fbb.String("peer_id", itr->second->GetPeerId()); - fbb.String("sdp", itr->second->GetLocalDescription()); - fbb.Vector("ice_candidates", [&]() { - for (const auto &candidate : itr->second->GetIceCandidates()) { - fbb.String(candidate); - } - }); - }); - } + fbb.Map([&] { + fbb.String("id", stream_.GetStreamId()); + fbb.String("peer_id", stream_.GetPeerId()); + fbb.String("sdp", stream_.GetLocalDescription()); + fbb.Vector("ice_candidates", [&]() { + for (const auto &candidate : stream_.GetIceCandidates()) { + fbb.String(candidate); + } + }); }); fbb.Finish(); diff --git a/modules/webrtc/SrcStreamManager.cc b/modules/webrtc/SrcStreamManager.cc index 574d9c7..ab77402 100644 --- a/modules/webrtc/SrcStreamManager.cc +++ b/modules/webrtc/SrcStreamManager.cc @@ -32,10 +32,6 @@ SrcStreamManager::SrcStreamManager(const std::string &topic, const std::string & SrcStreamManager::~SrcStreamManager() { - // TODO: You should take care about stream resource - for (auto itr = streams_.begin(); itr != streams_.end(); ++itr) - itr->second->Destroy(); - streams_.clear(); } void SrcStreamManager::SetWebRtcStreamCallbacks(WebRtcStream &stream) @@ -87,7 +83,7 @@ void SrcStreamManager::HandleStreamState(const std::string &discovery_id, if (sink_state.compare("STOP") == 0) HandleRemovedClient(discovery_id); else - DBG("Invalid message %s", sink_state); + ERR("Invalid message %s", sink_state); } void SrcStreamManager::HandleStreamInfo(const std::string &discovery_id, @@ -95,53 +91,50 @@ void SrcStreamManager::HandleStreamInfo(const std::string &discovery_id, { DBG("%s", __func__); if (!WebRtcMessage::IsValidStreamInfo(message)) { - DBG("Invalid streams info"); + ERR("Invalid streams info"); return; } - // sink_streams have a stream at normal situation - auto sink_streams = flexbuffers::GetRoot(message).AsVector(); - for (size_t stream_idx = 0; stream_idx < sink_streams.size(); ++stream_idx) { - auto stream = sink_streams[stream_idx].AsMap(); - auto id = stream["id"].AsString().str(); - auto peer_id = stream["peer_id"].AsString().str(); - auto sdp = stream["sdp"].AsString().str(); - std::vector ice_candidates; - auto ice_info = stream["ice_candidates"].AsVector(); - for (size_t ice_idx = 0; ice_idx < ice_info.size(); ++ice_idx) - ice_candidates.push_back(ice_info[ice_idx].AsString().str()); - UpdateStreamInfo(discovery_id, id, peer_id, sdp, ice_candidates); - } + // have a stream at Current status + auto stream = flexbuffers::GetRoot(message).AsMap(); + auto id = stream["id"].AsString().str(); + auto peer_id = stream["peer_id"].AsString().str(); + auto sdp = stream["sdp"].AsString().str(); + std::vector ice_candidates; + auto ice_info = stream["ice_candidates"].AsVector(); + for (size_t ice_idx = 0; ice_idx < ice_info.size(); ++ice_idx) + ice_candidates.push_back(ice_info[ice_idx].AsString().str()); + UpdateStreamInfo(discovery_id, id, peer_id, sdp, ice_candidates); } void SrcStreamManager::UpdateStreamInfo(const std::string &discovery_id, const std::string &id, const std::string &peer_id, const std::string &sdp, const std::vector &ice_candidates) { - auto sink_stream = streams_.find(discovery_id); - if (sink_stream == streams_.end()) + // There's only one stream for a aitt ID + if (peer_aitt_id_.empty()) AddStream(discovery_id, id, sdp, ice_candidates); + else if (peer_aitt_id_ == discovery_id && peer_id.compare(stream_.GetStreamId()) != 0) + stream_.UpdatePeerInformation(ice_candidates); else - sink_stream->second->UpdatePeerInformation(ice_candidates); + ERR("Invalid peer ID"); } void SrcStreamManager::AddStream(const std::string &discovery_id, const std::string &id, const std::string &sdp, const std::vector &ice_candidates) { - auto stream = new WebRtcStream(); - SetWebRtcStreamCallbacks(*stream); - stream->Create(true, false); - stream->AttachCameraSource(); - stream->Start(); + SetWebRtcStreamCallbacks(stream_); + stream_.Create(true, false); + stream_.AttachCameraSource(); + stream_.Start(); std::stringstream s_stream; - s_stream << static_cast(stream); - - stream->SetStreamId(std::string(thread_id_ + s_stream.str())); - stream->SetPeerId(id); - stream->AddPeerInformation(sdp, ice_candidates); + s_stream << static_cast(&stream_); - streams_[discovery_id] = stream; + stream_.SetStreamId(std::string(thread_id_ + s_stream.str())); + stream_.SetPeerId(id); + peer_aitt_id_ = discovery_id; + stream_.AddPeerInformation(sdp, ice_candidates); return; } @@ -151,19 +144,15 @@ std::vector SrcStreamManager::GetDiscoveryMessage(void) std::vector message; flexbuffers::Builder fbb; - fbb.Vector([&] { - for (auto itr = streams_.begin(); itr != streams_.end(); ++itr) { - fbb.Map([&] { - fbb.String("id", itr->second->GetStreamId()); - fbb.String("peer_id", itr->second->GetPeerId()); - fbb.String("sdp", itr->second->GetLocalDescription()); - fbb.Vector("ice_candidates", [&]() { - for (const auto &candidate : itr->second->GetIceCandidates()) { - fbb.String(candidate); - } - }); - }); - } + fbb.Map([&] { + fbb.String("id", stream_.GetStreamId()); + fbb.String("peer_id", stream_.GetPeerId()); + fbb.String("sdp", stream_.GetLocalDescription()); + fbb.Vector("ice_candidates", [&]() { + for (const auto &candidate : stream_.GetIceCandidates()) { + fbb.String(candidate); + } + }); }); fbb.Finish(); diff --git a/modules/webrtc/StreamManager.cc b/modules/webrtc/StreamManager.cc index 3e81a9d..a4bded0 100644 --- a/modules/webrtc/StreamManager.cc +++ b/modules/webrtc/StreamManager.cc @@ -36,10 +36,8 @@ void StreamManager::Start(void) void StreamManager::Stop(void) { DBG("%s %s", __func__, GetTopic().c_str()); - // TODO: You should take care about stream resource - for (auto itr = streams_.begin(); itr != streams_.end(); ++itr) - itr->second->Destroy(); - streams_.clear(); + peer_aitt_id_.clear(); + stream_.Destroy(); if (stream_stop_cb_) stream_stop_cb_(); @@ -47,15 +45,12 @@ void StreamManager::Stop(void) void StreamManager::HandleRemovedClient(const std::string &discovery_id) { - auto stream_itr = streams_.find(discovery_id); - if (stream_itr == streams_.end()) { - DBG("There's no stream %s", discovery_id.c_str()); + if (peer_aitt_id_ != discovery_id){ + ERR("There's no stream %s", discovery_id.c_str()); return; } - // TODO: You should take care about stream resource - stream_itr->second->Destroy(); - streams_.erase(stream_itr); + stream_.Destroy(); return; } diff --git a/modules/webrtc/StreamManager.h b/modules/webrtc/StreamManager.h index 98677b7..70cfd69 100644 --- a/modules/webrtc/StreamManager.h +++ b/modules/webrtc/StreamManager.h @@ -22,6 +22,7 @@ #include #include #include +#include namespace AittWebRTCNamespace { @@ -51,9 +52,9 @@ class StreamManager { // TODO: why dont' we remove below std::string aitt_id_; std::string thread_id_; - // TODO: What if user copies the module? - // Think about that case with destructor - std::map streams_; + //We assume Module class can't be copyable + std::string peer_aitt_id_; + WebRtcStream stream_; StreamStartCallback stream_start_cb_; StreamStopCallback stream_stop_cb_; IceCandidateAddedCallback ice_candidate_added_cb_; diff --git a/modules/webrtc/WebRtcMessage.cc b/modules/webrtc/WebRtcMessage.cc index 422bd55..62f3162 100644 --- a/modules/webrtc/WebRtcMessage.cc +++ b/modules/webrtc/WebRtcMessage.cc @@ -56,17 +56,12 @@ WebRtcMessage::Type WebRtcMessage::getMessageType(const std::string &message) bool WebRtcMessage::IsValidStreamInfo(const std::vector &message) { - if (!flexbuffers::GetRoot(message).IsVector()) + if (!flexbuffers::GetRoot(message).IsMap()) return false; - auto streams = flexbuffers::GetRoot(message).AsVector(); - for (size_t idx = 0; idx < streams.size(); ++idx) { - auto stream = streams[idx].AsMap(); - if (!IsValidStream(stream)) - return false; - } + auto stream_info = flexbuffers::GetRoot(message).AsMap(); - return true; + return IsValidStream(stream_info); } bool WebRtcMessage::IsValidStream(const flexbuffers::Map &info) -- 2.7.4