StreamManager refactoring accepted/tizen/unified/20221115.172906
authorJiung Yu <jiung.yu@samsung.com>
Thu, 10 Nov 2022 01:56:48 +0000 (10:56 +0900)
committerYoungjae Shin <yj99.shin@samsung.com>
Tue, 15 Nov 2022 02:26:18 +0000 (11:26 +0900)
13 files changed:
modules/webrtc/CMakeLists.txt
modules/webrtc/Module.cc
modules/webrtc/Module.h
modules/webrtc/SinkStreamManager.cc
modules/webrtc/SinkStreamManager.h
modules/webrtc/SrcStreamManager.cc
modules/webrtc/SrcStreamManager.h
modules/webrtc/StreamManager.cc [new file with mode: 0644]
modules/webrtc/StreamManager.h
modules/webrtc/WebRtcEventHandler.h
modules/webrtc/WebRtcStream.cc
modules/webrtc/WebRtcStream.h
modules/webrtc/tests/WEBRTC_test.cc

index 1f3014a..647a6a0 100644 (file)
@@ -11,6 +11,7 @@ INCLUDE_DIRECTORIES(${AITT_WEBRTC_NEEDS_INCLUDE_DIRS})
 LINK_DIRECTORIES(${AITT_WEBRTC_NEEDS_LIBRARY_DIRS})
 
 ADD_LIBRARY(WEBRTC_OBJ OBJECT
+    StreamManager.cc
     SrcStreamManager.cc
     SinkStreamManager.cc
     WebRtcMessage.cc
index 55f94b1..294c3d5 100644 (file)
 namespace AittWebRTCNamespace {
 
 Module::Module(AittDiscovery &discovery, const std::string &topic, AittStreamRole role)
-      : is_source_(IsSource(role)), discovery_(discovery), state_cb_user_data_(nullptr), receive_cb_user_data_(nullptr)
+      : is_source_(role == AittStreamRole::AITT_STREAM_ROLE_PUBLISHER),
+        discovery_(discovery),
+        state_cb_user_data_(nullptr),
+        receive_cb_user_data_(nullptr)
 {
     std::stringstream s_stream;
     s_stream << std::this_thread::get_id();
@@ -59,21 +62,11 @@ void Module::SetConfig(const std::string &key, void *obj)
 {
 }
 
-
 void Module::Start(void)
 {
-    auto on_ice_candidate_added =
-          std::bind(&Module::OnIceCandidateAdded, this, std::placeholders::_1);
-    stream_manager_->SetIceCandidateAddedCallback(on_ice_candidate_added);
-
-    auto on_stream_ready = std::bind(&Module::OnStreamReady, this, std::placeholders::_1);
-    stream_manager_->SetStreamReadyCallback(on_stream_ready);
-
-    auto on_stream_started = std::bind(&Module::OnStreamStarted, this);
-    stream_manager_->SetStreamStartCallback(on_stream_started);
-
-    auto on_stream_stopped = std::bind(&Module::OnStreamStopped, this);
-    stream_manager_->SetStreamStopCallback(on_stream_stopped);
+    stream_manager_->SetIceCandidateAddedCallback(std::bind(&Module::OnIceCandidateAdded, this));
+    stream_manager_->SetStreamStartCallback(std::bind(&Module::OnStreamStarted, this));
+    stream_manager_->SetStreamStopCallback(std::bind(&Module::OnStreamStopped, this));
 
     stream_manager_->Start();
 }
@@ -82,21 +75,13 @@ void Module::Stop(void)
 {
 }
 
-void Module::OnIceCandidateAdded(WebRtcStream &stream)
+void Module::OnIceCandidateAdded(void)
 {
     DBG("OnIceCandidateAdded");
     auto msg = stream_manager_->GetDiscoveryMessage();
     discovery_.UpdateDiscoveryMsg(stream_manager_->GetTopic(), msg.data(), msg.size());
 }
 
-void Module::OnStreamReady(WebRtcStream &stream)
-{
-    DBG("OnStreamReady");
-
-    auto msg = stream_manager_->GetDiscoveryMessage();
-    discovery_.UpdateDiscoveryMsg(stream_manager_->GetTopic(), msg.data(), msg.size());
-}
-
 void Module::OnStreamStarted(void)
 {
     std::vector<uint8_t> msg;
@@ -132,10 +117,6 @@ void Module::SetReceiveCallback(ReceiveCallback cb, void *user_data)
     receive_callback_ = cb;
     receive_cb_user_data_ = user_data;
 }
-bool Module::IsSource(AittStreamRole role)
-{
-    return role == AittStreamRole::AITT_STREAM_ROLE_PUBLISHER;
-}
 
 void Module::DiscoveryMessageCallback(const std::string &clientId, const std::string &status,
       const void *msg, const int szmsg)
@@ -145,9 +126,8 @@ void Module::DiscoveryMessageCallback(const std::string &clientId, const std::st
         return;
     }
 
-    stream_manager_->HandleMsg(clientId,
-          std::vector<uint8_t>(static_cast<const uint8_t *>(msg),
-                static_cast<const uint8_t *>(msg) + szmsg));
+    stream_manager_->HandleMsg(clientId, std::vector<uint8_t>(static_cast<const uint8_t *>(msg),
+                                               static_cast<const uint8_t *>(msg) + szmsg));
 }
 
 }  // namespace AittWebRTCNamespace
index 11575d6..f43f328 100644 (file)
 
 #include <AittDiscovery.h>
 #include <AittStreamModule.h>
-#include "StreamManager.h"
 
 #include <map>
 #include <memory>
 #include <mutex>
 #include <string>
 
+#include "StreamManager.h"
+
 using AittDiscovery = aitt::AittDiscovery;
 using AittStreamModule = aitt::AittStreamModule;
 
@@ -42,12 +43,10 @@ class Module : public AittStreamModule {
     void Stop(void) override;
     void SetStateCallback(StateCallback cb, void *user_data) override;
     void SetReceiveCallback(ReceiveCallback cb, void *user_data) override;
-    static bool IsSource(AittStreamRole role);
 
   private:
-  //TODO: Update Ice Candidates when stream add candidates.
-    void OnIceCandidateAdded(WebRtcStream &stream);
-    void OnStreamReady(WebRtcStream &stream);
+    // TODO: Update Ice Candidates when stream add candidates.
+    void OnIceCandidateAdded(void);
     void OnStreamStarted(void);
     void OnStreamStopped(void);
     void DiscoveryMessageCallback(const std::string &clientId, const std::string &status,
@@ -57,8 +56,8 @@ class Module : public AittStreamModule {
     AittDiscovery &discovery_;
     int discovery_cb_;
 
-    //TODO: What if user copies the module?
-    //Think about that case with destructor
+    // TODO: What if user copies the module?
+    // Think about that case with destructor
     StreamManager *stream_manager_;
     StateCallback state_callback_;
     void *state_cb_user_data_;
index 52b52e1..f7a9258 100644 (file)
 namespace AittWebRTCNamespace {
 SinkStreamManager::SinkStreamManager(const std::string &topic, const std::string &aitt_id,
       const std::string &thread_id)
-      : StreamManager(topic + "/SINK", aitt_id, thread_id), watching_topic_(topic + "/SRC")
+      : StreamManager(topic + "/SINK", topic + "/SRC", aitt_id, thread_id)
 {
 }
 
 SinkStreamManager::~SinkStreamManager()
 {
-    for (auto itr = src_stream_.begin(); itr != src_stream_.end(); ++itr)
+    for (auto itr = streams_.begin(); itr != streams_.end(); ++itr)
         itr->second->Destroy();
-    src_stream_.clear();
-}
-
-void SinkStreamManager::Start()
-{
-    DBG("%s %s", __func__, GetTopic().c_str());
-    if (stream_start_cb_)
-        stream_start_cb_();
-}
-
-void SinkStreamManager::Stop()
-{
-    DBG("%s %s", __func__, GetTopic().c_str());
-    // TODO: You should take care about stream resource
-    for (auto itr = src_stream_.begin(); itr != src_stream_.end(); ++itr)
-        itr->second->Destroy();
-    src_stream_.clear();
-
-    if (stream_stop_cb_)
-        stream_stop_cb_();
-}
-
-void SinkStreamManager::OnEncodedFrame(WebRtcStream &stream)
-{
-    if (encoded_frame_cb_)
-        encoded_frame_cb_(stream);
-}
-
-void SinkStreamManager::SetIceCandidateAddedCallback(IceCandidateAddedCallback cb)
-{
-    ice_candidate_added_cb_ = cb;
-}
-
-void SinkStreamManager::SetStreamReadyCallback(StreamReadyCallback cb)
-{
-    stream_ready_cb_ = cb;
-}
-
-void SinkStreamManager::SetStreamStartCallback(StreamStartCallback cb)
-{
-    stream_start_cb_ = cb;
-}
-
-void SinkStreamManager::SetStreamStopCallback(StreamStopCallback cb)
-{
-    stream_stop_cb_ = cb;
+    streams_.clear();
 }
 
 void SinkStreamManager::SetOnEncodedFrameCallback(EncodedFrameCallabck cb)
@@ -87,31 +42,21 @@ void SinkStreamManager::SetOnEncodedFrameCallback(EncodedFrameCallabck cb)
 
 void SinkStreamManager::SetWebRtcStreamCallbacks(WebRtcStream &stream)
 {
-    auto on_stream_state_changed_cb = std::bind(&SinkStreamManager::OnStreamStateChanged, this,
-          std::placeholders::_1, std::ref(stream));
-    stream.GetEventHandler().SetOnStateChangedCb(on_stream_state_changed_cb);
-
-    auto on_ice_candidate_added_cb = std::bind(&SinkStreamManager::OnIceCandidate, this,
-          std::placeholders::_1, std::ref(stream));
-    stream.GetEventHandler().SetOnIceCandidateCb(on_ice_candidate_added_cb);
+    stream.GetEventHandler().SetOnStateChangedCb(std::bind(&SinkStreamManager::OnStreamStateChanged,
+          this, std::placeholders::_1, std::ref(stream)));
 
-    auto on_ice_gathering_state_changed_cb =
-          std::bind(&SinkStreamManager::OnIceGatheringStateNotify, this, std::placeholders::_1,
-                std::ref(stream));
-    stream.GetEventHandler().SetOnIceGatheringStateNotifyCb(on_ice_gathering_state_changed_cb);
+    stream.GetEventHandler().SetOnIceCandidateCb(
+          std::bind(&SinkStreamManager::OnIceCandidate, this));
 
-    auto on_encoded_frame_cb =
-          std::bind(&SinkStreamManager::OnEncodedFrame, this, std::ref(stream));
-    stream.GetEventHandler().SetOnEncodedFrameCb(on_encoded_frame_cb);
+    stream.GetEventHandler().SetOnEncodedFrameCb(std::bind(&SinkStreamManager::OnEncodedFrame, this));
 }
 
 void SinkStreamManager::OnStreamStateChanged(WebRtcState::Stream state, WebRtcStream &stream)
 {
     DBG("OnSinkStreamStateChanged: %s", WebRtcState::StreamToStr(state).c_str());
     if (state == WebRtcState::Stream::NEGOTIATING) {
-        auto on_offer_created = std::bind(&SinkStreamManager::OnOfferCreated, this,
-              std::placeholders::_1, std::ref(stream));
-        stream.CreateOfferAsync(on_offer_created);
+        stream.CreateOfferAsync(std::bind(&SinkStreamManager::OnOfferCreated, this,
+              std::placeholders::_1, std::ref(stream)));
     }
 }
 
@@ -122,50 +67,22 @@ void SinkStreamManager::OnOfferCreated(std::string sdp, WebRtcStream &stream)
     stream.SetLocalDescription(sdp);
 }
 
-void SinkStreamManager::OnIceCandidate(const std::string &candidate, WebRtcStream &stream)
+void SinkStreamManager::OnIceCandidate(void)
 {
     if (ice_candidate_added_cb_)
-        ice_candidate_added_cb_(stream);
-}
-
-void SinkStreamManager::OnIceGatheringStateNotify(WebRtcState::IceGathering state,
-      WebRtcStream &stream)
-{
-    DBG("Sink IceGathering State: %s", WebRtcState::IceGatheringToStr(state).c_str());
-    if (state == WebRtcState::IceGathering::COMPLETE) {
-        if (stream_ready_cb_)
-            stream_ready_cb_(stream);
-    }
-}
-
-void SinkStreamManager::HandleRemovedClient(const std::string &discovery_id)
-{
-    auto src_stream_itr = src_stream_.find(discovery_id);
-    if (src_stream_itr == src_stream_.end()) {
-        DBG("There's no source stream %s", discovery_id.c_str());
-        return;
-    }
-
-    // TODO: You should take care about stream resource
-    src_stream_itr->second->Destroy();
-    src_stream_.erase(src_stream_itr);
-
-    return;
+        ice_candidate_added_cb_();
 }
 
-void SinkStreamManager::HandleMsg(const std::string &discovery_id,
-      const std::vector<uint8_t> &message)
+void SinkStreamManager::OnEncodedFrame(void)
 {
-    if (flexbuffers::GetRoot(message).IsString())
-        HandleStreamState(discovery_id, message);
-
-    else if (flexbuffers::GetRoot(message).IsVector())
-        HandleStreamInfo(discovery_id, message);
+    if (encoded_frame_cb_)
+        encoded_frame_cb_();
 }
 
 void SinkStreamManager::HandleStreamState(const std::string &discovery_id,
       const std::vector<uint8_t> &message)
 {
+    DBG("%s", __func__);
     auto src_state = flexbuffers::GetRoot(message).ToString();
 
     if (src_state.compare("START") == 0)
@@ -178,8 +95,9 @@ void SinkStreamManager::HandleStreamState(const std::string &discovery_id,
 
 void SinkStreamManager::HandleStartStream(const std::string &discovery_id)
 {
-    auto src_stream = src_stream_.find(discovery_id);
-    if (src_stream != src_stream_.end()) {
+    DBG("%s", __func__);
+    auto src_stream = streams_.find(discovery_id);
+    if (src_stream != streams_.end()) {
         DBG("There's stream already");
         return;
     }
@@ -199,7 +117,7 @@ void SinkStreamManager::AddStream(const std::string &discovery_id)
     s_stream << static_cast<void *>(stream);
 
     stream->SetStreamId(std::string(thread_id_ + s_stream.str()));
-    src_stream_[discovery_id] = stream;
+    streams_[discovery_id] = stream;
 }
 
 void SinkStreamManager::HandleStreamInfo(const std::string &discovery_id,
@@ -229,8 +147,8 @@ void SinkStreamManager::UpdateStreamInfo(const std::string &discovery_id, const
       const std::string &peer_id, const std::string &sdp,
       const std::vector<std::string> &ice_candidates)
 {
-    auto src_stream = src_stream_.find(discovery_id);
-    if (src_stream == src_stream_.end()) {
+    auto src_stream = streams_.find(discovery_id);
+    if (src_stream == streams_.end()) {
         DBG("No matching stream");
         return;
     }
@@ -246,7 +164,6 @@ void SinkStreamManager::UpdateStreamInfo(const std::string &discovery_id, const
         src_stream->second->AddPeerInformation(sdp, ice_candidates);
     } else
         src_stream->second->UpdatePeerInformation(ice_candidates);
-
 }
 
 std::vector<uint8_t> SinkStreamManager::GetDiscoveryMessage(void)
@@ -255,7 +172,7 @@ std::vector<uint8_t> SinkStreamManager::GetDiscoveryMessage(void)
 
     flexbuffers::Builder fbb;
     fbb.Vector([&] {
-        for (auto itr = src_stream_.begin(); itr != src_stream_.end(); ++itr) {
+        for (auto itr = streams_.begin(); itr != streams_.end(); ++itr) {
             fbb.Map([&] {
                 fbb.String("id", itr->second->GetStreamId());
                 fbb.String("peer_id", itr->second->GetPeerId());
@@ -274,9 +191,4 @@ std::vector<uint8_t> SinkStreamManager::GetDiscoveryMessage(void)
     return message;
 }
 
-std::string SinkStreamManager::GetWatchingTopic(void)
-{
-    return watching_topic_;
-}
-
 }  // namespace AittWebRTCNamespace
index 1b27625..cf433bc 100644 (file)
@@ -24,47 +24,31 @@ namespace AittWebRTCNamespace {
 
 class SinkStreamManager : public StreamManager {
   public:
-    using EncodedFrameCallabck = std::function<void(WebRtcStream &stream)>;
+    using EncodedFrameCallabck = std::function<void(void)>;
     explicit SinkStreamManager(const std::string &topic, const std::string &aitt_id,
           const std::string &thread_id);
     virtual ~SinkStreamManager();
-    void Start(void) override;
-    void Stop(void) override;
-    void SetIceCandidateAddedCallback(IceCandidateAddedCallback cb) override;
-    void SetStreamReadyCallback(StreamReadyCallback cb) override;
-    void SetStreamStartCallback(StreamStartCallback cb) override;
-    void SetStreamStopCallback(StreamStopCallback cb) override;
     std::vector<uint8_t> GetDiscoveryMessage(void) override;
-    std::string GetWatchingTopic(void) override;
     // TODO: WebRTC CAPI doesn't allow destroy webrtc handle at callback.
     // We need to avoid that situation
     void SetOnEncodedFrameCallback(EncodedFrameCallabck cb);
-    void HandleRemovedClient(const std::string &discovery_id) override;
-    void HandleMsg(const std::string &discovery_id, const std::vector<uint8_t> &message) override;
 
   private:
     void SetWebRtcStreamCallbacks(WebRtcStream &stream) override;
     void OnStreamStateChanged(WebRtcState::Stream state, WebRtcStream &stream);
     void OnOfferCreated(std::string sdp, WebRtcStream &stream);
-    void OnIceCandidate(const std::string &candidate, WebRtcStream &stream);
-    void OnIceGatheringStateNotify(WebRtcState::IceGathering state, WebRtcStream &stream);
-    void OnEncodedFrame(WebRtcStream &stream);
-    void HandleStreamState(const std::string &discovery_id, const std::vector<uint8_t> &message);
+    void OnIceCandidate(void);
+    void OnEncodedFrame(void);
+    void HandleStreamState(const std::string &discovery_id,
+          const std::vector<uint8_t> &message) override;
     void HandleStartStream(const std::string &discovery_id);
-    void HandleStreamInfo(const std::string &discovery_id, const std::vector<uint8_t> &message);
+    void HandleStreamInfo(const std::string &discovery_id,
+          const std::vector<uint8_t> &message) override;
     void AddStream(const std::string &discovery_id);
     void UpdateStreamInfo(const std::string &discovery_id, const std::string &id,
           const std::string &peer_id, const std::string &sdp,
           const std::vector<std::string> &ice_candidates);
 
-    std::string watching_topic_;
-    // TODO: What if user copies the module?
-    // Think about that case with destructor
-    std::map<std::string /* Peer Aitt Discovery ID */, WebRtcStream *> src_stream_;
-    IceCandidateAddedCallback ice_candidate_added_cb_;
-    StreamReadyCallback stream_ready_cb_;
-    StreamStartCallback stream_start_cb_;
-    StreamStopCallback stream_stop_cb_;
     EncodedFrameCallabck encoded_frame_cb_;
 };
 }  // namespace AittWebRTCNamespace
index be11c96..574d9c7 100644 (file)
@@ -26,77 +26,32 @@ namespace AittWebRTCNamespace {
 
 SrcStreamManager::SrcStreamManager(const std::string &topic, const std::string &aitt_id,
       const std::string &thread_id)
-      : StreamManager(topic + "/SRC", aitt_id, thread_id), watching_topic_(topic + "/SINK")
+      : StreamManager(topic + "/SRC", topic + "/SINK", aitt_id, thread_id)
 {
 }
 
 SrcStreamManager::~SrcStreamManager()
 {
     // TODO: You should take care about stream resource
-    for (auto itr = sink_streams_.begin(); itr != sink_streams_.end(); ++itr)
+    for (auto itr = streams_.begin(); itr != streams_.end(); ++itr)
         itr->second->Destroy();
-    sink_streams_.clear();
-}
-
-void SrcStreamManager::Start(void)
-{
-    DBG("%s %s", __func__, GetTopic().c_str());
-    if (stream_start_cb_)
-        stream_start_cb_();
-}
-
-void SrcStreamManager::Stop(void)
-{
-    DBG("%s %s", __func__, GetTopic().c_str());
-    // TODO: You should take care about stream resource
-    for (auto itr = sink_streams_.begin(); itr != sink_streams_.end(); ++itr)
-        itr->second->Destroy();
-    sink_streams_.clear();
-
-    if (stream_stop_cb_)
-        stream_stop_cb_();
-}
-
-void SrcStreamManager::SetIceCandidateAddedCallback(IceCandidateAddedCallback cb)
-{
-    ice_candidate_added_cb_ = cb;
-}
-
-void SrcStreamManager::SetStreamReadyCallback(StreamReadyCallback cb)
-{
-    stream_ready_cb_ = cb;
-}
-
-void SrcStreamManager::SetStreamStartCallback(StreamStartCallback cb)
-{
-    stream_start_cb_ = cb;
-}
-
-void SrcStreamManager::SetStreamStopCallback(StreamStopCallback cb)
-{
-    stream_stop_cb_ = cb;
+    streams_.clear();
 }
 
 void SrcStreamManager::SetWebRtcStreamCallbacks(WebRtcStream &stream)
 {
-    auto on_stream_state_changed_cb = std::bind(&SrcStreamManager::OnStreamStateChanged, this,
-          std::placeholders::_1, std::ref(stream));
-    stream.GetEventHandler().SetOnStateChangedCb(on_stream_state_changed_cb);
-
-    auto on_ice_candidate_added_cb = std::bind(&SrcStreamManager::OnIceCandidate, this,
-          std::placeholders::_1, std::ref(stream));
-    stream.GetEventHandler().SetOnIceCandidateCb(on_ice_candidate_added_cb);
+    stream.GetEventHandler().SetOnStateChangedCb(
+          std::bind(&SrcStreamManager::OnStreamStateChanged, this, std::placeholders::_1));
 
-    auto on_signaling_state_notify_cb = std::bind(&SrcStreamManager::OnSignalingStateNotify, this,
-          std::placeholders::_1, std::ref(stream));
-    stream.GetEventHandler().SetOnSignalingStateNotifyCb(on_signaling_state_notify_cb);
+    stream.GetEventHandler().SetOnSignalingStateNotifyCb(
+          std::bind(&SrcStreamManager::OnSignalingStateNotify, this, std::placeholders::_1,
+                std::ref(stream)));
 
-    auto on_ice_gathering_state_changed_cb = std::bind(&SrcStreamManager::OnIceGatheringStateNotify,
-          this, std::placeholders::_1, std::ref(stream));
-    stream.GetEventHandler().SetOnIceGatheringStateNotifyCb(on_ice_gathering_state_changed_cb);
+    stream.GetEventHandler().SetOnIceCandidateCb(
+          std::bind(&SrcStreamManager::OnIceCandidate, this));
 }
 
-void SrcStreamManager::OnStreamStateChanged(WebRtcState::Stream state, WebRtcStream &stream)
+void SrcStreamManager::OnStreamStateChanged(WebRtcState::Stream state)
 {
     DBG("OnSrcStreamStateChanged: %s", WebRtcState::StreamToStr(state).c_str());
 }
@@ -105,9 +60,8 @@ void SrcStreamManager::OnSignalingStateNotify(WebRtcState::Signaling state, WebR
 {
     DBG("OnSignalingStateNotify: %s", WebRtcState::SignalingToStr(state).c_str());
     if (state == WebRtcState::Signaling::HAVE_REMOTE_OFFER) {
-        auto on_answer_created = std::bind(&SrcStreamManager::OnAnswerCreated, this,
-              std::placeholders::_1, std::ref(stream));
-        stream.CreateAnswerAsync(on_answer_created);
+        stream.CreateAnswerAsync(std::bind(&SrcStreamManager::OnAnswerCreated, this,
+              std::placeholders::_1, std::ref(stream)));
     }
 }
 
@@ -118,49 +72,16 @@ void SrcStreamManager::OnAnswerCreated(std::string sdp, WebRtcStream &stream)
     stream.SetLocalDescription(sdp);
 }
 
-void SrcStreamManager::OnIceCandidate(const std::string &candidate, WebRtcStream &stream)
+void SrcStreamManager::OnIceCandidate(void)
 {
     if (ice_candidate_added_cb_)
-        ice_candidate_added_cb_(stream);
-}
-
-void SrcStreamManager::OnIceGatheringStateNotify(WebRtcState::IceGathering state,
-      WebRtcStream &stream)
-{
-    DBG("Src IceGathering State: %s", WebRtcState::IceGatheringToStr(state).c_str());
-    if (state == WebRtcState::IceGathering::COMPLETE) {
-        if (stream_ready_cb_)
-            stream_ready_cb_(stream);
-    }
-}
-
-void SrcStreamManager::HandleRemovedClient(const std::string &discovery_id)
-{
-    auto sink_stream_itr = sink_streams_.find(discovery_id);
-    if (sink_stream_itr == sink_streams_.end()) {
-        DBG("There's no sink stream %s", discovery_id.c_str());
-        return;
-    }
-
-    // TODO: You should take care about stream resource
-    sink_stream_itr->second->Destroy();
-    sink_streams_.erase(sink_stream_itr);
-
-    return;
-}
-
-void SrcStreamManager::HandleMsg(const std::string &discovery_id,
-      const std::vector<uint8_t> &message)
-{
-    if (flexbuffers::GetRoot(message).IsString())
-        HandleStreamState(discovery_id, message);
-    else if (flexbuffers::GetRoot(message).IsVector())
-        HandleStreamInfo(discovery_id, message);
+        ice_candidate_added_cb_();
 }
 
 void SrcStreamManager::HandleStreamState(const std::string &discovery_id,
       const std::vector<uint8_t> &message)
 {
+    DBG("%s", __func__);
     auto sink_state = flexbuffers::GetRoot(message).ToString();
 
     if (sink_state.compare("STOP") == 0)
@@ -172,6 +93,7 @@ void SrcStreamManager::HandleStreamState(const std::string &discovery_id,
 void SrcStreamManager::HandleStreamInfo(const std::string &discovery_id,
       const std::vector<uint8_t> &message)
 {
+    DBG("%s", __func__);
     if (!WebRtcMessage::IsValidStreamInfo(message)) {
         DBG("Invalid streams info");
         return;
@@ -196,8 +118,8 @@ void SrcStreamManager::UpdateStreamInfo(const std::string &discovery_id, const s
       const std::string &peer_id, const std::string &sdp,
       const std::vector<std::string> &ice_candidates)
 {
-    auto sink_stream = sink_streams_.find(discovery_id);
-    if (sink_stream == sink_streams_.end())
+    auto sink_stream = streams_.find(discovery_id);
+    if (sink_stream == streams_.end())
         AddStream(discovery_id, id, sdp, ice_candidates);
     else
         sink_stream->second->UpdatePeerInformation(ice_candidates);
@@ -219,7 +141,7 @@ void SrcStreamManager::AddStream(const std::string &discovery_id, const std::str
     stream->SetPeerId(id);
     stream->AddPeerInformation(sdp, ice_candidates);
 
-    sink_streams_[discovery_id] = stream;
+    streams_[discovery_id] = stream;
 
     return;
 }
@@ -230,7 +152,7 @@ std::vector<uint8_t> SrcStreamManager::GetDiscoveryMessage(void)
 
     flexbuffers::Builder fbb;
     fbb.Vector([&] {
-        for (auto itr = sink_streams_.begin(); itr != sink_streams_.end(); ++itr) {
+        for (auto itr = streams_.begin(); itr != streams_.end(); ++itr) {
             fbb.Map([&] {
                 fbb.String("id", itr->second->GetStreamId());
                 fbb.String("peer_id", itr->second->GetPeerId());
@@ -249,9 +171,4 @@ std::vector<uint8_t> SrcStreamManager::GetDiscoveryMessage(void)
     return message;
 }
 
-std::string SrcStreamManager::GetWatchingTopic(void)
-{
-    return watching_topic_;
-}
-
 }  // namespace AittWebRTCNamespace
index aca572a..e87d063 100644 (file)
@@ -27,40 +27,22 @@ class SrcStreamManager : public StreamManager {
     explicit SrcStreamManager(const std::string &topic, const std::string &aitt_id,
           const std::string &thread_id);
     virtual ~SrcStreamManager();
-    void Start(void) override;
-    // TODO: What's the best way to shutdown all?
-    void Stop(void) override;
-    void SetIceCandidateAddedCallback(IceCandidateAddedCallback cb) override;
-    void SetStreamReadyCallback(StreamReadyCallback cb) override;
-    void SetStreamStartCallback(StreamStartCallback cb) override;
-    void SetStreamStopCallback(StreamStopCallback cb) override;
     std::vector<uint8_t> GetDiscoveryMessage(void) override;
-    std::string GetWatchingTopic(void) override;
-    void HandleRemovedClient(const std::string &discovery_id) override;
-    void HandleMsg(const std::string &discovery_id, const std::vector<uint8_t> &message) override;
 
   private:
     void SetWebRtcStreamCallbacks(WebRtcStream &stream) override;
-    void OnStreamStateChanged(WebRtcState::Stream state, WebRtcStream &stream);
+    void OnStreamStateChanged(WebRtcState::Stream state);
     void OnAnswerCreated(std::string sdp, WebRtcStream &stream);
-    void OnIceCandidate(const std::string &candidate, WebRtcStream &stream);
+    void OnIceCandidate(void);
     void OnSignalingStateNotify(WebRtcState::Signaling state, WebRtcStream &stream);
-    void OnIceGatheringStateNotify(WebRtcState::IceGathering state, WebRtcStream &stream);
-    void HandleStreamState(const std::string &discovery_id, const std::vector<uint8_t> &message);
-    void HandleStreamInfo(const std::string &discovery_id, const std::vector<uint8_t> &message);
+    void HandleStreamState(const std::string &discovery_id,
+          const std::vector<uint8_t> &message) override;
+    void HandleStreamInfo(const std::string &discovery_id,
+          const std::vector<uint8_t> &message) override;
     void AddStream(const std::string &discovery_id, const std::string &id, const std::string &sdp,
           const std::vector<std::string> &ice_candidates);
     void UpdateStreamInfo(const std::string &discovery_id, const std::string &id,
           const std::string &peer_id, const std::string &sdp,
           const std::vector<std::string> &ice_candidates);
-
-    std::string watching_topic_;
-    // TODO: What if user copies the module?
-    // Think about that case with destructor
-    std::map<std::string /* Peer Aitt Discovery ID */, WebRtcStream *> sink_streams_;
-    IceCandidateAddedCallback ice_candidate_added_cb_;
-    StreamReadyCallback stream_ready_cb_;
-    StreamStartCallback stream_start_cb_;
-    StreamStopCallback stream_stop_cb_;
 };
 }  // namespace AittWebRTCNamespace
diff --git a/modules/webrtc/StreamManager.cc b/modules/webrtc/StreamManager.cc
new file mode 100644 (file)
index 0000000..3e81a9d
--- /dev/null
@@ -0,0 +1,96 @@
+/*
+ * Copyright (c) 2022 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "StreamManager.h"
+
+#include "aitt_internal.h"
+
+namespace AittWebRTCNamespace {
+
+StreamManager::StreamManager(const std::string &topic, const std::string &watching_topic,
+      const std::string &aitt_id, const std::string &thread_id)
+      : topic_(topic), watching_topic_(watching_topic), aitt_id_(aitt_id), thread_id_(thread_id)
+{
+}
+
+void StreamManager::Start(void)
+{
+    DBG("%s %s", __func__, GetTopic().c_str());
+    if (stream_start_cb_)
+        stream_start_cb_();
+}
+
+void StreamManager::Stop(void)
+{
+    DBG("%s %s", __func__, GetTopic().c_str());
+    // TODO: You should take care about stream resource
+    for (auto itr = streams_.begin(); itr != streams_.end(); ++itr)
+        itr->second->Destroy();
+    streams_.clear();
+
+    if (stream_stop_cb_)
+        stream_stop_cb_();
+}
+
+void StreamManager::HandleRemovedClient(const std::string &discovery_id)
+{
+    auto stream_itr = streams_.find(discovery_id);
+    if (stream_itr == streams_.end()) {
+        DBG("There's no stream %s", discovery_id.c_str());
+        return;
+    }
+
+    // TODO: You should take care about stream resource
+    stream_itr->second->Destroy();
+    streams_.erase(stream_itr);
+
+    return;
+}
+
+void StreamManager::HandleMsg(const std::string &discovery_id, const std::vector<uint8_t> &message)
+{
+    if (flexbuffers::GetRoot(message).IsString())
+        HandleStreamState(discovery_id, message);
+    else if (flexbuffers::GetRoot(message).IsVector())
+        HandleStreamInfo(discovery_id, message);
+}
+
+std::string StreamManager::GetTopic(void) const
+{
+    return topic_;
+}
+
+std::string StreamManager::GetWatchingTopic(void) const
+{
+    return watching_topic_;
+}
+
+void StreamManager::SetIceCandidateAddedCallback(IceCandidateAddedCallback cb)
+{
+    ice_candidate_added_cb_ = cb;
+}
+
+void StreamManager::SetStreamStartCallback(StreamStartCallback cb)
+{
+    stream_start_cb_ = cb;
+}
+
+void StreamManager::SetStreamStopCallback(StreamStopCallback cb)
+{
+    stream_stop_cb_ = cb;
+}
+
+}  // namespace AittWebRTCNamespace
index a613a4a..98677b7 100644 (file)
@@ -27,36 +27,42 @@ namespace AittWebRTCNamespace {
 
 class StreamManager {
   public:
-    using IceCandidateAddedCallback = std::function<void(WebRtcStream &stream)>;
-    using StreamReadyCallback = std::function<void(WebRtcStream &stream)>;
+    using IceCandidateAddedCallback = std::function<void(void)>;
     using StreamStartCallback = std::function<void(void)>;
     using StreamStopCallback = std::function<void(void)>;
-    explicit StreamManager(const std::string &topic, const std::string &aitt_id,
-          const std::string &thread_id)
-          : topic_(topic), aitt_id_(aitt_id), thread_id_(thread_id){};
+    explicit StreamManager(const std::string &topic, const std::string &watching_topic,
+          const std::string &aitt_id, const std::string &thread_id);
     virtual ~StreamManager() = default;
-    std::string GetTopic(void) const { return topic_; };
-    std::string GetClientId(void) const { return aitt_id_; };
-    virtual void HandleRemovedClient(const std::string &discovery_id) = 0;
-    virtual void HandleMsg(const std::string &discovery_id,
-          const std::vector<uint8_t> &message) = 0;
-
-    virtual void Start(void) = 0;
-    virtual void Stop(void) = 0;
-    virtual void SetIceCandidateAddedCallback(IceCandidateAddedCallback cb) = 0;
-    virtual void SetStreamReadyCallback(StreamReadyCallback cb) = 0;
-    virtual void SetStreamStartCallback(StreamStartCallback cb) = 0;
-    virtual void SetStreamStopCallback(StreamStopCallback cb) = 0;
     virtual std::vector<uint8_t> GetDiscoveryMessage(void) = 0;
-    virtual std::string GetWatchingTopic(void) = 0;
+
+    void Start(void);
+    void Stop(void);
+    void HandleRemovedClient(const std::string &discovery_id);
+    void HandleMsg(const std::string &discovery_id, const std::vector<uint8_t> &message);
+    void SetIceCandidateAddedCallback(IceCandidateAddedCallback cb);
+    void SetStreamStartCallback(StreamStartCallback cb);
+    void SetStreamStopCallback(StreamStopCallback cb);
+    std::string GetTopic(void) const;
+    std::string GetWatchingTopic(void) const;
 
   protected:
     std::string topic_;
+    std::string watching_topic_;
     // TODO: why dont' we remove below
     std::string aitt_id_;
     std::string thread_id_;
+    // TODO: What if user copies the module?
+    // Think about that case with destructor
+    std::map<std::string /* Peer Aitt Discovery ID */, WebRtcStream *> streams_;
+    StreamStartCallback stream_start_cb_;
+    StreamStopCallback stream_stop_cb_;
+    IceCandidateAddedCallback ice_candidate_added_cb_;
 
   private:
     virtual void SetWebRtcStreamCallbacks(WebRtcStream &stream) = 0;
+    virtual void HandleStreamState(const std::string &discovery_id,
+          const std::vector<uint8_t> &message) = 0;
+    virtual void HandleStreamInfo(const std::string &discovery_id,
+          const std::vector<uint8_t> &message) = 0;
 };
 }  // namespace AittWebRTCNamespace
index 308171d..0cf9c99 100644 (file)
@@ -48,8 +48,7 @@ class WebRtcEventHandler {
     };
     void UnsetOnSignalingStateNotifyCb(void) { on_signaling_state_notify_cb_ = nullptr; };
 
-    void SetOnIceCandidateCb(
-          std::function<void(std::string)> on_ice_candidate_cb)
+    void SetOnIceCandidateCb(std::function<void(std::string)> on_ice_candidate_cb)
     {
         on_ice_candidate_cb_ = on_ice_candidate_cb;
     };
index 0e1662a..035393b 100644 (file)
@@ -229,6 +229,26 @@ bool WebRtcStream::SetRemoteDescription(const std::string &description)
     return ret == WEBRTC_ERROR_NONE;
 }
 
+void WebRtcStream::SetStreamId(const std::string &id)
+{
+    id_ = id;
+}
+
+std::string WebRtcStream::GetStreamId(void) const
+{
+    return id_;
+}
+
+void WebRtcStream::SetPeerId(const std::string &id)
+{
+    peer_id_ = id;
+}
+
+std::string &WebRtcStream::GetPeerId(void)
+{
+    return peer_id_;
+}
+
 bool WebRtcStream::AddIceCandidateFromMessage(const std::string &ice_message)
 {
     ERR("%s", __func__);
@@ -412,6 +432,11 @@ void WebRtcStream::DetachSignals(void)
     webrtc_data_channel_unset_open_cb(channel_);
 }
 
+WebRtcEventHandler &WebRtcStream::GetEventHandler(void)
+{
+    return event_handler_;
+}
+
 void WebRtcStream::OnError(webrtc_h webrtc, webrtc_error_e error, webrtc_state_e state,
       void *user_data)
 {
index 9b4c703..9e2c74e 100644 (file)
@@ -44,33 +44,17 @@ class WebRtcStream {
     void DetachSignals(void);
     // Cautions : Event handler is not a pointer. So, change event_handle after Set Event handler
     // doesn't affect event handler which is included in WebRtcStream
-    void SetEventHandler(WebRtcEventHandler event_handler) { event_handler_ = event_handler; };
-    WebRtcEventHandler &GetEventHandler(void) { return event_handler_; };
+    WebRtcEventHandler &GetEventHandler(void);
 
     bool CreateOfferAsync(std::function<void(std::string)> on_created_cb);
-    void CallOnOfferCreatedCb(std::string offer)
-    {
-        if (on_offer_created_cb_)
-            on_offer_created_cb_(offer);
-    }
     bool CreateAnswerAsync(std::function<void(std::string)> on_created_cb);
-    void CallOnAnswerCreatedCb(std::string answer)
-    {
-        if (on_answer_created_cb_)
-            on_answer_created_cb_(answer);
-    }
-    void SetPreparedLocalDescription(const std::string &description)
-    {
-        local_description_ = description;
-    };
-    std::string GetPreparedLocalDescription(void) const { return local_description_; };
 
     bool SetLocalDescription(const std::string &description);
     bool SetRemoteDescription(const std::string &description);
-    void SetStreamId(const std::string &id) { id_ = id; };
-    std::string GetStreamId(void) const { return id_; };
-    void SetPeerId(const std::string &id) { peer_id_ = id; };
-    std::string &GetPeerId(void) { return peer_id_; };
+    void SetStreamId(const std::string &id);
+    std::string GetStreamId(void) const;
+    void SetPeerId(const std::string &id);
+    std::string &GetPeerId(void);
 
     bool AddIceCandidateFromMessage(const std::string &ice_message);
     bool AddPeerInformation(const std::string &sdp, const std::vector<std::string> &ice_candidates);
index f983d11..f027203 100644 (file)
@@ -80,9 +80,8 @@ class WebRtcStreamTest : public testing::Test {
     {
         DBG("OnStreamStateChanged");
         if (state == WebRtcState::Stream::NEGOTIATING) {
-            auto on_offer_created =
-                  std::bind(OnOfferCreated, std::placeholders::_1, std::ref(stream), test);
-            stream.CreateOfferAsync(on_offer_created);
+            stream.CreateOfferAsync(
+                  std::bind(OnOfferCreated, std::placeholders::_1, std::ref(stream), test));
         }
     }
     static void OnOfferCreated(std::string sdp, WebRtcStream &stream, WebRtcStreamTest *test)
@@ -114,13 +113,11 @@ TEST_F(WebRtcStreamTest, test_Start_WebRtcSrcStream_OnDevice)
     WebRtcStream stream{};
     EXPECT_EQ(true, stream.Create(true, false)) << "Failed to create source stream";
     EXPECT_EQ(true, stream.AttachCameraSource()) << "Failed to attach camera source";
-    auto on_stream_state_changed_cb =
-          std::bind(OnStreamStateChanged, std::placeholders::_1, std::ref(stream), this);
-    stream.GetEventHandler().SetOnStateChangedCb(on_stream_state_changed_cb);
+    stream.GetEventHandler().SetOnStateChangedCb(
+          std::bind(OnStreamStateChanged, std::placeholders::_1, std::ref(stream), this));
 
-    auto on_ice_gathering_state_changed_cb =
-          std::bind(OnIceGatheringStateNotify, std::placeholders::_1, std::ref(stream), this);
-    stream.GetEventHandler().SetOnIceGatheringStateNotifyCb(on_ice_gathering_state_changed_cb);
+    stream.GetEventHandler().SetOnIceGatheringStateNotifyCb(
+          std::bind(OnIceGatheringStateNotify, std::placeholders::_1, std::ref(stream), this));
     stream.Start();
     IterateEventLoop();
     EXPECT_EQ(WebRtcMessage::Type::SDP, WebRtcMessage::getMessageType(local_description_));
@@ -144,11 +141,9 @@ class WebRtcSourceOffererTest : public testing::Test {
           WebRtcSourceOffererTest *test)
     {
         DBG("OnSrcStreamStateChanged: %s", WebRtcState::StreamToStr(state).c_str());
-        if (state == WebRtcState::Stream::NEGOTIATING) {
-            auto on_offer_created =
-                  std::bind(OnOfferCreated, std::placeholders::_1, std::ref(stream), test);
-            stream.CreateOfferAsync(on_offer_created);
-        }
+        if (state == WebRtcState::Stream::NEGOTIATING)
+            stream.CreateOfferAsync(
+                  std::bind(OnOfferCreated, std::placeholders::_1, std::ref(stream), test));
     }
 
     static void OnOfferCreated(std::string sdp, WebRtcStream &stream, WebRtcSourceOffererTest *test)
@@ -194,11 +189,9 @@ class WebRtcSourceOffererTest : public testing::Test {
           WebRtcSourceOffererTest *test)
     {
         DBG("OnSinkSignalingStateNotify: %s", WebRtcState::SignalingToStr(state).c_str());
-        if (state == WebRtcState::Signaling::HAVE_REMOTE_OFFER) {
-            auto on_answer_created =
-                  std::bind(OnAnswerCreated, std::placeholders::_1, std::ref(stream), test);
-            stream.CreateAnswerAsync(on_answer_created);
-        }
+        if (state == WebRtcState::Signaling::HAVE_REMOTE_OFFER)
+            stream.CreateAnswerAsync(
+                  std::bind(OnAnswerCreated, std::placeholders::_1, std::ref(stream), test));
     }
 
     static void OnSinkIceGatheringStateNotify(WebRtcState::IceGathering state, WebRtcStream &stream,
@@ -229,36 +222,27 @@ TEST_F(WebRtcSourceOffererTest, test_Start_WebRtcStream_OnDevice)
 {
     EXPECT_EQ(true, src_stream_.Create(true, false)) << "Failed to create source stream";
     EXPECT_EQ(true, src_stream_.AttachCameraSource()) << "Failed to attach camera source";
-    auto on_src_stream_state_changed_cb =
-          std::bind(OnSrcStreamStateChanged, std::placeholders::_1, std::ref(src_stream_), this);
-    src_stream_.GetEventHandler().SetOnStateChangedCb(on_src_stream_state_changed_cb);
+    src_stream_.GetEventHandler().SetOnStateChangedCb(
+          std::bind(OnSrcStreamStateChanged, std::placeholders::_1, std::ref(src_stream_), this));
 
-    auto on_src_signaling_state_changed_cb =
-          std::bind(OnSrcSignalingStateNotify, std::placeholders::_1, std::ref(src_stream_), this);
-    src_stream_.GetEventHandler().SetOnSignalingStateNotifyCb(on_src_signaling_state_changed_cb);
+    src_stream_.GetEventHandler().SetOnSignalingStateNotifyCb(
+          std::bind(OnSrcSignalingStateNotify, std::placeholders::_1, std::ref(src_stream_), this));
 
-    auto on_src_ice_gathering_state_changed_cb = std::bind(OnSrcIceGatheringStateNotify,
-          std::placeholders::_1, std::ref(src_stream_), this);
-    src_stream_.GetEventHandler().SetOnIceGatheringStateNotifyCb(
-          on_src_ice_gathering_state_changed_cb);
+    src_stream_.GetEventHandler().SetOnIceGatheringStateNotifyCb(std::bind(
+          OnSrcIceGatheringStateNotify, std::placeholders::_1, std::ref(src_stream_), this));
     src_stream_.Start();
 
     EXPECT_EQ(true, sink_stream_.Create(false, false)) << "Failed to create sink stream";
-    auto on_sink_stream_state_changed_cb =
-          std::bind(OnSinkStreamStateChanged, std::placeholders::_1, std::ref(sink_stream_), this);
-    sink_stream_.GetEventHandler().SetOnStateChangedCb(on_sink_stream_state_changed_cb);
+    sink_stream_.GetEventHandler().SetOnStateChangedCb(
+          std::bind(OnSinkStreamStateChanged, std::placeholders::_1, std::ref(sink_stream_), this));
 
-    auto on_sink_signaling_state_changed_cb = std::bind(OnSinkSignalingStateNotify,
-          std::placeholders::_1, std::ref(sink_stream_), this);
-    sink_stream_.GetEventHandler().SetOnSignalingStateNotifyCb(on_sink_signaling_state_changed_cb);
+    sink_stream_.GetEventHandler().SetOnSignalingStateNotifyCb(std::bind(OnSinkSignalingStateNotify,
+          std::placeholders::_1, std::ref(sink_stream_), this));
 
-    auto on_sink_ice_gathering_state_changed_cb = std::bind(OnSinkIceGatheringStateNotify,
-          std::placeholders::_1, std::ref(sink_stream_), this);
-    sink_stream_.GetEventHandler().SetOnIceGatheringStateNotifyCb(
-          on_sink_ice_gathering_state_changed_cb);
+    sink_stream_.GetEventHandler().SetOnIceGatheringStateNotifyCb(std::bind(
+          OnSinkIceGatheringStateNotify, std::placeholders::_1, std::ref(sink_stream_), this));
 
-    auto on_sink_encoded_frame_cb = std::bind(OnSinkStreamEncodedFrame, this);
-    sink_stream_.GetEventHandler().SetOnEncodedFrameCb(on_sink_encoded_frame_cb);
+    sink_stream_.GetEventHandler().SetOnEncodedFrameCb(std::bind(OnSinkStreamEncodedFrame, this));
     sink_stream_.Start();
     IterateEventLoop();
 }
@@ -277,11 +261,9 @@ class WebRtcSinkOffererTest : public testing::Test {
           WebRtcSinkOffererTest *test)
     {
         DBG("OnSinkStreamStateChanged: %s", WebRtcState::StreamToStr(state).c_str());
-        if (state == WebRtcState::Stream::NEGOTIATING) {
-            auto on_offer_created =
-                  std::bind(OnOfferCreated, std::placeholders::_1, std::ref(stream), test);
-            stream.CreateOfferAsync(on_offer_created);
-        }
+        if (state == WebRtcState::Stream::NEGOTIATING)
+            stream.CreateOfferAsync(
+                  std::bind(OnOfferCreated, std::placeholders::_1, std::ref(stream), test));
     }
 
     static void OnOfferCreated(std::string sdp, WebRtcStream &stream, WebRtcSinkOffererTest *test)
@@ -326,11 +308,9 @@ class WebRtcSinkOffererTest : public testing::Test {
           WebRtcSinkOffererTest *test)
     {
         DBG("OnSrcSignalingStateNotify: %s", WebRtcState::SignalingToStr(state).c_str());
-        if (state == WebRtcState::Signaling::HAVE_REMOTE_OFFER) {
-            auto on_answer_created =
-                  std::bind(OnAnswerCreated, std::placeholders::_1, std::ref(stream), test);
-            stream.CreateAnswerAsync(on_answer_created);
-        }
+        if (state == WebRtcState::Signaling::HAVE_REMOTE_OFFER)
+            stream.CreateAnswerAsync(
+                  std::bind(OnAnswerCreated, std::placeholders::_1, std::ref(stream), test));
     }
 
     static void OnSrcIceGatheringStateNotify(WebRtcState::IceGathering state, WebRtcStream &stream,
@@ -365,32 +345,24 @@ TEST_F(WebRtcSinkOffererTest, test_Start_WebRtcStream_OnDevice)
           std::bind(OnSrcStreamStateChanged, std::placeholders::_1, std::ref(src_stream_), this);
     src_stream_.GetEventHandler().SetOnStateChangedCb(on_src_stream_state_changed_cb);
 
-    auto on_src_signaling_state_changed_cb =
-          std::bind(OnSrcSignalingStateNotify, std::placeholders::_1, std::ref(src_stream_), this);
-    src_stream_.GetEventHandler().SetOnSignalingStateNotifyCb(on_src_signaling_state_changed_cb);
+    src_stream_.GetEventHandler().SetOnSignalingStateNotifyCb(
+          std::bind(OnSrcSignalingStateNotify, std::placeholders::_1, std::ref(src_stream_), this));
 
-    auto on_src_ice_gathering_state_changed_cb = std::bind(OnSrcIceGatheringStateNotify,
-          std::placeholders::_1, std::ref(src_stream_), this);
-    src_stream_.GetEventHandler().SetOnIceGatheringStateNotifyCb(
-          on_src_ice_gathering_state_changed_cb);
+    src_stream_.GetEventHandler().SetOnIceGatheringStateNotifyCb(std::bind(
+          OnSrcIceGatheringStateNotify, std::placeholders::_1, std::ref(src_stream_), this));
     src_stream_.Start();
 
     EXPECT_EQ(true, sink_stream_.Create(false, false)) << "Failed to create sink stream";
-    auto on_sink_stream_state_changed_cb =
-          std::bind(OnSinkStreamStateChanged, std::placeholders::_1, std::ref(sink_stream_), this);
-    sink_stream_.GetEventHandler().SetOnStateChangedCb(on_sink_stream_state_changed_cb);
+    sink_stream_.GetEventHandler().SetOnStateChangedCb(
+          std::bind(OnSinkStreamStateChanged, std::placeholders::_1, std::ref(sink_stream_), this));
 
-    auto on_sink_signaling_state_changed_cb = std::bind(OnSinkSignalingStateNotify,
-          std::placeholders::_1, std::ref(sink_stream_), this);
-    sink_stream_.GetEventHandler().SetOnSignalingStateNotifyCb(on_sink_signaling_state_changed_cb);
+    sink_stream_.GetEventHandler().SetOnSignalingStateNotifyCb(std::bind(OnSinkSignalingStateNotify,
+          std::placeholders::_1, std::ref(sink_stream_), this));
 
-    auto on_sink_ice_gathering_state_changed_cb = std::bind(OnSinkIceGatheringStateNotify,
-          std::placeholders::_1, std::ref(sink_stream_), this);
-    sink_stream_.GetEventHandler().SetOnIceGatheringStateNotifyCb(
-          on_sink_ice_gathering_state_changed_cb);
+    sink_stream_.GetEventHandler().SetOnIceGatheringStateNotifyCb(std::bind(
+          OnSinkIceGatheringStateNotify, std::placeholders::_1, std::ref(sink_stream_), this));
 
-    auto on_sink_encoded_frame_cb = std::bind(OnSinkStreamEncodedFrame, this);
-    sink_stream_.GetEventHandler().SetOnEncodedFrameCb(on_sink_encoded_frame_cb);
+    sink_stream_.GetEventHandler().SetOnEncodedFrameCb(std::bind(OnSinkStreamEncodedFrame, this));
     sink_stream_.Start();
     IterateEventLoop();
 }
@@ -509,13 +481,10 @@ TEST_F(SrcStreamManagerTest, test_SrcStreamManager_Start_OnDevice)
     // Src should subscribe WEBRTC_SINK_TOPIC but this is for test
     discovery_cb_ = discovery_engine_.AddDiscoveryCB(src_manager->GetTopic(), on_discovered);
 
-    auto on_stream_started =
-          std::bind(OnStreamStarted, this, static_cast<SrcStreamManager *>(src_manager.get()));
-    src_manager->SetStreamStartCallback(on_stream_started);
-
-    auto on_stream_stopped =
-          std::bind(OnStreamStopped, this, static_cast<SrcStreamManager *>(src_manager.get()));
-    src_manager->SetStreamStopCallback(on_stream_stopped);
+    src_manager->SetStreamStartCallback(
+          std::bind(OnStreamStarted, this, static_cast<SrcStreamManager *>(src_manager.get())));
+    src_manager->SetStreamStopCallback(
+          std::bind(OnStreamStopped, this, static_cast<SrcStreamManager *>(src_manager.get())));
 
     src_manager->Start();
     IterateEventLoop();
@@ -558,8 +527,7 @@ class SinkStreamManagerTest : public testing::Test {
             g_main_loop_quit(test->mainLoop_);
     }
 
-    static void OnIceCandidateAdded(WebRtcStream &stream, SinkStreamManager *sink_mgr,
-          SinkStreamManagerTest *test)
+    static void OnIceCandidateAdded(SinkStreamManager *sink_mgr, SinkStreamManagerTest *test)
     {
         DBG("OnIceCandidateAdded");
         auto discovery_message = sink_mgr->GetDiscoveryMessage();
@@ -567,15 +535,6 @@ class SinkStreamManagerTest : public testing::Test {
               discovery_message.size());
     }
 
-    static void OnStreamReady(WebRtcStream &stream, SinkStreamManager *sink_mgr,
-          SinkStreamManagerTest *test)
-    {
-        DBG("OnStreamReady");
-        auto discovery_message = sink_mgr->GetDiscoveryMessage();
-        test->discovery_engine_.UpdateDiscoveryMsg(sink_mgr->GetTopic(), discovery_message.data(),
-              discovery_message.size());
-    }
-
     int discovery_cb_;
     std::string test_topic_;
     aitt::AittDiscovery discovery_engine_;
@@ -595,13 +554,8 @@ TEST_F(SinkStreamManagerTest, test_SinkStreamManager_Start_OnDevice)
     // Sink should subscribe WEBRTC_SRC_TOPIC but this is for test
     discovery_cb_ = discovery_engine_.AddDiscoveryCB(sink_manager->GetTopic(), on_discovered);
 
-    auto on_ice_candidate_added = std::bind(OnIceCandidateAdded, std::placeholders::_1,
-          static_cast<SinkStreamManager *>(sink_manager.get()), this);
-    sink_manager->SetIceCandidateAddedCallback(on_ice_candidate_added);
-
-    auto on_stream_ready = std::bind(OnStreamReady, std::placeholders::_1,
-          static_cast<SinkStreamManager *>(sink_manager.get()), this);
-    sink_manager->SetStreamReadyCallback(on_stream_ready);
+    sink_manager->SetIceCandidateAddedCallback(std::bind(OnIceCandidateAdded,
+          static_cast<SinkStreamManager *>(sink_manager.get()), this));
 
     sink_manager->Start();
     IterateEventLoop();
@@ -723,18 +677,12 @@ class SinkSrcStreamManagerTest : public testing::Test {
               sink_manager_->GetWatchingTopic(), discovered_at_sink);
         sink_discovery_engine_.Restart();
 
-        auto on_stream_stopped = std::bind(OnSinkStreamStopped, this);
-        sink_manager_->SetStreamStopCallback(on_stream_stopped);
+        sink_manager_->SetStreamStopCallback(std::bind(OnSinkStreamStopped, this));
+        sink_manager_->SetIceCandidateAddedCallback(
+              std::bind(OnSinkIceCandidate, this));
 
-        auto on_ice_candidate = std::bind(OnSinkIceCandidate, std::placeholders::_1, this);
-        sink_manager_->SetIceCandidateAddedCallback(on_ice_candidate);
-
-        auto on_sink_stream_ready = std::bind(OnSinkStreamReady, std::placeholders::_1, this);
-        sink_manager_->SetStreamReadyCallback(on_sink_stream_ready);
-
-        auto on_encoded_frame = std::bind(OnEncodedFrame, std::placeholders::_1, this);
         static_cast<SinkStreamManager *>(sink_manager_.get())
-              ->SetOnEncodedFrameCallback(on_encoded_frame);
+              ->SetOnEncodedFrameCallback(std::bind(OnEncodedFrame, this));
 
         sink_manager_->Start();
     }
@@ -752,7 +700,7 @@ class SinkSrcStreamManagerTest : public testing::Test {
               msg.size());
     }
 
-    static void OnSinkIceCandidate(WebRtcStream &stream, SinkSrcStreamManagerTest *test)
+    static void OnSinkIceCandidate(SinkSrcStreamManagerTest *test)
     {
         DBG("OnSinkIceCandidate");
         auto discovery_message = test->sink_manager_->GetDiscoveryMessage();
@@ -760,18 +708,10 @@ class SinkSrcStreamManagerTest : public testing::Test {
               discovery_message.data(), discovery_message.size());
     }
 
-    static void OnSinkStreamReady(WebRtcStream &stream, SinkSrcStreamManagerTest *test)
-    {
-        DBG("OnSinkStreamReady");
-
-        auto discovery_message = test->sink_manager_->GetDiscoveryMessage();
-        test->sink_discovery_engine_.UpdateDiscoveryMsg(test->sink_manager_->GetTopic(),
-              discovery_message.data(), discovery_message.size());
-    }
-
-    static void OnEncodedFrame(WebRtcStream &stream, SinkSrcStreamManagerTest *test)
+    static void OnEncodedFrame(SinkSrcStreamManagerTest *test)
     {
-        static_cast<SinkStreamManager *>(test->sink_manager_.get())->SetOnEncodedFrameCallback(nullptr);
+        static_cast<SinkStreamManager *>(test->sink_manager_.get())
+              ->SetOnEncodedFrameCallback(nullptr);
 
         if (test->stop_sink_first_)
             test->AddIdleStopSinkStream();
@@ -787,15 +727,10 @@ class SinkSrcStreamManagerTest : public testing::Test {
               discovered_at_src);
         src_discovery_engine_.Restart();
 
-        auto on_stream_started = std::bind(OnStreamStarted, this);
-        src_manager_->SetStreamStartCallback(on_stream_started);
-        auto on_stream_stopped = std::bind(OnSrcStreamStopped, this);
-        src_manager_->SetStreamStopCallback(on_stream_stopped);
-        auto on_ice_candidate = std::bind(OnSrcIceCandidate, std::placeholders::_1, this);
-        src_manager_->SetIceCandidateAddedCallback(on_ice_candidate);
-
-        auto on_src_stream_ready = std::bind(OnSrcStreamReady, std::placeholders::_1, this);
-        src_manager_->SetStreamReadyCallback(on_src_stream_ready);
+        src_manager_->SetStreamStartCallback(std::bind(OnStreamStarted, this));
+        src_manager_->SetStreamStopCallback(std::bind(OnSrcStreamStopped, this));
+        src_manager_->SetIceCandidateAddedCallback(
+              std::bind(OnSrcIceCandidate, this));
 
         src_manager_->Start();
     }
@@ -827,7 +762,7 @@ class SinkSrcStreamManagerTest : public testing::Test {
               msg.size());
     }
 
-    static void OnSrcIceCandidate(WebRtcStream &stream, SinkSrcStreamManagerTest *test)
+    static void OnSrcIceCandidate(SinkSrcStreamManagerTest *test)
     {
         DBG("OnIceCandidateAdded");
         auto discovery_message = test->src_manager_->GetDiscoveryMessage();
@@ -835,15 +770,6 @@ class SinkSrcStreamManagerTest : public testing::Test {
               discovery_message.data(), discovery_message.size());
     }
 
-    static void OnSrcStreamReady(WebRtcStream &stream, SinkSrcStreamManagerTest *test)
-    {
-        DBG("OnSrcStreamReady");
-
-        auto discovery_message = test->src_manager_->GetDiscoveryMessage();
-        test->src_discovery_engine_.UpdateDiscoveryMsg(test->src_manager_->GetTopic(),
-              discovery_message.data(), discovery_message.size());
-    }
-
     void StartSinkFirst(void)
     {
         start_sink_id_ = g_timeout_add_seconds(1, GSourceStartSink, this);