Add WebRTC Discovery using AITT Discovery
authorJiung Yu <jiung.yu@samsung.com>
Tue, 8 Nov 2022 00:50:10 +0000 (09:50 +0900)
committerYoungjae Shin <yj99.shin@samsung.com>
Wed, 9 Nov 2022 08:27:45 +0000 (17:27 +0900)
14 files changed:
common/AittDiscovery.cc
modules/webrtc/Module.cc
modules/webrtc/Module.h
modules/webrtc/SinkStreamManager.cc
modules/webrtc/SinkStreamManager.h
modules/webrtc/SrcStreamManager.cc
modules/webrtc/SrcStreamManager.h
modules/webrtc/StreamManager.h
modules/webrtc/WebRtcEventHandler.h
modules/webrtc/WebRtcMessage.cc
modules/webrtc/WebRtcMessage.h
modules/webrtc/WebRtcStream.cc
modules/webrtc/WebRtcStream.h
modules/webrtc/tests/WEBRTC_test.cc

index 21951439317a870304389043fad6904abe123f6b..a3640ff07007fe0b9f711ed5e56618732f01ad0d 100644 (file)
@@ -58,7 +58,7 @@ void AittDiscovery::Start(const std::string &host, int port, const std::string &
 
 void AittDiscovery::Restart()
 {
-    RET_IF(callback_handle);
+    RET_IF(callback_handle == nullptr);
     discovery_mq->Unsubscribe(callback_handle);
     callback_handle = discovery_mq->Subscribe(DISCOVERY_TOPIC_BASE + "+", DiscoveryMessageCallback,
           static_cast<void *>(this), AITT_QOS_EXACTLY_ONCE);
index 4810eb608a7d2c7dfdc2d27635a758370c21aafb..55f94b1cd443c7f07e521a509badd7f6ccd52407 100644 (file)
 namespace AittWebRTCNamespace {
 
 Module::Module(AittDiscovery &discovery, const std::string &topic, AittStreamRole role)
-      : is_source_(IsSource(role)), topic_(topic), discovery_(discovery)
+      : is_source_(IsSource(role)), discovery_(discovery), state_cb_user_data_(nullptr), receive_cb_user_data_(nullptr)
 {
-    discovery_cb_ = discovery_.AddDiscoveryCB(topic,
-          std::bind(&Module::DiscoveryMessageCallback, this, std::placeholders::_1,
-                std::placeholders::_2, std::placeholders::_3, std::placeholders::_4));
-
     std::stringstream s_stream;
     s_stream << std::this_thread::get_id();
 
-    if (is_source_) {
-        stream_manager_ = new SrcStreamManager(topic_, discovery_.GetId(), s_stream.str());
-    } else {
-        stream_manager_ = new SinkStreamManager(topic_, discovery_.GetId(), s_stream.str());
-    }
+    if (is_source_)
+        stream_manager_ = new SrcStreamManager(topic, discovery_.GetId(), s_stream.str());
+    else
+        stream_manager_ = new SinkStreamManager(topic, discovery_.GetId(), s_stream.str());
+
+    discovery_cb_ = discovery_.AddDiscoveryCB(stream_manager_->GetWatchingTopic(),
+          std::bind(&Module::DiscoveryMessageCallback, this, std::placeholders::_1,
+                std::placeholders::_2, std::placeholders::_3, std::placeholders::_4));
 }
 
 Module::~Module(void)
@@ -60,10 +59,22 @@ void Module::SetConfig(const std::string &key, void *obj)
 {
 }
 
+
 void Module::Start(void)
 {
+    auto on_ice_candidate_added =
+          std::bind(&Module::OnIceCandidateAdded, this, std::placeholders::_1);
+    stream_manager_->SetIceCandidateAddedCallback(on_ice_candidate_added);
+
     auto on_stream_ready = std::bind(&Module::OnStreamReady, this, std::placeholders::_1);
     stream_manager_->SetStreamReadyCallback(on_stream_ready);
+
+    auto on_stream_started = std::bind(&Module::OnStreamStarted, this);
+    stream_manager_->SetStreamStartCallback(on_stream_started);
+
+    auto on_stream_stopped = std::bind(&Module::OnStreamStopped, this);
+    stream_manager_->SetStreamStopCallback(on_stream_stopped);
+
     stream_manager_->Start();
 }
 
@@ -71,13 +82,43 @@ void Module::Stop(void)
 {
 }
 
+void Module::OnIceCandidateAdded(WebRtcStream &stream)
+{
+    DBG("OnIceCandidateAdded");
+    auto msg = stream_manager_->GetDiscoveryMessage();
+    discovery_.UpdateDiscoveryMsg(stream_manager_->GetTopic(), msg.data(), msg.size());
+}
+
 void Module::OnStreamReady(WebRtcStream &stream)
 {
     DBG("OnStreamReady");
 
-    auto discovery_message = WebRtcMessage::GenerateDiscoveryMessage(topic_, is_source_,
-          stream.GetLocalDescription(), stream.GetIceCandidates());
-    discovery_.UpdateDiscoveryMsg(topic_, discovery_message.data(), discovery_message.size());
+    auto msg = stream_manager_->GetDiscoveryMessage();
+    discovery_.UpdateDiscoveryMsg(stream_manager_->GetTopic(), msg.data(), msg.size());
+}
+
+void Module::OnStreamStarted(void)
+{
+    std::vector<uint8_t> msg;
+
+    flexbuffers::Builder fbb;
+    fbb.String("START");
+    fbb.Finish();
+
+    msg = fbb.GetBuffer();
+    discovery_.UpdateDiscoveryMsg(stream_manager_->GetTopic(), msg.data(), msg.size());
+}
+
+void Module::OnStreamStopped(void)
+{
+    std::vector<uint8_t> msg;
+
+    flexbuffers::Builder fbb;
+    fbb.String("STOP");
+    fbb.Finish();
+
+    msg = fbb.GetBuffer();
+    discovery_.UpdateDiscoveryMsg(stream_manager_->GetTopic(), msg.data(), msg.size());
 }
 
 void Module::SetStateCallback(StateCallback cb, void *user_data)
@@ -99,15 +140,12 @@ bool Module::IsSource(AittStreamRole role)
 void Module::DiscoveryMessageCallback(const std::string &clientId, const std::string &status,
       const void *msg, const int szmsg)
 {
-    if (!clientId.compare(discovery_.GetId()))
-        return;
-
     if (!status.compare(AittDiscovery::WILL_LEAVE_NETWORK)) {
         stream_manager_->HandleRemovedClient(clientId);
         return;
     }
 
-    stream_manager_->HandleDiscoveredStream(clientId,
+    stream_manager_->HandleMsg(clientId,
           std::vector<uint8_t>(static_cast<const uint8_t *>(msg),
                 static_cast<const uint8_t *>(msg) + szmsg));
 }
index 4023f217ce332289112e39b1d05ff2ae001edbbe..11575d6d145dc7dc7ffa6ba1524c82d5da2554da 100644 (file)
@@ -45,12 +45,15 @@ class Module : public AittStreamModule {
     static bool IsSource(AittStreamRole role);
 
   private:
+  //TODO: Update Ice Candidates when stream add candidates.
+    void OnIceCandidateAdded(WebRtcStream &stream);
     void OnStreamReady(WebRtcStream &stream);
+    void OnStreamStarted(void);
+    void OnStreamStopped(void);
     void DiscoveryMessageCallback(const std::string &clientId, const std::string &status,
           const void *msg, const int szmsg);
 
     bool is_source_;
-    std::string topic_;
     AittDiscovery &discovery_;
     int discovery_cb_;
 
index 2eab72c7a72432054c7fb2d7e019999a88553755..52b52e1694d18434afe79de1f36db7ce4815de93 100644 (file)
@@ -15,6 +15,8 @@
  */
 #include "SinkStreamManager.h"
 
+#include <flatbuffers/flexbuffers.h>
+
 #include <sstream>
 
 #include "aitt_internal.h"
 namespace AittWebRTCNamespace {
 SinkStreamManager::SinkStreamManager(const std::string &topic, const std::string &aitt_id,
       const std::string &thread_id)
-      : StreamManager(topic, aitt_id, thread_id)
+      : StreamManager(topic + "/SINK", aitt_id, thread_id), watching_topic_(topic + "/SRC")
 {
-    std::stringstream s_stream;
-    s_stream << static_cast<void *>(&stream_);
-    stream_.SetStreamId(aitt_id + thread_id + s_stream.str());
 }
+
 SinkStreamManager::~SinkStreamManager()
 {
-    Stop();
+    for (auto itr = src_stream_.begin(); itr != src_stream_.end(); ++itr)
+        itr->second->Destroy();
+    src_stream_.clear();
+}
+
+void SinkStreamManager::Start()
+{
+    DBG("%s %s", __func__, GetTopic().c_str());
+    if (stream_start_cb_)
+        stream_start_cb_();
+}
+
+void SinkStreamManager::Stop()
+{
+    DBG("%s %s", __func__, GetTopic().c_str());
+    // TODO: You should take care about stream resource
+    for (auto itr = src_stream_.begin(); itr != src_stream_.end(); ++itr)
+        itr->second->Destroy();
+    src_stream_.clear();
+
+    if (stream_stop_cb_)
+        stream_stop_cb_();
+}
+
+void SinkStreamManager::OnEncodedFrame(WebRtcStream &stream)
+{
+    if (encoded_frame_cb_)
+        encoded_frame_cb_(stream);
+}
+
+void SinkStreamManager::SetIceCandidateAddedCallback(IceCandidateAddedCallback cb)
+{
+    ice_candidate_added_cb_ = cb;
+}
+
+void SinkStreamManager::SetStreamReadyCallback(StreamReadyCallback cb)
+{
+    stream_ready_cb_ = cb;
+}
+
+void SinkStreamManager::SetStreamStartCallback(StreamStartCallback cb)
+{
+    stream_start_cb_ = cb;
+}
+
+void SinkStreamManager::SetStreamStopCallback(StreamStopCallback cb)
+{
+    stream_stop_cb_ = cb;
+}
+
+void SinkStreamManager::SetOnEncodedFrameCallback(EncodedFrameCallabck cb)
+{
+    encoded_frame_cb_ = cb;
 }
 
 void SinkStreamManager::SetWebRtcStreamCallbacks(WebRtcStream &stream)
 {
-    auto on_stream_state_changed_cb =
-          std::bind(OnStreamStateChanged, std::placeholders::_1, std::ref(stream));
+    auto on_stream_state_changed_cb = std::bind(&SinkStreamManager::OnStreamStateChanged, this,
+          std::placeholders::_1, std::ref(stream));
     stream.GetEventHandler().SetOnStateChangedCb(on_stream_state_changed_cb);
 
+    auto on_ice_candidate_added_cb = std::bind(&SinkStreamManager::OnIceCandidate, this,
+          std::placeholders::_1, std::ref(stream));
+    stream.GetEventHandler().SetOnIceCandidateCb(on_ice_candidate_added_cb);
+
     auto on_ice_gathering_state_changed_cb =
-          std::bind(OnIceGatheringStateNotify, std::placeholders::_1, std::ref(stream), this);
+          std::bind(&SinkStreamManager::OnIceGatheringStateNotify, this, std::placeholders::_1,
+                std::ref(stream));
     stream.GetEventHandler().SetOnIceGatheringStateNotifyCb(on_ice_gathering_state_changed_cb);
 
-    auto on_encoded_frame_cb = std::bind(OnEncodedFrame, std::ref(stream), this);
+    auto on_encoded_frame_cb =
+          std::bind(&SinkStreamManager::OnEncodedFrame, this, std::ref(stream));
     stream.GetEventHandler().SetOnEncodedFrameCb(on_encoded_frame_cb);
 }
 
@@ -51,7 +109,8 @@ void SinkStreamManager::OnStreamStateChanged(WebRtcState::Stream state, WebRtcSt
 {
     DBG("OnSinkStreamStateChanged: %s", WebRtcState::StreamToStr(state).c_str());
     if (state == WebRtcState::Stream::NEGOTIATING) {
-        auto on_offer_created = std::bind(OnOfferCreated, std::placeholders::_1, std::ref(stream));
+        auto on_offer_created = std::bind(&SinkStreamManager::OnOfferCreated, this,
+              std::placeholders::_1, std::ref(stream));
         stream.CreateOfferAsync(on_offer_created);
     }
 }
@@ -63,69 +122,161 @@ void SinkStreamManager::OnOfferCreated(std::string sdp, WebRtcStream &stream)
     stream.SetLocalDescription(sdp);
 }
 
+void SinkStreamManager::OnIceCandidate(const std::string &candidate, WebRtcStream &stream)
+{
+    if (ice_candidate_added_cb_)
+        ice_candidate_added_cb_(stream);
+}
+
 void SinkStreamManager::OnIceGatheringStateNotify(WebRtcState::IceGathering state,
-      WebRtcStream &stream, SinkStreamManager *manager)
+      WebRtcStream &stream)
 {
     DBG("Sink IceGathering State: %s", WebRtcState::IceGatheringToStr(state).c_str());
     if (state == WebRtcState::IceGathering::COMPLETE) {
-        if (manager && manager->stream_ready_cb_)
-            manager->stream_ready_cb_(stream);
+        if (stream_ready_cb_)
+            stream_ready_cb_(stream);
     }
 }
 
-void SinkStreamManager::OnEncodedFrame(WebRtcStream &stream, SinkStreamManager *manager)
+void SinkStreamManager::HandleRemovedClient(const std::string &discovery_id)
 {
-    if (manager && manager->encoded_frame_cb_)
-        manager->encoded_frame_cb_(stream);
+    auto src_stream_itr = src_stream_.find(discovery_id);
+    if (src_stream_itr == src_stream_.end()) {
+        DBG("There's no source stream %s", discovery_id.c_str());
+        return;
+    }
+
+    // TODO: You should take care about stream resource
+    src_stream_itr->second->Destroy();
+    src_stream_.erase(src_stream_itr);
+
+    return;
 }
 
-void SinkStreamManager::SetStreamReadyCallback(StreamReadyCallback cb)
+void SinkStreamManager::HandleMsg(const std::string &discovery_id,
+      const std::vector<uint8_t> &message)
 {
-    stream_ready_cb_ = cb;
+    if (flexbuffers::GetRoot(message).IsString())
+        HandleStreamState(discovery_id, message);
+
+    else if (flexbuffers::GetRoot(message).IsVector())
+        HandleStreamInfo(discovery_id, message);
 }
 
-void SinkStreamManager::SetOnEncodedFrameCallback(EncodedFrameCallabck cb)
+void SinkStreamManager::HandleStreamState(const std::string &discovery_id,
+      const std::vector<uint8_t> &message)
 {
-    encoded_frame_cb_ = cb;
+    auto src_state = flexbuffers::GetRoot(message).ToString();
+
+    if (src_state.compare("START") == 0)
+        HandleStartStream(discovery_id);
+    else if (src_state.compare("STOP") == 0)
+        HandleRemovedClient(discovery_id);
+    else
+        DBG("Invalid message %s", src_state);
 }
 
-void SinkStreamManager::Start()
+void SinkStreamManager::HandleStartStream(const std::string &discovery_id)
 {
-    // TODO: Handle failures on create and start
-    SetWebRtcStreamCallbacks(stream_);
-    stream_.Create(false, false);
-    stream_.Start();
+    auto src_stream = src_stream_.find(discovery_id);
+    if (src_stream != src_stream_.end()) {
+        DBG("There's stream already");
+        return;
+    }
+
+    DBG("Src Stream Started");
+    AddStream(discovery_id);
 }
 
-void SinkStreamManager::Stop()
+void SinkStreamManager::AddStream(const std::string &discovery_id)
 {
-    stream_.Destroy();
+    auto stream = new WebRtcStream();
+    SetWebRtcStreamCallbacks(*stream);
+    stream->Create(false, false);
+    stream->Start();
+
+    std::stringstream s_stream;
+    s_stream << static_cast<void *>(stream);
+
+    stream->SetStreamId(std::string(thread_id_ + s_stream.str()));
+    src_stream_[discovery_id] = stream;
 }
 
-void SinkStreamManager::HandleRemovedClient(const std::string &id)
+void SinkStreamManager::HandleStreamInfo(const std::string &discovery_id,
+      const std::vector<uint8_t> &message)
 {
-    if (id.compare(stream_.GetPeerId()) == 0) {
-        stream_.Destroy();
-        stream_.Create(false, false);
-        stream_.Start();
+    if (!WebRtcMessage::IsValidStreamInfo(message)) {
+        DBG("Invalid streams info");
+        return;
     }
 
-    return;
+    // sink_streams have a stream at normal situation
+    auto src_streams = flexbuffers::GetRoot(message).AsVector();
+    for (size_t stream_idx = 0; stream_idx < src_streams.size(); ++stream_idx) {
+        auto stream = src_streams[stream_idx].AsMap();
+        auto id = stream["id"].AsString().str();
+        auto peer_id = stream["peer_id"].AsString().str();
+        auto sdp = stream["sdp"].AsString().str();
+        std::vector<std::string> ice_candidates;
+        auto ice_info = stream["ice_candidates"].AsVector();
+        for (size_t ice_idx = 0; ice_idx < ice_info.size(); ++ice_idx)
+            ice_candidates.push_back(ice_info[ice_idx].AsString().str());
+        UpdateStreamInfo(discovery_id, id, peer_id, sdp, ice_candidates);
+    }
 }
 
-void SinkStreamManager::HandleDiscoveredStream(const std::string &id,
-      const std::vector<uint8_t> &message)
+void SinkStreamManager::UpdateStreamInfo(const std::string &discovery_id, const std::string &id,
+      const std::string &peer_id, const std::string &sdp,
+      const std::vector<std::string> &ice_candidates)
 {
-    auto info = WebRtcMessage::ParseDiscoveryMessage(message);
-    if (!info.is_src_ || info.topic_.compare(topic_)) {
-        DBG("Is sink or topic not matched");
+    auto src_stream = src_stream_.find(discovery_id);
+    if (src_stream == src_stream_.end()) {
+        DBG("No matching stream");
+        return;
+    }
+
+    if (peer_id.compare(src_stream->second->GetStreamId()) != 0) {
+        DBG("Different ID");
         return;
     }
 
-    // Sink'll update offer if WebRTC state is ice candidate compelete
-    // Src create answer after it receives the offer.
-    // So, We don't need to worry about state.
-    stream_.AddDiscoveryInformation(message);
+    if (src_stream->second->GetPeerId().size() == 0) {
+        DBG("first update");
+        src_stream->second->SetPeerId(id);
+        src_stream->second->AddPeerInformation(sdp, ice_candidates);
+    } else
+        src_stream->second->UpdatePeerInformation(ice_candidates);
+
+}
+
+std::vector<uint8_t> SinkStreamManager::GetDiscoveryMessage(void)
+{
+    std::vector<uint8_t> message;
+
+    flexbuffers::Builder fbb;
+    fbb.Vector([&] {
+        for (auto itr = src_stream_.begin(); itr != src_stream_.end(); ++itr) {
+            fbb.Map([&] {
+                fbb.String("id", itr->second->GetStreamId());
+                fbb.String("peer_id", itr->second->GetPeerId());
+                fbb.String("sdp", itr->second->GetLocalDescription());
+                fbb.Vector("ice_candidates", [&]() {
+                    for (const auto &candidate : itr->second->GetIceCandidates()) {
+                        fbb.String(candidate);
+                    }
+                });
+            });
+        }
+    });
+    fbb.Finish();
+
+    message = fbb.GetBuffer();
+    return message;
+}
+
+std::string SinkStreamManager::GetWatchingTopic(void)
+{
+    return watching_topic_;
 }
 
 }  // namespace AittWebRTCNamespace
index 863ec39c32077ff8bee8568051c3aa9ef579a300..1b27625462013f6aa5d82b78f67933932f086590 100644 (file)
@@ -16,6 +16,8 @@
 
 #pragma once
 
+#include <map>
+
 #include "StreamManager.h"
 
 namespace AittWebRTCNamespace {
@@ -25,28 +27,44 @@ class SinkStreamManager : public StreamManager {
     using EncodedFrameCallabck = std::function<void(WebRtcStream &stream)>;
     explicit SinkStreamManager(const std::string &topic, const std::string &aitt_id,
           const std::string &thread_id);
-    ~SinkStreamManager();
+    virtual ~SinkStreamManager();
     void Start(void) override;
     void Stop(void) override;
-    void HandleRemovedClient(const std::string &id) override;
-    void HandleDiscoveredStream(const std::string &id, const std::vector<uint8_t> &message) override;
+    void SetIceCandidateAddedCallback(IceCandidateAddedCallback cb) override;
     void SetStreamReadyCallback(StreamReadyCallback cb) override;
+    void SetStreamStartCallback(StreamStartCallback cb) override;
+    void SetStreamStopCallback(StreamStopCallback cb) override;
+    std::vector<uint8_t> GetDiscoveryMessage(void) override;
+    std::string GetWatchingTopic(void) override;
     // TODO: WebRTC CAPI doesn't allow destroy webrtc handle at callback.
     // We need to avoid that situation
     void SetOnEncodedFrameCallback(EncodedFrameCallabck cb);
-    WebRtcStream &GetStream(void) { return stream_; };
+    void HandleRemovedClient(const std::string &discovery_id) override;
+    void HandleMsg(const std::string &discovery_id, const std::vector<uint8_t> &message) override;
 
   private:
     void SetWebRtcStreamCallbacks(WebRtcStream &stream) override;
-    static void OnStreamStateChanged(WebRtcState::Stream state, WebRtcStream &stream);
-    static void OnOfferCreated(std::string sdp, WebRtcStream &stream);
-    static void OnIceGatheringStateNotify(WebRtcState::IceGathering state, WebRtcStream &stream,
-          SinkStreamManager *manager);
-    static void OnEncodedFrame(WebRtcStream &stream, SinkStreamManager *manager);
+    void OnStreamStateChanged(WebRtcState::Stream state, WebRtcStream &stream);
+    void OnOfferCreated(std::string sdp, WebRtcStream &stream);
+    void OnIceCandidate(const std::string &candidate, WebRtcStream &stream);
+    void OnIceGatheringStateNotify(WebRtcState::IceGathering state, WebRtcStream &stream);
+    void OnEncodedFrame(WebRtcStream &stream);
+    void HandleStreamState(const std::string &discovery_id, const std::vector<uint8_t> &message);
+    void HandleStartStream(const std::string &discovery_id);
+    void HandleStreamInfo(const std::string &discovery_id, const std::vector<uint8_t> &message);
+    void AddStream(const std::string &discovery_id);
+    void UpdateStreamInfo(const std::string &discovery_id, const std::string &id,
+          const std::string &peer_id, const std::string &sdp,
+          const std::vector<std::string> &ice_candidates);
+
+    std::string watching_topic_;
     // TODO: What if user copies the module?
     // Think about that case with destructor
-    WebRtcStream stream_;
+    std::map<std::string /* Peer Aitt Discovery ID */, WebRtcStream *> src_stream_;
+    IceCandidateAddedCallback ice_candidate_added_cb_;
     StreamReadyCallback stream_ready_cb_;
+    StreamStartCallback stream_start_cb_;
+    StreamStopCallback stream_stop_cb_;
     EncodedFrameCallabck encoded_frame_cb_;
 };
 }  // namespace AittWebRTCNamespace
index 41704435c0b28cd01daccd63a830e0f847fe43ee..be11c96a39b8c9d96ac2e681a427acfee547c4dd 100644 (file)
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 #include "SrcStreamManager.h"
 
+#include <flatbuffers/flexbuffers.h>
+
 #include <sstream>
 
 #include "aitt_internal.h"
@@ -23,13 +26,40 @@ namespace AittWebRTCNamespace {
 
 SrcStreamManager::SrcStreamManager(const std::string &topic, const std::string &aitt_id,
       const std::string &thread_id)
-      : StreamManager(topic, aitt_id, thread_id), stream_(nullptr), stream_ready_cb_(nullptr)
+      : StreamManager(topic + "/SRC", aitt_id, thread_id), watching_topic_(topic + "/SINK")
 {
 }
 
 SrcStreamManager::~SrcStreamManager()
 {
-    Stop();
+    // TODO: You should take care about stream resource
+    for (auto itr = sink_streams_.begin(); itr != sink_streams_.end(); ++itr)
+        itr->second->Destroy();
+    sink_streams_.clear();
+}
+
+void SrcStreamManager::Start(void)
+{
+    DBG("%s %s", __func__, GetTopic().c_str());
+    if (stream_start_cb_)
+        stream_start_cb_();
+}
+
+void SrcStreamManager::Stop(void)
+{
+    DBG("%s %s", __func__, GetTopic().c_str());
+    // TODO: You should take care about stream resource
+    for (auto itr = sink_streams_.begin(); itr != sink_streams_.end(); ++itr)
+        itr->second->Destroy();
+    sink_streams_.clear();
+
+    if (stream_stop_cb_)
+        stream_stop_cb_();
+}
+
+void SrcStreamManager::SetIceCandidateAddedCallback(IceCandidateAddedCallback cb)
+{
+    ice_candidate_added_cb_ = cb;
 }
 
 void SrcStreamManager::SetStreamReadyCallback(StreamReadyCallback cb)
@@ -37,18 +67,32 @@ void SrcStreamManager::SetStreamReadyCallback(StreamReadyCallback cb)
     stream_ready_cb_ = cb;
 }
 
+void SrcStreamManager::SetStreamStartCallback(StreamStartCallback cb)
+{
+    stream_start_cb_ = cb;
+}
+
+void SrcStreamManager::SetStreamStopCallback(StreamStopCallback cb)
+{
+    stream_stop_cb_ = cb;
+}
+
 void SrcStreamManager::SetWebRtcStreamCallbacks(WebRtcStream &stream)
 {
-    auto on_stream_state_changed_cb =
-          std::bind(OnStreamStateChanged, std::placeholders::_1, std::ref(stream));
+    auto on_stream_state_changed_cb = std::bind(&SrcStreamManager::OnStreamStateChanged, this,
+          std::placeholders::_1, std::ref(stream));
     stream.GetEventHandler().SetOnStateChangedCb(on_stream_state_changed_cb);
 
-    auto on_signaling_state_notify_cb =
-          std::bind(OnSignalingStateNotify, std::placeholders::_1, std::ref(stream));
+    auto on_ice_candidate_added_cb = std::bind(&SrcStreamManager::OnIceCandidate, this,
+          std::placeholders::_1, std::ref(stream));
+    stream.GetEventHandler().SetOnIceCandidateCb(on_ice_candidate_added_cb);
+
+    auto on_signaling_state_notify_cb = std::bind(&SrcStreamManager::OnSignalingStateNotify, this,
+          std::placeholders::_1, std::ref(stream));
     stream.GetEventHandler().SetOnSignalingStateNotifyCb(on_signaling_state_notify_cb);
 
-    auto on_ice_gathering_state_changed_cb =
-          std::bind(OnIceGatheringStateNotify, std::placeholders::_1, std::ref(stream), this);
+    auto on_ice_gathering_state_changed_cb = std::bind(&SrcStreamManager::OnIceGatheringStateNotify,
+          this, std::placeholders::_1, std::ref(stream));
     stream.GetEventHandler().SetOnIceGatheringStateNotifyCb(on_ice_gathering_state_changed_cb);
 }
 
@@ -57,6 +101,16 @@ void SrcStreamManager::OnStreamStateChanged(WebRtcState::Stream state, WebRtcStr
     DBG("OnSrcStreamStateChanged: %s", WebRtcState::StreamToStr(state).c_str());
 }
 
+void SrcStreamManager::OnSignalingStateNotify(WebRtcState::Signaling state, WebRtcStream &stream)
+{
+    DBG("OnSignalingStateNotify: %s", WebRtcState::SignalingToStr(state).c_str());
+    if (state == WebRtcState::Signaling::HAVE_REMOTE_OFFER) {
+        auto on_answer_created = std::bind(&SrcStreamManager::OnAnswerCreated, this,
+              std::placeholders::_1, std::ref(stream));
+        stream.CreateAnswerAsync(on_answer_created);
+    }
+}
+
 void SrcStreamManager::OnAnswerCreated(std::string sdp, WebRtcStream &stream)
 {
     DBG("%s", __func__);
@@ -64,85 +118,140 @@ void SrcStreamManager::OnAnswerCreated(std::string sdp, WebRtcStream &stream)
     stream.SetLocalDescription(sdp);
 }
 
-void SrcStreamManager::OnSignalingStateNotify(WebRtcState::Signaling state, WebRtcStream &stream)
+void SrcStreamManager::OnIceCandidate(const std::string &candidate, WebRtcStream &stream)
 {
-    DBG("OnSignalingStateNotify: %s", WebRtcState::SignalingToStr(state).c_str());
-    if (state == WebRtcState::Signaling::HAVE_REMOTE_OFFER) {
-        auto on_answer_created =
-              std::bind(OnAnswerCreated, std::placeholders::_1, std::ref(stream));
-        stream.CreateAnswerAsync(on_answer_created);
-    }
+    if (ice_candidate_added_cb_)
+        ice_candidate_added_cb_(stream);
 }
 
 void SrcStreamManager::OnIceGatheringStateNotify(WebRtcState::IceGathering state,
-      WebRtcStream &stream, SrcStreamManager *manager)
+      WebRtcStream &stream)
 {
     DBG("Src IceGathering State: %s", WebRtcState::IceGatheringToStr(state).c_str());
     if (state == WebRtcState::IceGathering::COMPLETE) {
-        if (manager && manager->stream_ready_cb_)
-            manager->stream_ready_cb_(stream);
+        if (stream_ready_cb_)
+            stream_ready_cb_(stream);
     }
 }
 
-void SrcStreamManager::Start(void)
+void SrcStreamManager::HandleRemovedClient(const std::string &discovery_id)
 {
-    // TODO: What'll be done in start Src Stream Manager?
-}
-
-void SrcStreamManager::AddStream(const std::string &id, const std::vector<uint8_t> &message)
-{
-    // TODO Add more streams on same topic
-    if (stream_)
+    auto sink_stream_itr = sink_streams_.find(discovery_id);
+    if (sink_stream_itr == sink_streams_.end()) {
+        DBG("There's no sink stream %s", discovery_id.c_str());
         return;
+    }
 
-    stream_ = new WebRtcStream();
-    SetWebRtcStreamCallbacks(*stream_);
-    stream_->Create(true, false);
-    stream_->AttachCameraSource();
-    stream_->Start();
-
-    std::stringstream s_stream;
-    s_stream << static_cast<void *>(&stream_);
-
-    stream_->SetStreamId(std::string(aitt_id_ + thread_id_ + s_stream.str()));
-    stream_->SetPeerId(id);
-    stream_->AddDiscoveryInformation(message);
+    // TODO: You should take care about stream resource
+    sink_stream_itr->second->Destroy();
+    sink_streams_.erase(sink_stream_itr);
 
     return;
 }
 
-void SrcStreamManager::Stop(void)
+void SrcStreamManager::HandleMsg(const std::string &discovery_id,
+      const std::vector<uint8_t> &message)
 {
-    // TODO: Ad-hoc method
-    if (stream_) {
-        delete stream_;
-        stream_ = nullptr;
-    }
+    if (flexbuffers::GetRoot(message).IsString())
+        HandleStreamState(discovery_id, message);
+    else if (flexbuffers::GetRoot(message).IsVector())
+        HandleStreamInfo(discovery_id, message);
 }
 
-void SrcStreamManager::HandleRemovedClient(const std::string &id)
+void SrcStreamManager::HandleStreamState(const std::string &discovery_id,
+      const std::vector<uint8_t> &message)
 {
-    if (stream_ && stream_->GetPeerId().compare(id) == 0) {
-        delete stream_;
-        stream_ = nullptr;
-    }
+    auto sink_state = flexbuffers::GetRoot(message).ToString();
+
+    if (sink_state.compare("STOP") == 0)
+        HandleRemovedClient(discovery_id);
+    else
+        DBG("Invalid message %s", sink_state);
 }
 
-void SrcStreamManager::HandleDiscoveredStream(const std::string &id,
+void SrcStreamManager::HandleStreamInfo(const std::string &discovery_id,
       const std::vector<uint8_t> &message)
 {
-    auto info = WebRtcMessage::ParseDiscoveryMessage(message);
-    if (info.is_src_ || info.topic_.compare(topic_)) {
-        DBG("Is src or topic not matched");
+    if (!WebRtcMessage::IsValidStreamInfo(message)) {
+        DBG("Invalid streams info");
         return;
     }
 
-    if (stream_) {
-        DBG("Supports one stream at once currently");
-        return;
+    // sink_streams have a stream at normal situation
+    auto sink_streams = flexbuffers::GetRoot(message).AsVector();
+    for (size_t stream_idx = 0; stream_idx < sink_streams.size(); ++stream_idx) {
+        auto stream = sink_streams[stream_idx].AsMap();
+        auto id = stream["id"].AsString().str();
+        auto peer_id = stream["peer_id"].AsString().str();
+        auto sdp = stream["sdp"].AsString().str();
+        std::vector<std::string> ice_candidates;
+        auto ice_info = stream["ice_candidates"].AsVector();
+        for (size_t ice_idx = 0; ice_idx < ice_info.size(); ++ice_idx)
+            ice_candidates.push_back(ice_info[ice_idx].AsString().str());
+        UpdateStreamInfo(discovery_id, id, peer_id, sdp, ice_candidates);
     }
+}
+
+void SrcStreamManager::UpdateStreamInfo(const std::string &discovery_id, const std::string &id,
+      const std::string &peer_id, const std::string &sdp,
+      const std::vector<std::string> &ice_candidates)
+{
+    auto sink_stream = sink_streams_.find(discovery_id);
+    if (sink_stream == sink_streams_.end())
+        AddStream(discovery_id, id, sdp, ice_candidates);
+    else
+        sink_stream->second->UpdatePeerInformation(ice_candidates);
+}
 
-    AddStream(id, message);
+void SrcStreamManager::AddStream(const std::string &discovery_id, const std::string &id,
+      const std::string &sdp, const std::vector<std::string> &ice_candidates)
+{
+    auto stream = new WebRtcStream();
+    SetWebRtcStreamCallbacks(*stream);
+    stream->Create(true, false);
+    stream->AttachCameraSource();
+    stream->Start();
+
+    std::stringstream s_stream;
+    s_stream << static_cast<void *>(stream);
+
+    stream->SetStreamId(std::string(thread_id_ + s_stream.str()));
+    stream->SetPeerId(id);
+    stream->AddPeerInformation(sdp, ice_candidates);
+
+    sink_streams_[discovery_id] = stream;
+
+    return;
+}
+
+std::vector<uint8_t> SrcStreamManager::GetDiscoveryMessage(void)
+{
+    std::vector<uint8_t> message;
+
+    flexbuffers::Builder fbb;
+    fbb.Vector([&] {
+        for (auto itr = sink_streams_.begin(); itr != sink_streams_.end(); ++itr) {
+            fbb.Map([&] {
+                fbb.String("id", itr->second->GetStreamId());
+                fbb.String("peer_id", itr->second->GetPeerId());
+                fbb.String("sdp", itr->second->GetLocalDescription());
+                fbb.Vector("ice_candidates", [&]() {
+                    for (const auto &candidate : itr->second->GetIceCandidates()) {
+                        fbb.String(candidate);
+                    }
+                });
+            });
+        }
+    });
+    fbb.Finish();
+
+    message = fbb.GetBuffer();
+    return message;
+}
+
+std::string SrcStreamManager::GetWatchingTopic(void)
+{
+    return watching_topic_;
 }
 
 }  // namespace AittWebRTCNamespace
index 334e23385f49acea4e2faf9a0667571e9a24ab79..aca572a3fb4f160ef126afa7497992d56a3a2947 100644 (file)
@@ -16,6 +16,8 @@
 
 #pragma once
 
+#include <map>
+
 #include "StreamManager.h"
 
 namespace AittWebRTCNamespace {
@@ -24,25 +26,41 @@ class SrcStreamManager : public StreamManager {
   public:
     explicit SrcStreamManager(const std::string &topic, const std::string &aitt_id,
           const std::string &thread_id);
-    ~SrcStreamManager();
+    virtual ~SrcStreamManager();
     void Start(void) override;
-    void SetStreamReadyCallback(StreamReadyCallback cb) override;
-    //TODO: What's the best way to shutdown all?
+    // TODO: What's the best way to shutdown all?
     void Stop(void) override;
-    void HandleRemovedClient(const std::string &id) override;
-    void HandleDiscoveredStream(const std::string &id, const std::vector<uint8_t> &message) override;
+    void SetIceCandidateAddedCallback(IceCandidateAddedCallback cb) override;
+    void SetStreamReadyCallback(StreamReadyCallback cb) override;
+    void SetStreamStartCallback(StreamStartCallback cb) override;
+    void SetStreamStopCallback(StreamStopCallback cb) override;
+    std::vector<uint8_t> GetDiscoveryMessage(void) override;
+    std::string GetWatchingTopic(void) override;
+    void HandleRemovedClient(const std::string &discovery_id) override;
+    void HandleMsg(const std::string &discovery_id, const std::vector<uint8_t> &message) override;
 
   private:
     void SetWebRtcStreamCallbacks(WebRtcStream &stream) override;
-    void AddStream(const std::string &id, const std::vector<uint8_t> &message);
-    static void OnStreamStateChanged(WebRtcState::Stream state, WebRtcStream &stream);
-    static void OnAnswerCreated(std::string sdp, WebRtcStream &stream);
-    static void OnSignalingStateNotify(WebRtcState::Signaling state, WebRtcStream &stream);
-    static void OnIceGatheringStateNotify(WebRtcState::IceGathering state, WebRtcStream &stream,
-          SrcStreamManager *manager);
+    void OnStreamStateChanged(WebRtcState::Stream state, WebRtcStream &stream);
+    void OnAnswerCreated(std::string sdp, WebRtcStream &stream);
+    void OnIceCandidate(const std::string &candidate, WebRtcStream &stream);
+    void OnSignalingStateNotify(WebRtcState::Signaling state, WebRtcStream &stream);
+    void OnIceGatheringStateNotify(WebRtcState::IceGathering state, WebRtcStream &stream);
+    void HandleStreamState(const std::string &discovery_id, const std::vector<uint8_t> &message);
+    void HandleStreamInfo(const std::string &discovery_id, const std::vector<uint8_t> &message);
+    void AddStream(const std::string &discovery_id, const std::string &id, const std::string &sdp,
+          const std::vector<std::string> &ice_candidates);
+    void UpdateStreamInfo(const std::string &discovery_id, const std::string &id,
+          const std::string &peer_id, const std::string &sdp,
+          const std::vector<std::string> &ice_candidates);
+
+    std::string watching_topic_;
     // TODO: What if user copies the module?
     // Think about that case with destructor
-    WebRtcStream *stream_;
+    std::map<std::string /* Peer Aitt Discovery ID */, WebRtcStream *> sink_streams_;
+    IceCandidateAddedCallback ice_candidate_added_cb_;
     StreamReadyCallback stream_ready_cb_;
+    StreamStartCallback stream_start_cb_;
+    StreamStopCallback stream_stop_cb_;
 };
 }  // namespace AittWebRTCNamespace
index 430da607957e81f4bb4a4c77ba419ace7da33827..a613a4acca394b18232809bb96ecd76f3e2507fb 100644 (file)
 #include <functional>
 #include <string>
 #include <thread>
+#include <vector>
 
 namespace AittWebRTCNamespace {
 
 class StreamManager {
   public:
+    using IceCandidateAddedCallback = std::function<void(WebRtcStream &stream)>;
     using StreamReadyCallback = std::function<void(WebRtcStream &stream)>;
+    using StreamStartCallback = std::function<void(void)>;
+    using StreamStopCallback = std::function<void(void)>;
     explicit StreamManager(const std::string &topic, const std::string &aitt_id,
           const std::string &thread_id)
           : topic_(topic), aitt_id_(aitt_id), thread_id_(thread_id){};
     virtual ~StreamManager() = default;
     std::string GetTopic(void) const { return topic_; };
     std::string GetClientId(void) const { return aitt_id_; };
-    virtual void HandleRemovedClient(const std::string &id) = 0;
-    virtual void HandleDiscoveredStream(const std::string &id,
+    virtual void HandleRemovedClient(const std::string &discovery_id) = 0;
+    virtual void HandleMsg(const std::string &discovery_id,
           const std::vector<uint8_t> &message) = 0;
+
     virtual void Start(void) = 0;
     virtual void Stop(void) = 0;
+    virtual void SetIceCandidateAddedCallback(IceCandidateAddedCallback cb) = 0;
     virtual void SetStreamReadyCallback(StreamReadyCallback cb) = 0;
+    virtual void SetStreamStartCallback(StreamStartCallback cb) = 0;
+    virtual void SetStreamStopCallback(StreamStopCallback cb) = 0;
+    virtual std::vector<uint8_t> GetDiscoveryMessage(void) = 0;
+    virtual std::string GetWatchingTopic(void) = 0;
 
   protected:
     std::string topic_;
index 0da5930a7f714ffd366298576b850a4137f7ae6c..308171d20b75d39adbcb8d29c8cfad0cdabb744c 100644 (file)
@@ -48,6 +48,18 @@ class WebRtcEventHandler {
     };
     void UnsetOnSignalingStateNotifyCb(void) { on_signaling_state_notify_cb_ = nullptr; };
 
+    void SetOnIceCandidateCb(
+          std::function<void(std::string)> on_ice_candidate_cb)
+    {
+        on_ice_candidate_cb_ = on_ice_candidate_cb;
+    };
+    void CallOnIceCandidateCb(std::string candidate) const
+    {
+        if (on_ice_candidate_cb_)
+            on_ice_candidate_cb_(candidate);
+    };
+    void UnsetOnIceCandidateCb(void) { on_ice_candidate_cb_ = nullptr; };
+
     void SetOnIceGatheringStateNotifyCb(
           std::function<void(WebRtcState::IceGathering)> on_ice_gathering_state_notify_cb)
     {
@@ -98,6 +110,7 @@ class WebRtcEventHandler {
     std::function<void(void)> on_negotiation_needed_cb_;
     std::function<void(WebRtcState::Stream)> on_state_changed_cb_;
     std::function<void(WebRtcState::Signaling)> on_signaling_state_notify_cb_;
+    std::function<void(std::string)> on_ice_candidate_cb_;
     std::function<void(WebRtcState::IceGathering)> on_ice_gathering_state_notify_cb_;
     std::function<void(WebRtcState::IceConnection)> on_ice_connection_state_notify_cb_;
     std::function<void(void)> on_encoded_frame_cb_;
index 2756ecdee3eb3452df9c6a01dfbf67d657ec49a3..422bd55343805e76ba2ac769e3f87a7dbaf7d872 100644 (file)
@@ -16,7 +16,6 @@
 
 #include "WebRtcMessage.h"
 
-#include <flatbuffers/flexbuffers.h>
 #include <json-glib/json-glib.h>
 
 #include "aitt_internal.h"
@@ -55,62 +54,29 @@ WebRtcMessage::Type WebRtcMessage::getMessageType(const std::string &message)
     return type;
 }
 
-std::vector<uint8_t> WebRtcMessage::GenerateDiscoveryMessage(const std::string &topic, bool is_src,
-      const std::string &sdp, const std::vector<std::string> &ice_candidates)
+bool WebRtcMessage::IsValidStreamInfo(const std::vector<uint8_t> &message)
 {
-    std::vector<uint8_t> message;
-
-    flexbuffers::Builder fbb;
-    fbb.Map([=, &fbb]() {
-        fbb.String("topic", topic);
-        fbb.Bool("is_src", is_src);
-        fbb.String("sdp", sdp);
-        fbb.Vector("ice_candidates", [&]() {
-            for (const auto &candidate : ice_candidates) {
-                fbb.String(candidate);
-            }
-        });
-    });
-    fbb.Finish();
-
-    message = fbb.GetBuffer();
-
-    return message;
-}
-
-bool WebRtcMessage::IsValidDiscoveryMessage(const std::vector<uint8_t> &discovery_message)
-{
-    if (!flexbuffers::GetRoot(discovery_message).IsMap())
+    if (!flexbuffers::GetRoot(message).IsVector())
         return false;
 
-    auto discovery_info = flexbuffers::GetRoot(discovery_message).AsMap();
-    bool topic_exists{discovery_info["topic"].IsString()};
-    bool is_source_exists{discovery_info["is_src"].IsBool()};
-    bool sdp_exists{discovery_info["sdp"].IsString()};
-    bool ice_candidates_exists{discovery_info["ice_candidates"].IsVector()};
+    auto streams = flexbuffers::GetRoot(message).AsVector();
+    for (size_t idx = 0; idx < streams.size(); ++idx) {
+        auto stream = streams[idx].AsMap();
+        if (!IsValidStream(stream))
+            return false;
+    }
 
-    return topic_exists && is_source_exists && sdp_exists && ice_candidates_exists;
+    return true;
 }
 
-WebRtcMessage::DiscoveryInfo WebRtcMessage::ParseDiscoveryMessage(
-      const std::vector<uint8_t> &discovery_message)
+bool WebRtcMessage::IsValidStream(const flexbuffers::Map &info)
 {
-    WebRtcMessage::DiscoveryInfo info;
-    if (!IsValidDiscoveryMessage(discovery_message)) {
-        DBG("Invalid info");
-        return info;
-    }
-
-    auto discovery_info_map = flexbuffers::GetRoot(discovery_message).AsMap();
-    info.topic_ = discovery_info_map["topic"].AsString().str();
-    info.is_src_ = discovery_info_map["is_src"].AsBool();
-    info.sdp_ = discovery_info_map["sdp"].AsString().str();
-    auto ice_candidates_info = discovery_info_map["ice_candidates"].AsVector();
-    for (size_t idx = 0; idx < ice_candidates_info.size(); ++idx) {
-        info.ice_candidates_.push_back(ice_candidates_info[idx].AsString().str());
-    }
+    bool id_exist{info["id"].IsString()};
+    bool peer_id_exist{info["peer_id"].IsString()};
+    bool sdp_exist{info["sdp"].IsString()};
+    bool ice_candidates_exist{info["ice_candidates"].IsVector()};
 
-    return info;
+    return id_exist && peer_id_exist && sdp_exist && ice_candidates_exist;
 }
 
 }  // namespace AittWebRTCNamespace
index ac75f07735e46c42654e4b1b53d05cb028417d19..cca0a70423263aa2a873c0e3ee706b564a69aee3 100644 (file)
@@ -16,6 +16,8 @@
 
 #pragma once
 
+#include <flatbuffers/flexbuffers.h>
+
 #include <string>
 #include <vector>
 
@@ -31,21 +33,19 @@ class WebRtcMessage {
     class DiscoveryInfo {
       public:
         DiscoveryInfo() = default;
-        DiscoveryInfo(const std::string &topic, bool is_src, const std::string &sdp,
+        DiscoveryInfo(const std::string &id, const std::string &peer_id, const std::string &sdp,
               const std::vector<std::string> &ice_candidates)
-              : topic_(topic), is_src_(is_src), sdp_(sdp), ice_candidates_(ice_candidates)
+              : id_(id), peer_id_(peer_id), sdp_(sdp), ice_candidates_(ice_candidates)
         {
         }
-        std::string topic_;
-        bool is_src_;
+        std::string id_;
+        std::string peer_id_;
         std::string sdp_;
         std::vector<std::string> ice_candidates_;
     };
     static WebRtcMessage::Type getMessageType(const std::string &message);
-    static std::vector<uint8_t> GenerateDiscoveryMessage(const std::string &topic, bool is_src,
-          const std::string &sdp, const std::vector<std::string> &ice_candidates);
-    static bool IsValidDiscoveryMessage(const std::vector<uint8_t> &discovery_message);
-    static DiscoveryInfo ParseDiscoveryMessage(const std::vector<uint8_t> &discovery_message);
+    static bool IsValidStreamInfo(const std::vector<uint8_t> &message);
+    static bool IsValidStream(const flexbuffers::Map &info);
     static constexpr int DISCOVERY_MESSAGE_KEY_SIZE = 4;
 };
 
index 318e17458d1ca32da6aa500699b7c4ff908371fb..0e1662aab7069522c49e9e524d0581a29267ed79 100644 (file)
@@ -79,6 +79,8 @@ void WebRtcStream::Destroy(void)
     if (ret != WEBRTC_ERROR_NONE)
         ERR("Failed to destroy webrtc handle");
     webrtc_handle_ = nullptr;
+
+    // TODO what should be initialized?
 }
 
 bool WebRtcStream::Start(void)
@@ -106,6 +108,7 @@ bool WebRtcStream::Stop(void)
     if (ret != WEBRTC_ERROR_NONE)
         ERR("Failed to stop webrtc handle");
 
+    // TODO what should be initialized?
     return ret == WEBRTC_ERROR_NONE;
 }
 
@@ -240,18 +243,38 @@ bool WebRtcStream::AddIceCandidateFromMessage(const std::string &ice_message)
     return ret == WEBRTC_ERROR_NONE;
 }
 
-bool WebRtcStream::AddDiscoveryInformation(const std::vector<uint8_t> &discovery_message)
+bool WebRtcStream::AddPeerInformation(const std::string &sdp,
+      const std::vector<std::string> &ice_candidates)
 {
-    ERR("%s", __func__);
-
-    peer_info_ = WebRtcMessage::ParseDiscoveryMessage(discovery_message);
+    remote_description_ = sdp;
+    peer_ice_candidates_ = ice_candidates;
     if (!IsNegotiatingState()) {
+        DBG("Not negotiable");
         return false;
     }
-
     return SetPeerInformation();
 }
 
+void WebRtcStream::UpdatePeerInformation(const std::vector<std::string> &ice_candidates)
+{
+    if (IsPlayingState()) {
+        remote_description_.clear();
+        peer_ice_candidates_.clear();
+        stored_peer_ice_candidates_.clear();
+        DBG("Now Playing");
+        return;
+    }
+
+    peer_ice_candidates_ = ice_candidates;
+    if (!IsNegotiatingState()) {
+        DBG("Not negotiable");
+        return;
+    }
+
+    SetPeerInformation();
+    return;
+}
+
 bool WebRtcStream::IsNegotiatingState(void)
 {
     webrtc_state_e state;
@@ -264,6 +287,18 @@ bool WebRtcStream::IsNegotiatingState(void)
     return state == WEBRTC_STATE_NEGOTIATING;
 }
 
+bool WebRtcStream::IsPlayingState(void)
+{
+    webrtc_state_e state;
+    auto get_state_ret = webrtc_get_state(webrtc_handle_, &state);
+    if (get_state_ret != WEBRTC_ERROR_NONE) {
+        ERR("Failed to get state");
+        return false;
+    }
+
+    return state == WEBRTC_STATE_PLAYING;
+}
+
 bool WebRtcStream::SetPeerInformation(void)
 {
     bool res = true;
@@ -275,12 +310,15 @@ bool WebRtcStream::SetPeerInformation(void)
 }
 bool WebRtcStream::SetPeerSDP(void)
 {
+    ERR("%s", __func__);
     bool res = true;
-    if (peer_info_.sdp_.size() == 0)
+    if (remote_description_.size() == 0) {
+        DBG("Peer SDP empty");
         return res;
+    }
 
-    if (SetRemoteDescription(peer_info_.sdp_))
-        peer_info_.sdp_.clear();
+    if (SetRemoteDescription(remote_description_))
+        remote_description_.clear();
     else
         res = false;
 
@@ -289,10 +327,14 @@ bool WebRtcStream::SetPeerSDP(void)
 
 bool WebRtcStream::SetPeerIceCandidates(void)
 {
-    bool res = true;
-    for (auto it = peer_info_.ice_candidates_.begin(); it != peer_info_.ice_candidates_.end();) {
-        if (AddIceCandidateFromMessage(*it))
-            it = peer_info_.ice_candidates_.erase(it);
+    ERR("%s", __func__);
+    bool res{true};
+    for (auto itr = peer_ice_candidates_.begin(); itr != peer_ice_candidates_.end(); ++itr) {
+        if (IsRedundantCandidate(*itr))
+            continue;
+
+        if (AddIceCandidateFromMessage(*itr))
+            stored_peer_ice_candidates_.push_back(*itr);
         else
             res = false;
     }
@@ -300,6 +342,14 @@ bool WebRtcStream::SetPeerIceCandidates(void)
     return res;
 }
 
+bool WebRtcStream::IsRedundantCandidate(const std::string &candidate)
+{
+    for (const auto &stored_ice_candidate : stored_peer_ice_candidates_) {
+        if (stored_ice_candidate.compare(candidate) == 0)
+            return true;
+    }
+    return false;
+}
 
 void WebRtcStream::AttachSignals(bool is_source, bool need_display)
 {
@@ -422,6 +472,7 @@ void WebRtcStream::OnIceCandiate(webrtc_h webrtc, const char *candidate, void *u
     ERR("%s", __func__);
     auto webrtc_stream = static_cast<WebRtcStream *>(user_data);
     webrtc_stream->ice_candidates_.push_back(candidate);
+    webrtc_stream->GetEventHandler().CallOnIceCandidateCb(candidate);
 }
 
 void WebRtcStream::OnEncodedFrame(webrtc_h webrtc, webrtc_media_type_e type, unsigned int track_id,
index 793c3e13db205ebe0c81d4b54dbec6a080d355b3..9b4c7031fdc79054e0ef70cd25ffbfea1381aa2c 100644 (file)
@@ -73,7 +73,8 @@ class WebRtcStream {
     std::string &GetPeerId(void) { return peer_id_; };
 
     bool AddIceCandidateFromMessage(const std::string &ice_message);
-    bool AddDiscoveryInformation(const std::vector<uint8_t> &discovery_message);
+    bool AddPeerInformation(const std::string &sdp, const std::vector<std::string> &ice_candidates);
+    void UpdatePeerInformation(const std::vector<std::string> &ice_candidates);
     bool SetPeerInformation(void);
     bool SetPeerSDP(void);
     bool SetPeerIceCandidates(void);
@@ -101,18 +102,22 @@ class WebRtcStream {
           void *user_data);
     static void OnDataChannelOpen(webrtc_data_channel_h channel, void *user_data);
     bool IsNegotiatingState(void);
+    bool IsPlayingState(void);
+    bool IsRedundantCandidate(const std::string &candidate);
 
   private:
     webrtc_h webrtc_handle_;
     webrtc_data_channel_h channel_;
     unsigned int source_id_;
     std::string local_description_;
+    std::string remote_description_;
     std::string id_;
     std::string peer_id_;
     std::vector<std::string> ice_candidates_;
+    std::vector<std::string> peer_ice_candidates_;
+    std::vector<std::string> stored_peer_ice_candidates_;
     std::function<void(std::string)> on_offer_created_cb_;
     std::function<void(std::string)> on_answer_created_cb_;
     WebRtcEventHandler event_handler_;
-    WebRtcMessage::DiscoveryInfo peer_info_;
 };
 }  // namespace AittWebRTCNamespace
index 08960c992087385a27673baf02c166a2c203c01a..f983d11a064d09fdb2f9896930c461f0ddbfb9ca 100644 (file)
@@ -15,6 +15,7 @@
  */
 
 #include <AittDiscovery.h>
+#include <flatbuffers/flexbuffers.h>
 #include <glib.h>
 #include <gtest/gtest.h>
 
@@ -30,6 +31,8 @@
 #define LOCAL_IP "127.0.0.1"
 #define DEFAULT_BROKER_PORT 1883
 #define WEBRTC_TOPIC_PREFIX "WEBRTC_"
+#define WEBRTC_SRC_TOPIC "/src"
+#define WEBRTC_SINK_TOPIC "/sink"
 #define TEST_TOPIC "TEST_TOPIC"
 #define TEST_SRC_CLIENT_ID "TEST_SRC_CLIENT_ID"
 #define TEST_SINK_CLIENT_ID "TEST_SINK_CLIENT_ID"
@@ -62,7 +65,7 @@ TEST_F(WebRtcMessageTest, test_OnDevice)
 {
     // TODO
 }
-class WebRtcSrcStreamTest : public testing::Test {
+class WebRtcStreamTest : public testing::Test {
   protected:
     void SetUp() override { mainLoop_ = g_main_loop_new(nullptr, FALSE); }
     void TearDown() override { g_main_loop_unref(mainLoop_); }
@@ -71,8 +74,9 @@ class WebRtcSrcStreamTest : public testing::Test {
         g_main_loop_run(mainLoop_);
         DBG("Go forward");
     }
+
     static void OnStreamStateChanged(WebRtcState::Stream state, WebRtcStream &stream,
-          WebRtcSrcStreamTest *test)
+          WebRtcStreamTest *test)
     {
         DBG("OnStreamStateChanged");
         if (state == WebRtcState::Stream::NEGOTIATING) {
@@ -81,7 +85,7 @@ class WebRtcSrcStreamTest : public testing::Test {
             stream.CreateOfferAsync(on_offer_created);
         }
     }
-    static void OnOfferCreated(std::string sdp, WebRtcStream &stream, WebRtcSrcStreamTest *test)
+    static void OnOfferCreated(std::string sdp, WebRtcStream &stream, WebRtcStreamTest *test)
     {
         DBG("%s", __func__);
 
@@ -89,14 +93,8 @@ class WebRtcSrcStreamTest : public testing::Test {
         stream.SetLocalDescription(sdp);
     }
 
-    static void OnSignalingStateNotify(WebRtcState::Signaling state, WebRtcStream &stream,
-          WebRtcSrcStreamTest *test)
-    {
-        DBG("Singaling State: %s", WebRtcState::SignalingToStr(state).c_str());
-    }
-
     static void OnIceGatheringStateNotify(WebRtcState::IceGathering state, WebRtcStream &stream,
-          WebRtcSrcStreamTest *test)
+          WebRtcStreamTest *test)
     {
         DBG("IceGathering State: %s", WebRtcState::IceGatheringToStr(state).c_str());
         g_main_loop_quit(test->mainLoop_);
@@ -105,33 +103,13 @@ class WebRtcSrcStreamTest : public testing::Test {
     std::string local_description_;
 };
 
-TEST_F(WebRtcSrcStreamTest, test_Create_WebRtcSrcStream_OnDevice)
-{
-    WebRtcStream stream{};
-    EXPECT_EQ(true, stream.Create(true, false)) << "Failed to create source stream";
-}
-
-TEST_F(WebRtcSrcStreamTest, test_Start_WebRtcSrcStream_OnDevice)
+TEST_F(WebRtcStreamTest, test_Create_WebRtcStream_OnDevice)
 {
-    WebRtcStream stream{};
-    EXPECT_EQ(true, stream.Create(true, false)) << "Failed to create source stream";
-    EXPECT_EQ(true, stream.AttachCameraSource()) << "Failed to attach camera source";
-    auto on_stream_state_changed_cb =
-          std::bind(OnStreamStateChanged, std::placeholders::_1, std::ref(stream), this);
-    stream.GetEventHandler().SetOnStateChangedCb(on_stream_state_changed_cb);
-
-    auto on_signaling_state_changed_cb =
-          std::bind(OnSignalingStateNotify, std::placeholders::_1, std::ref(stream), this);
-    stream.GetEventHandler().SetOnSignalingStateNotifyCb(on_signaling_state_changed_cb);
-
-    auto on_ice_gathering_state_changed_cb =
-          std::bind(OnIceGatheringStateNotify, std::placeholders::_1, std::ref(stream), this);
-    stream.GetEventHandler().SetOnIceGatheringStateNotifyCb(on_ice_gathering_state_changed_cb);
-    stream.Start();
-    IterateEventLoop();
+    WebRtcStream src_stream{};
+    EXPECT_EQ(true, src_stream.Create(true, false)) << "Failed to create source stream";
 }
 
-TEST_F(WebRtcSrcStreamTest, test_Validate_WebRtcSrcStream_Discovery_Message_OnDevice)
+TEST_F(WebRtcStreamTest, test_Start_WebRtcSrcStream_OnDevice)
 {
     WebRtcStream stream{};
     EXPECT_EQ(true, stream.Create(true, false)) << "Failed to create source stream";
@@ -140,10 +118,6 @@ TEST_F(WebRtcSrcStreamTest, test_Validate_WebRtcSrcStream_Discovery_Message_OnDe
           std::bind(OnStreamStateChanged, std::placeholders::_1, std::ref(stream), this);
     stream.GetEventHandler().SetOnStateChangedCb(on_stream_state_changed_cb);
 
-    auto on_signaling_state_changed_cb =
-          std::bind(OnSignalingStateNotify, std::placeholders::_1, std::ref(stream), this);
-    stream.GetEventHandler().SetOnSignalingStateNotifyCb(on_signaling_state_changed_cb);
-
     auto on_ice_gathering_state_changed_cb =
           std::bind(OnIceGatheringStateNotify, std::placeholders::_1, std::ref(stream), this);
     stream.GetEventHandler().SetOnIceGatheringStateNotifyCb(on_ice_gathering_state_changed_cb);
@@ -154,24 +128,6 @@ TEST_F(WebRtcSrcStreamTest, test_Validate_WebRtcSrcStream_Discovery_Message_OnDe
     for (const auto &ice_candidate : ice_candidates) {
         EXPECT_EQ(WebRtcMessage::Type::ICE, WebRtcMessage::getMessageType(ice_candidate));
     }
-
-    auto discovery_message = WebRtcMessage::GenerateDiscoveryMessage(TEST_TOPIC, true,
-          local_description_, stream.GetIceCandidates());
-
-    auto discovery_info = WebRtcMessage::ParseDiscoveryMessage(discovery_message);
-
-    EXPECT_EQ(0, discovery_info.topic_.compare(TEST_TOPIC));
-    EXPECT_EQ(true, discovery_info.is_src_);
-    EXPECT_EQ(0, discovery_info.sdp_.compare(local_description_));
-    for (const auto ice_candidate : ice_candidates) {
-        bool is_ice_candidate_exists{false};
-
-        for (const auto &discovered_ice_candidate : discovery_info.ice_candidates_) {
-            if (discovered_ice_candidate.compare(ice_candidate) == 0)
-                is_ice_candidate_exists = true;
-        }
-        EXPECT_EQ(true, is_ice_candidate_exists);
-    }
 }
 
 class WebRtcSourceOffererTest : public testing::Test {
@@ -214,8 +170,8 @@ class WebRtcSourceOffererTest : public testing::Test {
     {
         DBG("Source IceGathering State: %s", WebRtcState::IceGatheringToStr(state).c_str());
         if (state == WebRtcState::IceGathering::COMPLETE) {
-            test->sink_stream_.AddDiscoveryInformation(WebRtcMessage::GenerateDiscoveryMessage(
-                  "TEST_TOPIC", true, test->src_description_, stream.GetIceCandidates()));
+            test->sink_stream_.AddPeerInformation(test->src_description_,
+                  stream.GetIceCandidates());
         }
     }
 
@@ -250,8 +206,8 @@ class WebRtcSourceOffererTest : public testing::Test {
     {
         DBG("Sink IceGathering State: %s", WebRtcState::IceGatheringToStr(state).c_str());
         if (WebRtcState::IceGathering::COMPLETE == state) {
-            test->src_stream_.AddDiscoveryInformation(WebRtcMessage::GenerateDiscoveryMessage(
-                  "TEST_TOPIC", true, test->sink_description_, stream.GetIceCandidates()));
+            test->src_stream_.AddPeerInformation(test->sink_description_,
+                  stream.GetIceCandidates());
         }
     }
 
@@ -347,8 +303,8 @@ class WebRtcSinkOffererTest : public testing::Test {
     {
         DBG("Sink IceGathering State: %s", WebRtcState::IceGatheringToStr(state).c_str());
         if (state == WebRtcState::IceGathering::COMPLETE) {
-            test->src_stream_.AddDiscoveryInformation(WebRtcMessage::GenerateDiscoveryMessage(
-                  "TEST_TOPIC", true, test->sink_description_, stream.GetIceCandidates()));
+            test->src_stream_.AddPeerInformation(test->sink_description_,
+                  stream.GetIceCandidates());
         }
     }
 
@@ -382,8 +338,8 @@ class WebRtcSinkOffererTest : public testing::Test {
     {
         DBG("Src IceGathering State: %s", WebRtcState::IceGatheringToStr(state).c_str());
         if (WebRtcState::IceGathering::COMPLETE == state) {
-            test->sink_stream_.AddDiscoveryInformation(WebRtcMessage::GenerateDiscoveryMessage(
-                  "TEST_TOPIC", true, test->src_description_, stream.GetIceCandidates()));
+            test->sink_stream_.AddPeerInformation(test->src_description_,
+                  stream.GetIceCandidates());
         }
     }
 
@@ -456,13 +412,123 @@ TEST_F(StreamManagerTest, test_Create_StreamManager_Anytime)
           new SrcStreamManager(TEST_TOPIC, TEST_SRC_CLIENT_ID, s_stream.str()));
 }
 
+class SrcStreamManagerTest : public testing::Test {
+  protected:
+    SrcStreamManagerTest()
+          : discovery_cb_(-1),
+            test_topic_(std::string(WEBRTC_TOPIC_PREFIX) + std::string(TEST_TOPIC)),
+            discovery_engine_(TEST_SRC_CLIENT_ID),
+            mainLoop_(nullptr){};
+    void SetUp() override
+    {
+        mainLoop_ = g_main_loop_new(nullptr, FALSE);
+        discovery_engine_.SetMQ(std::unique_ptr<aitt::MQ>(
+              new aitt::MosquittoMQ(std::string(TEST_SRC_CLIENT_ID) + 'd', false)));
+        discovery_engine_.Start(LOCAL_IP, DEFAULT_BROKER_PORT, std::string(), std::string());
+    }
+
+    void TearDown() override
+    {
+        discovery_engine_.SetMQ(nullptr);
+        g_main_loop_unref(mainLoop_);
+    }
+    void IterateEventLoop(void)
+    {
+        g_main_loop_run(mainLoop_);
+        DBG("Go forward");
+    }
+
+    static void OnDiscoveredSrc(const std::string &clientId, const std::string &status,
+          const void *msg, const int szmsg, SrcStreamManagerTest *test,
+          SrcStreamManager *src_manager)
+    {
+        DBG("OnDiscoveredSrc");
+
+        auto discovered_msg = std::vector<uint8_t>(static_cast<const uint8_t *>(msg),
+              static_cast<const uint8_t *>(msg) + szmsg);
+        if (!flexbuffers::GetRoot(discovered_msg).IsString()) {
+            DBG("Invalid message type");
+            return;
+        }
+
+        auto src_cmd = flexbuffers::GetRoot(discovered_msg).ToString();
+        if (src_cmd.compare("START") == 0) {
+            DBG("Start Received");
+            src_manager->Stop();
+        } else if (src_cmd.compare("STOP") == 0) {
+            DBG("Stop Received");
+            if (g_main_loop_is_running(test->mainLoop_))
+                g_main_loop_quit(test->mainLoop_);
+        } else {
+            DBG("Invalid message");
+            return;
+        }
+    }
+
+    static void OnStreamStarted(SrcStreamManagerTest *test, SrcStreamManager *src_manager)
+    {
+        std::vector<uint8_t> msg;
+
+        flexbuffers::Builder fbb;
+        fbb.String("START");
+        fbb.Finish();
+
+        msg = fbb.GetBuffer();
+        test->discovery_engine_.UpdateDiscoveryMsg(src_manager->GetTopic(), msg.data(), msg.size());
+    }
+
+    static void OnStreamStopped(SrcStreamManagerTest *test, SrcStreamManager *src_manager)
+    {
+        std::vector<uint8_t> msg;
+
+        flexbuffers::Builder fbb;
+        fbb.String("STOP");
+        fbb.Finish();
+
+        msg = fbb.GetBuffer();
+        test->discovery_engine_.UpdateDiscoveryMsg(src_manager->GetTopic(), msg.data(), msg.size());
+    }
+
+    int discovery_cb_;
+    std::string test_topic_;
+    aitt::AittDiscovery discovery_engine_;
+    GMainLoop *mainLoop_;
+};
+
+TEST_F(SrcStreamManagerTest, test_SrcStreamManager_Start_OnDevice)
+{
+    std::stringstream s_stream;
+    s_stream << std::this_thread::get_id();
+    std::unique_ptr<StreamManager> src_manager(
+          new SrcStreamManager(test_topic_, TEST_SRC_CLIENT_ID, s_stream.str()));
+
+    auto on_discovered = std::bind(OnDiscoveredSrc, std::placeholders::_1, std::placeholders::_2,
+          std::placeholders::_3, std::placeholders::_4, this,
+          static_cast<SrcStreamManager *>(src_manager.get()));
+
+    // Src should subscribe WEBRTC_SINK_TOPIC but this is for test
+    discovery_cb_ = discovery_engine_.AddDiscoveryCB(src_manager->GetTopic(), on_discovered);
+
+    auto on_stream_started =
+          std::bind(OnStreamStarted, this, static_cast<SrcStreamManager *>(src_manager.get()));
+    src_manager->SetStreamStartCallback(on_stream_started);
+
+    auto on_stream_stopped =
+          std::bind(OnStreamStopped, this, static_cast<SrcStreamManager *>(src_manager.get()));
+    src_manager->SetStreamStopCallback(on_stream_stopped);
+
+    src_manager->Start();
+    IterateEventLoop();
+    discovery_engine_.RemoveDiscoveryCB(discovery_cb_);
+}
+
 class SinkStreamManagerTest : public testing::Test {
   protected:
     SinkStreamManagerTest()
           : discovery_cb_(-1),
             test_topic_(std::string(WEBRTC_TOPIC_PREFIX) + std::string(TEST_TOPIC)),
             discovery_engine_(TEST_SINK_CLIENT_ID),
-            mainLoop_(nullptr) {};
+            mainLoop_(nullptr){};
     void SetUp() override
     {
         mainLoop_ = g_main_loop_new(nullptr, FALSE);
@@ -482,20 +548,31 @@ class SinkStreamManagerTest : public testing::Test {
         DBG("Go forward");
     }
 
-    static void OnDiscovered(const std::string &clientId, const std::string &status,
+    static void OnDiscoveredSink(const std::string &clientId, const std::string &status,
           const void *msg, const int szmsg, SinkStreamManagerTest *test)
     {
-        DBG("OnDiscovered");
-        if (g_main_loop_is_running(test->mainLoop_))
+        DBG("OnDiscoveredSink");
+        if (WebRtcMessage::IsValidStreamInfo(std::vector<uint8_t>(static_cast<const uint8_t *>(msg),
+                  static_cast<const uint8_t *>(msg) + szmsg))
+              && g_main_loop_is_running(test->mainLoop_))
             g_main_loop_quit(test->mainLoop_);
     }
 
-    static void OnStreamReady(WebRtcStream &stream, SinkStreamManagerTest *test)
+    static void OnIceCandidateAdded(WebRtcStream &stream, SinkStreamManager *sink_mgr,
+          SinkStreamManagerTest *test)
+    {
+        DBG("OnIceCandidateAdded");
+        auto discovery_message = sink_mgr->GetDiscoveryMessage();
+        test->discovery_engine_.UpdateDiscoveryMsg(sink_mgr->GetTopic(), discovery_message.data(),
+              discovery_message.size());
+    }
+
+    static void OnStreamReady(WebRtcStream &stream, SinkStreamManager *sink_mgr,
+          SinkStreamManagerTest *test)
     {
         DBG("OnStreamReady");
-        auto discovery_message = WebRtcMessage::GenerateDiscoveryMessage(test->test_topic_, false,
-              stream.GetLocalDescription(), stream.GetIceCandidates());
-        test->discovery_engine_.UpdateDiscoveryMsg(test->test_topic_, discovery_message.data(),
+        auto discovery_message = sink_mgr->GetDiscoveryMessage();
+        test->discovery_engine_.UpdateDiscoveryMsg(sink_mgr->GetTopic(), discovery_message.data(),
               discovery_message.size());
     }
 
@@ -512,12 +589,18 @@ TEST_F(SinkStreamManagerTest, test_SinkStreamManager_Start_OnDevice)
     std::unique_ptr<StreamManager> sink_manager(
           new SinkStreamManager(test_topic_, TEST_SINK_CLIENT_ID, s_stream.str()));
 
-    auto on_discovered = std::bind(OnDiscovered, std::placeholders::_1, std::placeholders::_2,
+    auto on_discovered = std::bind(OnDiscoveredSink, std::placeholders::_1, std::placeholders::_2,
           std::placeholders::_3, std::placeholders::_4, this);
 
-    discovery_cb_ = discovery_engine_.AddDiscoveryCB(test_topic_, on_discovered);
+    // Sink should subscribe WEBRTC_SRC_TOPIC but this is for test
+    discovery_cb_ = discovery_engine_.AddDiscoveryCB(sink_manager->GetTopic(), on_discovered);
+
+    auto on_ice_candidate_added = std::bind(OnIceCandidateAdded, std::placeholders::_1,
+          static_cast<SinkStreamManager *>(sink_manager.get()), this);
+    sink_manager->SetIceCandidateAddedCallback(on_ice_candidate_added);
 
-    auto on_stream_ready = std::bind(OnStreamReady, std::placeholders::_1, this);
+    auto on_stream_ready = std::bind(OnStreamReady, std::placeholders::_1,
+          static_cast<SinkStreamManager *>(sink_manager.get()), this);
     sink_manager->SetStreamReadyCallback(on_stream_ready);
 
     sink_manager->Start();
@@ -558,19 +641,38 @@ class SinkSrcStreamManagerTest : public testing::Test {
               new aitt::MosquittoMQ(std::string(TEST_SINK_CLIENT_ID) + 'd', false)));
         sink_discovery_engine_.Start(LOCAL_IP, DEFAULT_BROKER_PORT, std::string(), std::string());
 
-        auto discovered_at_sink = std::bind(DiscoveredAtSink, std::placeholders::_1,
-              std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, this);
-        sink_discovery_cb_ = sink_discovery_engine_.AddDiscoveryCB(test_topic_, discovered_at_sink);
-        sink_discovery_engine_.Restart();
-
         src_discovery_engine_.SetMQ(std::unique_ptr<aitt::MQ>(
               new aitt::MosquittoMQ(std::string(TEST_SRC_CLIENT_ID) + 'd', false)));
         src_discovery_engine_.Start(LOCAL_IP, DEFAULT_BROKER_PORT, std::string(), std::string());
+    }
 
-        auto discovered_at_src = std::bind(DiscoveredAtSrc, std::placeholders::_1,
-              std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, this);
-        src_discovery_cb_ = src_discovery_engine_.AddDiscoveryCB(test_topic_, discovered_at_src);
-        src_discovery_engine_.Restart();
+    static void DiscoveredAtSink(const std::string &clientId, const std::string &status,
+          const void *msg, const int szmsg, SinkSrcStreamManagerTest *test)
+    {
+        DBG("DiscoveredAtSink");
+
+        auto sink_manager = static_cast<SinkStreamManager *>(test->sink_manager_.get());
+        if (!status.compare(aitt::AittDiscovery::WILL_LEAVE_NETWORK)) {
+            sink_manager->HandleRemovedClient(clientId);
+            return;
+        }
+
+        sink_manager->HandleMsg(clientId, std::vector<uint8_t>(static_cast<const uint8_t *>(msg),
+                                                static_cast<const uint8_t *>(msg) + szmsg));
+    }
+
+    static void DiscoveredAtSrc(const std::string &clientId, const std::string &status,
+          const void *msg, const int szmsg, SinkSrcStreamManagerTest *test)
+    {
+        DBG("DiscoveredAtSrc");
+
+        auto src_manager = static_cast<SrcStreamManager *>(test->src_manager_.get());
+        if (!status.compare(aitt::AittDiscovery::WILL_LEAVE_NETWORK)) {
+            src_manager->HandleRemovedClient(clientId);
+            return;
+        }
+        src_manager->HandleMsg(clientId, std::vector<uint8_t>(static_cast<const uint8_t *>(msg),
+                                               static_cast<const uint8_t *>(msg) + szmsg));
     }
 
     void TearDown() override
@@ -589,9 +691,6 @@ class SinkSrcStreamManagerTest : public testing::Test {
             src_discovery_engine_.SetMQ(nullptr);
         }
 
-        sink_manager_->Stop();
-        src_manager_->Stop();
-
         g_main_loop_unref(mainLoop_);
     }
 
@@ -618,6 +717,18 @@ class SinkSrcStreamManagerTest : public testing::Test {
 
     void StartSinkStream()
     {
+        auto discovered_at_sink = std::bind(DiscoveredAtSink, std::placeholders::_1,
+              std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, this);
+        sink_discovery_cb_ = sink_discovery_engine_.AddDiscoveryCB(
+              sink_manager_->GetWatchingTopic(), discovered_at_sink);
+        sink_discovery_engine_.Restart();
+
+        auto on_stream_stopped = std::bind(OnSinkStreamStopped, this);
+        sink_manager_->SetStreamStopCallback(on_stream_stopped);
+
+        auto on_ice_candidate = std::bind(OnSinkIceCandidate, std::placeholders::_1, this);
+        sink_manager_->SetIceCandidateAddedCallback(on_ice_candidate);
+
         auto on_sink_stream_ready = std::bind(OnSinkStreamReady, std::placeholders::_1, this);
         sink_manager_->SetStreamReadyCallback(on_sink_stream_ready);
 
@@ -628,89 +739,115 @@ class SinkSrcStreamManagerTest : public testing::Test {
         sink_manager_->Start();
     }
 
+    static void OnSinkStreamStopped(SinkSrcStreamManagerTest *test)
+    {
+        std::vector<uint8_t> msg;
+
+        flexbuffers::Builder fbb;
+        fbb.String("STOP");
+        fbb.Finish();
+
+        msg = fbb.GetBuffer();
+        test->sink_discovery_engine_.UpdateDiscoveryMsg(test->sink_manager_->GetTopic(), msg.data(),
+              msg.size());
+    }
+
+    static void OnSinkIceCandidate(WebRtcStream &stream, SinkSrcStreamManagerTest *test)
+    {
+        DBG("OnSinkIceCandidate");
+        auto discovery_message = test->sink_manager_->GetDiscoveryMessage();
+        test->sink_discovery_engine_.UpdateDiscoveryMsg(test->sink_manager_->GetTopic(),
+              discovery_message.data(), discovery_message.size());
+    }
+
+    static void OnSinkStreamReady(WebRtcStream &stream, SinkSrcStreamManagerTest *test)
+    {
+        DBG("OnSinkStreamReady");
+
+        auto discovery_message = test->sink_manager_->GetDiscoveryMessage();
+        test->sink_discovery_engine_.UpdateDiscoveryMsg(test->sink_manager_->GetTopic(),
+              discovery_message.data(), discovery_message.size());
+    }
+
+    static void OnEncodedFrame(WebRtcStream &stream, SinkSrcStreamManagerTest *test)
+    {
+        static_cast<SinkStreamManager *>(test->sink_manager_.get())->SetOnEncodedFrameCallback(nullptr);
+
+        if (test->stop_sink_first_)
+            test->AddIdleStopSinkStream();
+        else
+            test->AddIdleStopSrcStream();
+    }
+
     void StartSrcStream()
     {
+        auto discovered_at_src = std::bind(DiscoveredAtSrc, std::placeholders::_1,
+              std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, this);
+        src_discovery_cb_ = src_discovery_engine_.AddDiscoveryCB(src_manager_->GetWatchingTopic(),
+              discovered_at_src);
+        src_discovery_engine_.Restart();
+
+        auto on_stream_started = std::bind(OnStreamStarted, this);
+        src_manager_->SetStreamStartCallback(on_stream_started);
+        auto on_stream_stopped = std::bind(OnSrcStreamStopped, this);
+        src_manager_->SetStreamStopCallback(on_stream_stopped);
+        auto on_ice_candidate = std::bind(OnSrcIceCandidate, std::placeholders::_1, this);
+        src_manager_->SetIceCandidateAddedCallback(on_ice_candidate);
+
         auto on_src_stream_ready = std::bind(OnSrcStreamReady, std::placeholders::_1, this);
         src_manager_->SetStreamReadyCallback(on_src_stream_ready);
 
         src_manager_->Start();
     }
 
-    static void DiscoveredAtSink(const std::string &clientId, const std::string &status,
-          const void *msg, const int szmsg, SinkSrcStreamManagerTest *test)
+    static void OnStreamStarted(SinkSrcStreamManagerTest *test)
     {
-        // TODO: Rearrange below
-        DBG("DiscoveredAtSink");
-        if (!clientId.compare(test->sink_manager_->GetClientId())) {
-            DBG("but has same ID");
-            return;
-        }
+        DBG("Send Start");
+        std::vector<uint8_t> msg;
 
-        auto sink_manager = static_cast<SinkStreamManager *>(test->sink_manager_.get());
-        if (!status.compare(aitt::AittDiscovery::WILL_LEAVE_NETWORK)) {
-            sink_manager->HandleRemovedClient(clientId);
-            return;
-        }
+        flexbuffers::Builder fbb;
+        fbb.String("START");
+        fbb.Finish();
 
-        sink_manager->HandleDiscoveredStream(clientId,
-              std::vector<uint8_t>(static_cast<const uint8_t *>(msg),
-                    static_cast<const uint8_t *>(msg) + szmsg));
+        msg = fbb.GetBuffer();
+        test->src_discovery_engine_.UpdateDiscoveryMsg(test->src_manager_->GetTopic(), msg.data(),
+              msg.size());
     }
 
-    static void DiscoveredAtSrc(const std::string &clientId, const std::string &status,
-          const void *msg, const int szmsg, SinkSrcStreamManagerTest *test)
+    static void OnSrcStreamStopped(SinkSrcStreamManagerTest *test)
     {
-        // TODO: Rearrange below
-        DBG("DiscoveredAtSrc");
-        if (!clientId.compare(test->src_manager_->GetClientId())) {
-            DBG("but has same ID");
-            return;
-        }
-
-        auto src_manager = static_cast<SrcStreamManager *>(test->src_manager_.get());
+        std::vector<uint8_t> msg;
 
-        if (!status.compare(aitt::AittDiscovery::WILL_LEAVE_NETWORK)) {
-            src_manager->HandleRemovedClient(clientId);
-            return;
-        }
+        flexbuffers::Builder fbb;
+        fbb.String("STOP");
+        fbb.Finish();
 
-        src_manager->HandleDiscoveredStream(clientId,
-              std::vector<uint8_t>(static_cast<const uint8_t *>(msg),
-                    static_cast<const uint8_t *>(msg) + szmsg));
+        msg = fbb.GetBuffer();
+        test->src_discovery_engine_.UpdateDiscoveryMsg(test->src_manager_->GetTopic(), msg.data(),
+              msg.size());
     }
 
-    static void OnSinkStreamReady(WebRtcStream &stream, SinkSrcStreamManagerTest *test)
+    static void OnSrcIceCandidate(WebRtcStream &stream, SinkSrcStreamManagerTest *test)
     {
-        DBG("OnSinkStreamReady");
-
-        auto discovery_message = WebRtcMessage::GenerateDiscoveryMessage(test->test_topic_, false,
-              stream.GetLocalDescription(), stream.GetIceCandidates());
-        test->sink_discovery_engine_.UpdateDiscoveryMsg(test->test_topic_, discovery_message.data(),
-              discovery_message.size());
+        DBG("OnIceCandidateAdded");
+        auto discovery_message = test->src_manager_->GetDiscoveryMessage();
+        test->src_discovery_engine_.UpdateDiscoveryMsg(test->src_manager_->GetTopic(),
+              discovery_message.data(), discovery_message.size());
     }
 
     static void OnSrcStreamReady(WebRtcStream &stream, SinkSrcStreamManagerTest *test)
     {
         DBG("OnSrcStreamReady");
 
-        auto discovery_message = WebRtcMessage::GenerateDiscoveryMessage(test->test_topic_, true,
-              stream.GetLocalDescription(), stream.GetIceCandidates());
-        test->src_discovery_engine_.UpdateDiscoveryMsg(test->test_topic_, discovery_message.data(),
-              discovery_message.size());
-    }
-
-    static void OnEncodedFrame(WebRtcStream &stream, SinkSrcStreamManagerTest *test)
-    {
-        if (test->stop_sink_first_)
-            test->AddIdleStopSinkStream();
-        else
-            test->AddIdleStopSrcStream();
+        auto discovery_message = test->src_manager_->GetDiscoveryMessage();
+        test->src_discovery_engine_.UpdateDiscoveryMsg(test->src_manager_->GetTopic(),
+              discovery_message.data(), discovery_message.size());
     }
 
     void StartSinkFirst(void)
     {
         start_sink_id_ = g_timeout_add_seconds(1, GSourceStartSink, this);
-        start_src_id_ = g_timeout_add_seconds(4, GSourceStartSrc, this);
+        start_src_id_ = g_timeout_add_seconds(2, GSourceStartSrc, this);
     }
 
     void StartSrcFirst(void)
@@ -753,11 +890,6 @@ class SinkSrcStreamManagerTest : public testing::Test {
             return FALSE;
 
         static_cast<SinkStreamManager *>(test->sink_manager_.get())->Stop();
-        if (test->sink_discovery_engine_.HasValidMQ()) {
-            test->sink_discovery_engine_.RemoveDiscoveryCB(test->sink_discovery_cb_);
-            test->sink_discovery_engine_.Stop();
-            test->sink_discovery_engine_.SetMQ(nullptr);
-        }
 
         return FALSE;
     }
@@ -775,11 +907,6 @@ class SinkSrcStreamManagerTest : public testing::Test {
             return FALSE;
 
         static_cast<SrcStreamManager *>(test->src_manager_.get())->Stop();
-        if (test->src_discovery_engine_.HasValidMQ()) {
-            test->src_discovery_engine_.RemoveDiscoveryCB(test->src_discovery_cb_);
-            test->src_discovery_engine_.Stop();
-            test->src_discovery_engine_.SetMQ(nullptr);
-        }
 
         return FALSE;
     }
@@ -837,87 +964,3 @@ TEST_F(SinkSrcStreamManagerTest, test_Src_Src_OnDevice)
     AddStopEventLoop();
     IterateEventLoop();
 }
-
-class ModuleTest : public testing::Test {
-  protected:
-    ModuleTest()
-          : test_topic_(std::string(WEBRTC_TOPIC_PREFIX) + std::string(TEST_TOPIC)),
-            sink_discovery_engine_(std::string(TEST_SINK_CLIENT_ID)),
-            src_discovery_engine_(std::string(TEST_SRC_CLIENT_ID)),
-            sink_module_(nullptr),
-            src_module_(nullptr),
-            mainLoop_(nullptr){};
-
-    void SetUp() override
-    {
-        // create g_main_loop & connect broker
-        mainLoop_ = g_main_loop_new(nullptr, FALSE);
-
-        sink_discovery_engine_.SetMQ(std::unique_ptr<aitt::MQ>(
-              new aitt::MosquittoMQ(std::string(TEST_SINK_CLIENT_ID) + 'd', false)));
-        sink_discovery_engine_.Start(LOCAL_IP, DEFAULT_BROKER_PORT, std::string(), std::string());
-        sink_discovery_engine_.Restart();
-        sink_module_ = std::unique_ptr<Module>(new Module(sink_discovery_engine_, test_topic_,
-              AittStreamRole::AITT_STREAM_ROLE_SUBSCRIBER));
-
-        src_discovery_engine_.SetMQ(std::unique_ptr<aitt::MQ>(
-              new aitt::MosquittoMQ(std::string(TEST_SRC_CLIENT_ID) + 'd', false)));
-        src_discovery_engine_.Start(LOCAL_IP, DEFAULT_BROKER_PORT, std::string(), std::string());
-        src_discovery_engine_.Restart();
-        src_module_ = std::unique_ptr<Module>(new Module(src_discovery_engine_, test_topic_,
-              AittStreamRole::AITT_STREAM_ROLE_PUBLISHER));
-    }
-
-    void TearDown() override
-    {
-        // disconnect broker & destroy g_main_loop
-
-        if (sink_discovery_engine_.HasValidMQ()) {
-            sink_discovery_engine_.Stop();
-            sink_discovery_engine_.SetMQ(nullptr);
-        }
-
-        if (src_discovery_engine_.HasValidMQ()) {
-            src_discovery_engine_.Stop();
-            src_discovery_engine_.SetMQ(nullptr);
-        }
-
-        g_main_loop_unref(mainLoop_);
-    }
-
-    void IterateEventLoop(void)
-    {
-        DBG("Go forward");
-        g_main_loop_run(mainLoop_);
-    }
-
-    void AddStopEventLoop(void) { g_timeout_add_seconds(15, GSourceStopEventLoop, this); }
-
-    static gboolean GSourceStopEventLoop(gpointer data)
-    {
-        DBG("GSourceStopEventLoop");
-        auto test = static_cast<ModuleTest *>(data);
-        if (!test)
-            return FALSE;
-
-        if (g_main_loop_is_running(test->mainLoop_))
-            g_main_loop_quit(test->mainLoop_);
-
-        return FALSE;
-    }
-
-    std::string test_topic_;
-    aitt::AittDiscovery sink_discovery_engine_;
-    aitt::AittDiscovery src_discovery_engine_;
-    std::unique_ptr<Module> sink_module_;
-    std::unique_ptr<Module> src_module_;
-    GMainLoop *mainLoop_;
-};
-
-TEST_F(ModuleTest, test_OnDevice)
-{
-    sink_module_->Start();
-    src_module_->Start();
-    AddStopEventLoop();
-    IterateEventLoop();
-}