From 611fe530e98a4f6143ecc48027b4bc8fc8ca59cf Mon Sep 17 00:00:00 2001 From: Jiung Yu Date: Thu, 10 Nov 2022 10:56:48 +0900 Subject: [PATCH] StreamManager refactoring --- modules/webrtc/CMakeLists.txt | 1 + modules/webrtc/Module.cc | 40 ++------ modules/webrtc/Module.h | 13 ++- modules/webrtc/SinkStreamManager.cc | 134 +++++------------------- modules/webrtc/SinkStreamManager.h | 30 ++---- modules/webrtc/SrcStreamManager.cc | 125 ++++------------------- modules/webrtc/SrcStreamManager.h | 30 ++---- modules/webrtc/StreamManager.cc | 96 +++++++++++++++++ modules/webrtc/StreamManager.h | 42 ++++---- modules/webrtc/WebRtcEventHandler.h | 3 +- modules/webrtc/WebRtcStream.cc | 25 +++++ modules/webrtc/WebRtcStream.h | 26 +---- modules/webrtc/tests/WEBRTC_test.cc | 198 +++++++++++------------------------- 13 files changed, 287 insertions(+), 476 deletions(-) create mode 100644 modules/webrtc/StreamManager.cc diff --git a/modules/webrtc/CMakeLists.txt b/modules/webrtc/CMakeLists.txt index 1f3014a..647a6a0 100644 --- a/modules/webrtc/CMakeLists.txt +++ b/modules/webrtc/CMakeLists.txt @@ -11,6 +11,7 @@ INCLUDE_DIRECTORIES(${AITT_WEBRTC_NEEDS_INCLUDE_DIRS}) LINK_DIRECTORIES(${AITT_WEBRTC_NEEDS_LIBRARY_DIRS}) ADD_LIBRARY(WEBRTC_OBJ OBJECT + StreamManager.cc SrcStreamManager.cc SinkStreamManager.cc WebRtcMessage.cc diff --git a/modules/webrtc/Module.cc b/modules/webrtc/Module.cc index 55f94b1..294c3d5 100644 --- a/modules/webrtc/Module.cc +++ b/modules/webrtc/Module.cc @@ -29,7 +29,10 @@ namespace AittWebRTCNamespace { Module::Module(AittDiscovery &discovery, const std::string &topic, AittStreamRole role) - : is_source_(IsSource(role)), discovery_(discovery), state_cb_user_data_(nullptr), receive_cb_user_data_(nullptr) + : is_source_(role == AittStreamRole::AITT_STREAM_ROLE_PUBLISHER), + discovery_(discovery), + state_cb_user_data_(nullptr), + receive_cb_user_data_(nullptr) { std::stringstream s_stream; s_stream << std::this_thread::get_id(); @@ -59,21 +62,11 @@ void Module::SetConfig(const std::string &key, void *obj) { } - void Module::Start(void) { - auto on_ice_candidate_added = - std::bind(&Module::OnIceCandidateAdded, this, std::placeholders::_1); - stream_manager_->SetIceCandidateAddedCallback(on_ice_candidate_added); - - auto on_stream_ready = std::bind(&Module::OnStreamReady, this, std::placeholders::_1); - stream_manager_->SetStreamReadyCallback(on_stream_ready); - - auto on_stream_started = std::bind(&Module::OnStreamStarted, this); - stream_manager_->SetStreamStartCallback(on_stream_started); - - auto on_stream_stopped = std::bind(&Module::OnStreamStopped, this); - stream_manager_->SetStreamStopCallback(on_stream_stopped); + stream_manager_->SetIceCandidateAddedCallback(std::bind(&Module::OnIceCandidateAdded, this)); + stream_manager_->SetStreamStartCallback(std::bind(&Module::OnStreamStarted, this)); + stream_manager_->SetStreamStopCallback(std::bind(&Module::OnStreamStopped, this)); stream_manager_->Start(); } @@ -82,21 +75,13 @@ void Module::Stop(void) { } -void Module::OnIceCandidateAdded(WebRtcStream &stream) +void Module::OnIceCandidateAdded(void) { DBG("OnIceCandidateAdded"); auto msg = stream_manager_->GetDiscoveryMessage(); discovery_.UpdateDiscoveryMsg(stream_manager_->GetTopic(), msg.data(), msg.size()); } -void Module::OnStreamReady(WebRtcStream &stream) -{ - DBG("OnStreamReady"); - - auto msg = stream_manager_->GetDiscoveryMessage(); - discovery_.UpdateDiscoveryMsg(stream_manager_->GetTopic(), msg.data(), msg.size()); -} - void Module::OnStreamStarted(void) { std::vector msg; @@ -132,10 +117,6 @@ void Module::SetReceiveCallback(ReceiveCallback cb, void *user_data) receive_callback_ = cb; receive_cb_user_data_ = user_data; } -bool Module::IsSource(AittStreamRole role) -{ - return role == AittStreamRole::AITT_STREAM_ROLE_PUBLISHER; -} void Module::DiscoveryMessageCallback(const std::string &clientId, const std::string &status, const void *msg, const int szmsg) @@ -145,9 +126,8 @@ void Module::DiscoveryMessageCallback(const std::string &clientId, const std::st return; } - stream_manager_->HandleMsg(clientId, - std::vector(static_cast(msg), - static_cast(msg) + szmsg)); + stream_manager_->HandleMsg(clientId, std::vector(static_cast(msg), + static_cast(msg) + szmsg)); } } // namespace AittWebRTCNamespace diff --git a/modules/webrtc/Module.h b/modules/webrtc/Module.h index 11575d6..f43f328 100644 --- a/modules/webrtc/Module.h +++ b/modules/webrtc/Module.h @@ -18,13 +18,14 @@ #include #include -#include "StreamManager.h" #include #include #include #include +#include "StreamManager.h" + using AittDiscovery = aitt::AittDiscovery; using AittStreamModule = aitt::AittStreamModule; @@ -42,12 +43,10 @@ class Module : public AittStreamModule { void Stop(void) override; void SetStateCallback(StateCallback cb, void *user_data) override; void SetReceiveCallback(ReceiveCallback cb, void *user_data) override; - static bool IsSource(AittStreamRole role); private: - //TODO: Update Ice Candidates when stream add candidates. - void OnIceCandidateAdded(WebRtcStream &stream); - void OnStreamReady(WebRtcStream &stream); + // TODO: Update Ice Candidates when stream add candidates. + void OnIceCandidateAdded(void); void OnStreamStarted(void); void OnStreamStopped(void); void DiscoveryMessageCallback(const std::string &clientId, const std::string &status, @@ -57,8 +56,8 @@ class Module : public AittStreamModule { AittDiscovery &discovery_; int discovery_cb_; - //TODO: What if user copies the module? - //Think about that case with destructor + // TODO: What if user copies the module? + // Think about that case with destructor StreamManager *stream_manager_; StateCallback state_callback_; void *state_cb_user_data_; diff --git a/modules/webrtc/SinkStreamManager.cc b/modules/webrtc/SinkStreamManager.cc index 52b52e1..f7a9258 100644 --- a/modules/webrtc/SinkStreamManager.cc +++ b/modules/webrtc/SinkStreamManager.cc @@ -24,60 +24,15 @@ namespace AittWebRTCNamespace { SinkStreamManager::SinkStreamManager(const std::string &topic, const std::string &aitt_id, const std::string &thread_id) - : StreamManager(topic + "/SINK", aitt_id, thread_id), watching_topic_(topic + "/SRC") + : StreamManager(topic + "/SINK", topic + "/SRC", aitt_id, thread_id) { } SinkStreamManager::~SinkStreamManager() { - for (auto itr = src_stream_.begin(); itr != src_stream_.end(); ++itr) + for (auto itr = streams_.begin(); itr != streams_.end(); ++itr) itr->second->Destroy(); - src_stream_.clear(); -} - -void SinkStreamManager::Start() -{ - DBG("%s %s", __func__, GetTopic().c_str()); - if (stream_start_cb_) - stream_start_cb_(); -} - -void SinkStreamManager::Stop() -{ - DBG("%s %s", __func__, GetTopic().c_str()); - // TODO: You should take care about stream resource - for (auto itr = src_stream_.begin(); itr != src_stream_.end(); ++itr) - itr->second->Destroy(); - src_stream_.clear(); - - if (stream_stop_cb_) - stream_stop_cb_(); -} - -void SinkStreamManager::OnEncodedFrame(WebRtcStream &stream) -{ - if (encoded_frame_cb_) - encoded_frame_cb_(stream); -} - -void SinkStreamManager::SetIceCandidateAddedCallback(IceCandidateAddedCallback cb) -{ - ice_candidate_added_cb_ = cb; -} - -void SinkStreamManager::SetStreamReadyCallback(StreamReadyCallback cb) -{ - stream_ready_cb_ = cb; -} - -void SinkStreamManager::SetStreamStartCallback(StreamStartCallback cb) -{ - stream_start_cb_ = cb; -} - -void SinkStreamManager::SetStreamStopCallback(StreamStopCallback cb) -{ - stream_stop_cb_ = cb; + streams_.clear(); } void SinkStreamManager::SetOnEncodedFrameCallback(EncodedFrameCallabck cb) @@ -87,31 +42,21 @@ void SinkStreamManager::SetOnEncodedFrameCallback(EncodedFrameCallabck cb) void SinkStreamManager::SetWebRtcStreamCallbacks(WebRtcStream &stream) { - auto on_stream_state_changed_cb = std::bind(&SinkStreamManager::OnStreamStateChanged, this, - std::placeholders::_1, std::ref(stream)); - stream.GetEventHandler().SetOnStateChangedCb(on_stream_state_changed_cb); - - auto on_ice_candidate_added_cb = std::bind(&SinkStreamManager::OnIceCandidate, this, - std::placeholders::_1, std::ref(stream)); - stream.GetEventHandler().SetOnIceCandidateCb(on_ice_candidate_added_cb); + stream.GetEventHandler().SetOnStateChangedCb(std::bind(&SinkStreamManager::OnStreamStateChanged, + this, std::placeholders::_1, std::ref(stream))); - auto on_ice_gathering_state_changed_cb = - std::bind(&SinkStreamManager::OnIceGatheringStateNotify, this, std::placeholders::_1, - std::ref(stream)); - stream.GetEventHandler().SetOnIceGatheringStateNotifyCb(on_ice_gathering_state_changed_cb); + stream.GetEventHandler().SetOnIceCandidateCb( + std::bind(&SinkStreamManager::OnIceCandidate, this)); - auto on_encoded_frame_cb = - std::bind(&SinkStreamManager::OnEncodedFrame, this, std::ref(stream)); - stream.GetEventHandler().SetOnEncodedFrameCb(on_encoded_frame_cb); + stream.GetEventHandler().SetOnEncodedFrameCb(std::bind(&SinkStreamManager::OnEncodedFrame, this)); } void SinkStreamManager::OnStreamStateChanged(WebRtcState::Stream state, WebRtcStream &stream) { DBG("OnSinkStreamStateChanged: %s", WebRtcState::StreamToStr(state).c_str()); if (state == WebRtcState::Stream::NEGOTIATING) { - auto on_offer_created = std::bind(&SinkStreamManager::OnOfferCreated, this, - std::placeholders::_1, std::ref(stream)); - stream.CreateOfferAsync(on_offer_created); + stream.CreateOfferAsync(std::bind(&SinkStreamManager::OnOfferCreated, this, + std::placeholders::_1, std::ref(stream))); } } @@ -122,50 +67,22 @@ void SinkStreamManager::OnOfferCreated(std::string sdp, WebRtcStream &stream) stream.SetLocalDescription(sdp); } -void SinkStreamManager::OnIceCandidate(const std::string &candidate, WebRtcStream &stream) +void SinkStreamManager::OnIceCandidate(void) { if (ice_candidate_added_cb_) - ice_candidate_added_cb_(stream); -} - -void SinkStreamManager::OnIceGatheringStateNotify(WebRtcState::IceGathering state, - WebRtcStream &stream) -{ - DBG("Sink IceGathering State: %s", WebRtcState::IceGatheringToStr(state).c_str()); - if (state == WebRtcState::IceGathering::COMPLETE) { - if (stream_ready_cb_) - stream_ready_cb_(stream); - } -} - -void SinkStreamManager::HandleRemovedClient(const std::string &discovery_id) -{ - auto src_stream_itr = src_stream_.find(discovery_id); - if (src_stream_itr == src_stream_.end()) { - DBG("There's no source stream %s", discovery_id.c_str()); - return; - } - - // TODO: You should take care about stream resource - src_stream_itr->second->Destroy(); - src_stream_.erase(src_stream_itr); - - return; + ice_candidate_added_cb_(); } -void SinkStreamManager::HandleMsg(const std::string &discovery_id, - const std::vector &message) +void SinkStreamManager::OnEncodedFrame(void) { - if (flexbuffers::GetRoot(message).IsString()) - HandleStreamState(discovery_id, message); - - else if (flexbuffers::GetRoot(message).IsVector()) - HandleStreamInfo(discovery_id, message); + if (encoded_frame_cb_) + encoded_frame_cb_(); } void SinkStreamManager::HandleStreamState(const std::string &discovery_id, const std::vector &message) { + DBG("%s", __func__); auto src_state = flexbuffers::GetRoot(message).ToString(); if (src_state.compare("START") == 0) @@ -178,8 +95,9 @@ void SinkStreamManager::HandleStreamState(const std::string &discovery_id, void SinkStreamManager::HandleStartStream(const std::string &discovery_id) { - auto src_stream = src_stream_.find(discovery_id); - if (src_stream != src_stream_.end()) { + DBG("%s", __func__); + auto src_stream = streams_.find(discovery_id); + if (src_stream != streams_.end()) { DBG("There's stream already"); return; } @@ -199,7 +117,7 @@ void SinkStreamManager::AddStream(const std::string &discovery_id) s_stream << static_cast(stream); stream->SetStreamId(std::string(thread_id_ + s_stream.str())); - src_stream_[discovery_id] = stream; + streams_[discovery_id] = stream; } void SinkStreamManager::HandleStreamInfo(const std::string &discovery_id, @@ -229,8 +147,8 @@ void SinkStreamManager::UpdateStreamInfo(const std::string &discovery_id, const const std::string &peer_id, const std::string &sdp, const std::vector &ice_candidates) { - auto src_stream = src_stream_.find(discovery_id); - if (src_stream == src_stream_.end()) { + auto src_stream = streams_.find(discovery_id); + if (src_stream == streams_.end()) { DBG("No matching stream"); return; } @@ -246,7 +164,6 @@ void SinkStreamManager::UpdateStreamInfo(const std::string &discovery_id, const src_stream->second->AddPeerInformation(sdp, ice_candidates); } else src_stream->second->UpdatePeerInformation(ice_candidates); - } std::vector SinkStreamManager::GetDiscoveryMessage(void) @@ -255,7 +172,7 @@ std::vector SinkStreamManager::GetDiscoveryMessage(void) flexbuffers::Builder fbb; fbb.Vector([&] { - for (auto itr = src_stream_.begin(); itr != src_stream_.end(); ++itr) { + for (auto itr = streams_.begin(); itr != streams_.end(); ++itr) { fbb.Map([&] { fbb.String("id", itr->second->GetStreamId()); fbb.String("peer_id", itr->second->GetPeerId()); @@ -274,9 +191,4 @@ std::vector SinkStreamManager::GetDiscoveryMessage(void) return message; } -std::string SinkStreamManager::GetWatchingTopic(void) -{ - return watching_topic_; -} - } // namespace AittWebRTCNamespace diff --git a/modules/webrtc/SinkStreamManager.h b/modules/webrtc/SinkStreamManager.h index 1b27625..cf433bc 100644 --- a/modules/webrtc/SinkStreamManager.h +++ b/modules/webrtc/SinkStreamManager.h @@ -24,47 +24,31 @@ namespace AittWebRTCNamespace { class SinkStreamManager : public StreamManager { public: - using EncodedFrameCallabck = std::function; + using EncodedFrameCallabck = std::function; explicit SinkStreamManager(const std::string &topic, const std::string &aitt_id, const std::string &thread_id); virtual ~SinkStreamManager(); - void Start(void) override; - void Stop(void) override; - void SetIceCandidateAddedCallback(IceCandidateAddedCallback cb) override; - void SetStreamReadyCallback(StreamReadyCallback cb) override; - void SetStreamStartCallback(StreamStartCallback cb) override; - void SetStreamStopCallback(StreamStopCallback cb) override; std::vector GetDiscoveryMessage(void) override; - std::string GetWatchingTopic(void) override; // TODO: WebRTC CAPI doesn't allow destroy webrtc handle at callback. // We need to avoid that situation void SetOnEncodedFrameCallback(EncodedFrameCallabck cb); - void HandleRemovedClient(const std::string &discovery_id) override; - void HandleMsg(const std::string &discovery_id, const std::vector &message) override; private: void SetWebRtcStreamCallbacks(WebRtcStream &stream) override; void OnStreamStateChanged(WebRtcState::Stream state, WebRtcStream &stream); void OnOfferCreated(std::string sdp, WebRtcStream &stream); - void OnIceCandidate(const std::string &candidate, WebRtcStream &stream); - void OnIceGatheringStateNotify(WebRtcState::IceGathering state, WebRtcStream &stream); - void OnEncodedFrame(WebRtcStream &stream); - void HandleStreamState(const std::string &discovery_id, const std::vector &message); + void OnIceCandidate(void); + void OnEncodedFrame(void); + void HandleStreamState(const std::string &discovery_id, + const std::vector &message) override; void HandleStartStream(const std::string &discovery_id); - void HandleStreamInfo(const std::string &discovery_id, const std::vector &message); + void HandleStreamInfo(const std::string &discovery_id, + const std::vector &message) override; void AddStream(const std::string &discovery_id); void UpdateStreamInfo(const std::string &discovery_id, const std::string &id, const std::string &peer_id, const std::string &sdp, const std::vector &ice_candidates); - std::string watching_topic_; - // TODO: What if user copies the module? - // Think about that case with destructor - std::map src_stream_; - IceCandidateAddedCallback ice_candidate_added_cb_; - StreamReadyCallback stream_ready_cb_; - StreamStartCallback stream_start_cb_; - StreamStopCallback stream_stop_cb_; EncodedFrameCallabck encoded_frame_cb_; }; } // namespace AittWebRTCNamespace diff --git a/modules/webrtc/SrcStreamManager.cc b/modules/webrtc/SrcStreamManager.cc index be11c96..574d9c7 100644 --- a/modules/webrtc/SrcStreamManager.cc +++ b/modules/webrtc/SrcStreamManager.cc @@ -26,77 +26,32 @@ namespace AittWebRTCNamespace { SrcStreamManager::SrcStreamManager(const std::string &topic, const std::string &aitt_id, const std::string &thread_id) - : StreamManager(topic + "/SRC", aitt_id, thread_id), watching_topic_(topic + "/SINK") + : StreamManager(topic + "/SRC", topic + "/SINK", aitt_id, thread_id) { } SrcStreamManager::~SrcStreamManager() { // TODO: You should take care about stream resource - for (auto itr = sink_streams_.begin(); itr != sink_streams_.end(); ++itr) + for (auto itr = streams_.begin(); itr != streams_.end(); ++itr) itr->second->Destroy(); - sink_streams_.clear(); -} - -void SrcStreamManager::Start(void) -{ - DBG("%s %s", __func__, GetTopic().c_str()); - if (stream_start_cb_) - stream_start_cb_(); -} - -void SrcStreamManager::Stop(void) -{ - DBG("%s %s", __func__, GetTopic().c_str()); - // TODO: You should take care about stream resource - for (auto itr = sink_streams_.begin(); itr != sink_streams_.end(); ++itr) - itr->second->Destroy(); - sink_streams_.clear(); - - if (stream_stop_cb_) - stream_stop_cb_(); -} - -void SrcStreamManager::SetIceCandidateAddedCallback(IceCandidateAddedCallback cb) -{ - ice_candidate_added_cb_ = cb; -} - -void SrcStreamManager::SetStreamReadyCallback(StreamReadyCallback cb) -{ - stream_ready_cb_ = cb; -} - -void SrcStreamManager::SetStreamStartCallback(StreamStartCallback cb) -{ - stream_start_cb_ = cb; -} - -void SrcStreamManager::SetStreamStopCallback(StreamStopCallback cb) -{ - stream_stop_cb_ = cb; + streams_.clear(); } void SrcStreamManager::SetWebRtcStreamCallbacks(WebRtcStream &stream) { - auto on_stream_state_changed_cb = std::bind(&SrcStreamManager::OnStreamStateChanged, this, - std::placeholders::_1, std::ref(stream)); - stream.GetEventHandler().SetOnStateChangedCb(on_stream_state_changed_cb); - - auto on_ice_candidate_added_cb = std::bind(&SrcStreamManager::OnIceCandidate, this, - std::placeholders::_1, std::ref(stream)); - stream.GetEventHandler().SetOnIceCandidateCb(on_ice_candidate_added_cb); + stream.GetEventHandler().SetOnStateChangedCb( + std::bind(&SrcStreamManager::OnStreamStateChanged, this, std::placeholders::_1)); - auto on_signaling_state_notify_cb = std::bind(&SrcStreamManager::OnSignalingStateNotify, this, - std::placeholders::_1, std::ref(stream)); - stream.GetEventHandler().SetOnSignalingStateNotifyCb(on_signaling_state_notify_cb); + stream.GetEventHandler().SetOnSignalingStateNotifyCb( + std::bind(&SrcStreamManager::OnSignalingStateNotify, this, std::placeholders::_1, + std::ref(stream))); - auto on_ice_gathering_state_changed_cb = std::bind(&SrcStreamManager::OnIceGatheringStateNotify, - this, std::placeholders::_1, std::ref(stream)); - stream.GetEventHandler().SetOnIceGatheringStateNotifyCb(on_ice_gathering_state_changed_cb); + stream.GetEventHandler().SetOnIceCandidateCb( + std::bind(&SrcStreamManager::OnIceCandidate, this)); } -void SrcStreamManager::OnStreamStateChanged(WebRtcState::Stream state, WebRtcStream &stream) +void SrcStreamManager::OnStreamStateChanged(WebRtcState::Stream state) { DBG("OnSrcStreamStateChanged: %s", WebRtcState::StreamToStr(state).c_str()); } @@ -105,9 +60,8 @@ void SrcStreamManager::OnSignalingStateNotify(WebRtcState::Signaling state, WebR { DBG("OnSignalingStateNotify: %s", WebRtcState::SignalingToStr(state).c_str()); if (state == WebRtcState::Signaling::HAVE_REMOTE_OFFER) { - auto on_answer_created = std::bind(&SrcStreamManager::OnAnswerCreated, this, - std::placeholders::_1, std::ref(stream)); - stream.CreateAnswerAsync(on_answer_created); + stream.CreateAnswerAsync(std::bind(&SrcStreamManager::OnAnswerCreated, this, + std::placeholders::_1, std::ref(stream))); } } @@ -118,49 +72,16 @@ void SrcStreamManager::OnAnswerCreated(std::string sdp, WebRtcStream &stream) stream.SetLocalDescription(sdp); } -void SrcStreamManager::OnIceCandidate(const std::string &candidate, WebRtcStream &stream) +void SrcStreamManager::OnIceCandidate(void) { if (ice_candidate_added_cb_) - ice_candidate_added_cb_(stream); -} - -void SrcStreamManager::OnIceGatheringStateNotify(WebRtcState::IceGathering state, - WebRtcStream &stream) -{ - DBG("Src IceGathering State: %s", WebRtcState::IceGatheringToStr(state).c_str()); - if (state == WebRtcState::IceGathering::COMPLETE) { - if (stream_ready_cb_) - stream_ready_cb_(stream); - } -} - -void SrcStreamManager::HandleRemovedClient(const std::string &discovery_id) -{ - auto sink_stream_itr = sink_streams_.find(discovery_id); - if (sink_stream_itr == sink_streams_.end()) { - DBG("There's no sink stream %s", discovery_id.c_str()); - return; - } - - // TODO: You should take care about stream resource - sink_stream_itr->second->Destroy(); - sink_streams_.erase(sink_stream_itr); - - return; -} - -void SrcStreamManager::HandleMsg(const std::string &discovery_id, - const std::vector &message) -{ - if (flexbuffers::GetRoot(message).IsString()) - HandleStreamState(discovery_id, message); - else if (flexbuffers::GetRoot(message).IsVector()) - HandleStreamInfo(discovery_id, message); + ice_candidate_added_cb_(); } void SrcStreamManager::HandleStreamState(const std::string &discovery_id, const std::vector &message) { + DBG("%s", __func__); auto sink_state = flexbuffers::GetRoot(message).ToString(); if (sink_state.compare("STOP") == 0) @@ -172,6 +93,7 @@ void SrcStreamManager::HandleStreamState(const std::string &discovery_id, void SrcStreamManager::HandleStreamInfo(const std::string &discovery_id, const std::vector &message) { + DBG("%s", __func__); if (!WebRtcMessage::IsValidStreamInfo(message)) { DBG("Invalid streams info"); return; @@ -196,8 +118,8 @@ void SrcStreamManager::UpdateStreamInfo(const std::string &discovery_id, const s const std::string &peer_id, const std::string &sdp, const std::vector &ice_candidates) { - auto sink_stream = sink_streams_.find(discovery_id); - if (sink_stream == sink_streams_.end()) + auto sink_stream = streams_.find(discovery_id); + if (sink_stream == streams_.end()) AddStream(discovery_id, id, sdp, ice_candidates); else sink_stream->second->UpdatePeerInformation(ice_candidates); @@ -219,7 +141,7 @@ void SrcStreamManager::AddStream(const std::string &discovery_id, const std::str stream->SetPeerId(id); stream->AddPeerInformation(sdp, ice_candidates); - sink_streams_[discovery_id] = stream; + streams_[discovery_id] = stream; return; } @@ -230,7 +152,7 @@ std::vector SrcStreamManager::GetDiscoveryMessage(void) flexbuffers::Builder fbb; fbb.Vector([&] { - for (auto itr = sink_streams_.begin(); itr != sink_streams_.end(); ++itr) { + for (auto itr = streams_.begin(); itr != streams_.end(); ++itr) { fbb.Map([&] { fbb.String("id", itr->second->GetStreamId()); fbb.String("peer_id", itr->second->GetPeerId()); @@ -249,9 +171,4 @@ std::vector SrcStreamManager::GetDiscoveryMessage(void) return message; } -std::string SrcStreamManager::GetWatchingTopic(void) -{ - return watching_topic_; -} - } // namespace AittWebRTCNamespace diff --git a/modules/webrtc/SrcStreamManager.h b/modules/webrtc/SrcStreamManager.h index aca572a..e87d063 100644 --- a/modules/webrtc/SrcStreamManager.h +++ b/modules/webrtc/SrcStreamManager.h @@ -27,40 +27,22 @@ class SrcStreamManager : public StreamManager { explicit SrcStreamManager(const std::string &topic, const std::string &aitt_id, const std::string &thread_id); virtual ~SrcStreamManager(); - void Start(void) override; - // TODO: What's the best way to shutdown all? - void Stop(void) override; - void SetIceCandidateAddedCallback(IceCandidateAddedCallback cb) override; - void SetStreamReadyCallback(StreamReadyCallback cb) override; - void SetStreamStartCallback(StreamStartCallback cb) override; - void SetStreamStopCallback(StreamStopCallback cb) override; std::vector GetDiscoveryMessage(void) override; - std::string GetWatchingTopic(void) override; - void HandleRemovedClient(const std::string &discovery_id) override; - void HandleMsg(const std::string &discovery_id, const std::vector &message) override; private: void SetWebRtcStreamCallbacks(WebRtcStream &stream) override; - void OnStreamStateChanged(WebRtcState::Stream state, WebRtcStream &stream); + void OnStreamStateChanged(WebRtcState::Stream state); void OnAnswerCreated(std::string sdp, WebRtcStream &stream); - void OnIceCandidate(const std::string &candidate, WebRtcStream &stream); + void OnIceCandidate(void); void OnSignalingStateNotify(WebRtcState::Signaling state, WebRtcStream &stream); - void OnIceGatheringStateNotify(WebRtcState::IceGathering state, WebRtcStream &stream); - void HandleStreamState(const std::string &discovery_id, const std::vector &message); - void HandleStreamInfo(const std::string &discovery_id, const std::vector &message); + void HandleStreamState(const std::string &discovery_id, + const std::vector &message) override; + void HandleStreamInfo(const std::string &discovery_id, + const std::vector &message) override; void AddStream(const std::string &discovery_id, const std::string &id, const std::string &sdp, const std::vector &ice_candidates); void UpdateStreamInfo(const std::string &discovery_id, const std::string &id, const std::string &peer_id, const std::string &sdp, const std::vector &ice_candidates); - - std::string watching_topic_; - // TODO: What if user copies the module? - // Think about that case with destructor - std::map sink_streams_; - IceCandidateAddedCallback ice_candidate_added_cb_; - StreamReadyCallback stream_ready_cb_; - StreamStartCallback stream_start_cb_; - StreamStopCallback stream_stop_cb_; }; } // namespace AittWebRTCNamespace diff --git a/modules/webrtc/StreamManager.cc b/modules/webrtc/StreamManager.cc new file mode 100644 index 0000000..3e81a9d --- /dev/null +++ b/modules/webrtc/StreamManager.cc @@ -0,0 +1,96 @@ +/* + * Copyright (c) 2022 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "StreamManager.h" + +#include "aitt_internal.h" + +namespace AittWebRTCNamespace { + +StreamManager::StreamManager(const std::string &topic, const std::string &watching_topic, + const std::string &aitt_id, const std::string &thread_id) + : topic_(topic), watching_topic_(watching_topic), aitt_id_(aitt_id), thread_id_(thread_id) +{ +} + +void StreamManager::Start(void) +{ + DBG("%s %s", __func__, GetTopic().c_str()); + if (stream_start_cb_) + stream_start_cb_(); +} + +void StreamManager::Stop(void) +{ + DBG("%s %s", __func__, GetTopic().c_str()); + // TODO: You should take care about stream resource + for (auto itr = streams_.begin(); itr != streams_.end(); ++itr) + itr->second->Destroy(); + streams_.clear(); + + if (stream_stop_cb_) + stream_stop_cb_(); +} + +void StreamManager::HandleRemovedClient(const std::string &discovery_id) +{ + auto stream_itr = streams_.find(discovery_id); + if (stream_itr == streams_.end()) { + DBG("There's no stream %s", discovery_id.c_str()); + return; + } + + // TODO: You should take care about stream resource + stream_itr->second->Destroy(); + streams_.erase(stream_itr); + + return; +} + +void StreamManager::HandleMsg(const std::string &discovery_id, const std::vector &message) +{ + if (flexbuffers::GetRoot(message).IsString()) + HandleStreamState(discovery_id, message); + else if (flexbuffers::GetRoot(message).IsVector()) + HandleStreamInfo(discovery_id, message); +} + +std::string StreamManager::GetTopic(void) const +{ + return topic_; +} + +std::string StreamManager::GetWatchingTopic(void) const +{ + return watching_topic_; +} + +void StreamManager::SetIceCandidateAddedCallback(IceCandidateAddedCallback cb) +{ + ice_candidate_added_cb_ = cb; +} + +void StreamManager::SetStreamStartCallback(StreamStartCallback cb) +{ + stream_start_cb_ = cb; +} + +void StreamManager::SetStreamStopCallback(StreamStopCallback cb) +{ + stream_stop_cb_ = cb; +} + +} // namespace AittWebRTCNamespace diff --git a/modules/webrtc/StreamManager.h b/modules/webrtc/StreamManager.h index a613a4a..98677b7 100644 --- a/modules/webrtc/StreamManager.h +++ b/modules/webrtc/StreamManager.h @@ -27,36 +27,42 @@ namespace AittWebRTCNamespace { class StreamManager { public: - using IceCandidateAddedCallback = std::function; - using StreamReadyCallback = std::function; + using IceCandidateAddedCallback = std::function; using StreamStartCallback = std::function; using StreamStopCallback = std::function; - explicit StreamManager(const std::string &topic, const std::string &aitt_id, - const std::string &thread_id) - : topic_(topic), aitt_id_(aitt_id), thread_id_(thread_id){}; + explicit StreamManager(const std::string &topic, const std::string &watching_topic, + const std::string &aitt_id, const std::string &thread_id); virtual ~StreamManager() = default; - std::string GetTopic(void) const { return topic_; }; - std::string GetClientId(void) const { return aitt_id_; }; - virtual void HandleRemovedClient(const std::string &discovery_id) = 0; - virtual void HandleMsg(const std::string &discovery_id, - const std::vector &message) = 0; - - virtual void Start(void) = 0; - virtual void Stop(void) = 0; - virtual void SetIceCandidateAddedCallback(IceCandidateAddedCallback cb) = 0; - virtual void SetStreamReadyCallback(StreamReadyCallback cb) = 0; - virtual void SetStreamStartCallback(StreamStartCallback cb) = 0; - virtual void SetStreamStopCallback(StreamStopCallback cb) = 0; virtual std::vector GetDiscoveryMessage(void) = 0; - virtual std::string GetWatchingTopic(void) = 0; + + void Start(void); + void Stop(void); + void HandleRemovedClient(const std::string &discovery_id); + void HandleMsg(const std::string &discovery_id, const std::vector &message); + void SetIceCandidateAddedCallback(IceCandidateAddedCallback cb); + void SetStreamStartCallback(StreamStartCallback cb); + void SetStreamStopCallback(StreamStopCallback cb); + std::string GetTopic(void) const; + std::string GetWatchingTopic(void) const; protected: std::string topic_; + std::string watching_topic_; // TODO: why dont' we remove below std::string aitt_id_; std::string thread_id_; + // TODO: What if user copies the module? + // Think about that case with destructor + std::map streams_; + StreamStartCallback stream_start_cb_; + StreamStopCallback stream_stop_cb_; + IceCandidateAddedCallback ice_candidate_added_cb_; private: virtual void SetWebRtcStreamCallbacks(WebRtcStream &stream) = 0; + virtual void HandleStreamState(const std::string &discovery_id, + const std::vector &message) = 0; + virtual void HandleStreamInfo(const std::string &discovery_id, + const std::vector &message) = 0; }; } // namespace AittWebRTCNamespace diff --git a/modules/webrtc/WebRtcEventHandler.h b/modules/webrtc/WebRtcEventHandler.h index 308171d..0cf9c99 100644 --- a/modules/webrtc/WebRtcEventHandler.h +++ b/modules/webrtc/WebRtcEventHandler.h @@ -48,8 +48,7 @@ class WebRtcEventHandler { }; void UnsetOnSignalingStateNotifyCb(void) { on_signaling_state_notify_cb_ = nullptr; }; - void SetOnIceCandidateCb( - std::function on_ice_candidate_cb) + void SetOnIceCandidateCb(std::function on_ice_candidate_cb) { on_ice_candidate_cb_ = on_ice_candidate_cb; }; diff --git a/modules/webrtc/WebRtcStream.cc b/modules/webrtc/WebRtcStream.cc index 0e1662a..035393b 100644 --- a/modules/webrtc/WebRtcStream.cc +++ b/modules/webrtc/WebRtcStream.cc @@ -229,6 +229,26 @@ bool WebRtcStream::SetRemoteDescription(const std::string &description) return ret == WEBRTC_ERROR_NONE; } +void WebRtcStream::SetStreamId(const std::string &id) +{ + id_ = id; +} + +std::string WebRtcStream::GetStreamId(void) const +{ + return id_; +} + +void WebRtcStream::SetPeerId(const std::string &id) +{ + peer_id_ = id; +} + +std::string &WebRtcStream::GetPeerId(void) +{ + return peer_id_; +} + bool WebRtcStream::AddIceCandidateFromMessage(const std::string &ice_message) { ERR("%s", __func__); @@ -412,6 +432,11 @@ void WebRtcStream::DetachSignals(void) webrtc_data_channel_unset_open_cb(channel_); } +WebRtcEventHandler &WebRtcStream::GetEventHandler(void) +{ + return event_handler_; +} + void WebRtcStream::OnError(webrtc_h webrtc, webrtc_error_e error, webrtc_state_e state, void *user_data) { diff --git a/modules/webrtc/WebRtcStream.h b/modules/webrtc/WebRtcStream.h index 9b4c703..9e2c74e 100644 --- a/modules/webrtc/WebRtcStream.h +++ b/modules/webrtc/WebRtcStream.h @@ -44,33 +44,17 @@ class WebRtcStream { void DetachSignals(void); // Cautions : Event handler is not a pointer. So, change event_handle after Set Event handler // doesn't affect event handler which is included in WebRtcStream - void SetEventHandler(WebRtcEventHandler event_handler) { event_handler_ = event_handler; }; - WebRtcEventHandler &GetEventHandler(void) { return event_handler_; }; + WebRtcEventHandler &GetEventHandler(void); bool CreateOfferAsync(std::function on_created_cb); - void CallOnOfferCreatedCb(std::string offer) - { - if (on_offer_created_cb_) - on_offer_created_cb_(offer); - } bool CreateAnswerAsync(std::function on_created_cb); - void CallOnAnswerCreatedCb(std::string answer) - { - if (on_answer_created_cb_) - on_answer_created_cb_(answer); - } - void SetPreparedLocalDescription(const std::string &description) - { - local_description_ = description; - }; - std::string GetPreparedLocalDescription(void) const { return local_description_; }; bool SetLocalDescription(const std::string &description); bool SetRemoteDescription(const std::string &description); - void SetStreamId(const std::string &id) { id_ = id; }; - std::string GetStreamId(void) const { return id_; }; - void SetPeerId(const std::string &id) { peer_id_ = id; }; - std::string &GetPeerId(void) { return peer_id_; }; + void SetStreamId(const std::string &id); + std::string GetStreamId(void) const; + void SetPeerId(const std::string &id); + std::string &GetPeerId(void); bool AddIceCandidateFromMessage(const std::string &ice_message); bool AddPeerInformation(const std::string &sdp, const std::vector &ice_candidates); diff --git a/modules/webrtc/tests/WEBRTC_test.cc b/modules/webrtc/tests/WEBRTC_test.cc index f983d11..f027203 100644 --- a/modules/webrtc/tests/WEBRTC_test.cc +++ b/modules/webrtc/tests/WEBRTC_test.cc @@ -80,9 +80,8 @@ class WebRtcStreamTest : public testing::Test { { DBG("OnStreamStateChanged"); if (state == WebRtcState::Stream::NEGOTIATING) { - auto on_offer_created = - std::bind(OnOfferCreated, std::placeholders::_1, std::ref(stream), test); - stream.CreateOfferAsync(on_offer_created); + stream.CreateOfferAsync( + std::bind(OnOfferCreated, std::placeholders::_1, std::ref(stream), test)); } } static void OnOfferCreated(std::string sdp, WebRtcStream &stream, WebRtcStreamTest *test) @@ -114,13 +113,11 @@ TEST_F(WebRtcStreamTest, test_Start_WebRtcSrcStream_OnDevice) WebRtcStream stream{}; EXPECT_EQ(true, stream.Create(true, false)) << "Failed to create source stream"; EXPECT_EQ(true, stream.AttachCameraSource()) << "Failed to attach camera source"; - auto on_stream_state_changed_cb = - std::bind(OnStreamStateChanged, std::placeholders::_1, std::ref(stream), this); - stream.GetEventHandler().SetOnStateChangedCb(on_stream_state_changed_cb); + stream.GetEventHandler().SetOnStateChangedCb( + std::bind(OnStreamStateChanged, std::placeholders::_1, std::ref(stream), this)); - auto on_ice_gathering_state_changed_cb = - std::bind(OnIceGatheringStateNotify, std::placeholders::_1, std::ref(stream), this); - stream.GetEventHandler().SetOnIceGatheringStateNotifyCb(on_ice_gathering_state_changed_cb); + stream.GetEventHandler().SetOnIceGatheringStateNotifyCb( + std::bind(OnIceGatheringStateNotify, std::placeholders::_1, std::ref(stream), this)); stream.Start(); IterateEventLoop(); EXPECT_EQ(WebRtcMessage::Type::SDP, WebRtcMessage::getMessageType(local_description_)); @@ -144,11 +141,9 @@ class WebRtcSourceOffererTest : public testing::Test { WebRtcSourceOffererTest *test) { DBG("OnSrcStreamStateChanged: %s", WebRtcState::StreamToStr(state).c_str()); - if (state == WebRtcState::Stream::NEGOTIATING) { - auto on_offer_created = - std::bind(OnOfferCreated, std::placeholders::_1, std::ref(stream), test); - stream.CreateOfferAsync(on_offer_created); - } + if (state == WebRtcState::Stream::NEGOTIATING) + stream.CreateOfferAsync( + std::bind(OnOfferCreated, std::placeholders::_1, std::ref(stream), test)); } static void OnOfferCreated(std::string sdp, WebRtcStream &stream, WebRtcSourceOffererTest *test) @@ -194,11 +189,9 @@ class WebRtcSourceOffererTest : public testing::Test { WebRtcSourceOffererTest *test) { DBG("OnSinkSignalingStateNotify: %s", WebRtcState::SignalingToStr(state).c_str()); - if (state == WebRtcState::Signaling::HAVE_REMOTE_OFFER) { - auto on_answer_created = - std::bind(OnAnswerCreated, std::placeholders::_1, std::ref(stream), test); - stream.CreateAnswerAsync(on_answer_created); - } + if (state == WebRtcState::Signaling::HAVE_REMOTE_OFFER) + stream.CreateAnswerAsync( + std::bind(OnAnswerCreated, std::placeholders::_1, std::ref(stream), test)); } static void OnSinkIceGatheringStateNotify(WebRtcState::IceGathering state, WebRtcStream &stream, @@ -229,36 +222,27 @@ TEST_F(WebRtcSourceOffererTest, test_Start_WebRtcStream_OnDevice) { EXPECT_EQ(true, src_stream_.Create(true, false)) << "Failed to create source stream"; EXPECT_EQ(true, src_stream_.AttachCameraSource()) << "Failed to attach camera source"; - auto on_src_stream_state_changed_cb = - std::bind(OnSrcStreamStateChanged, std::placeholders::_1, std::ref(src_stream_), this); - src_stream_.GetEventHandler().SetOnStateChangedCb(on_src_stream_state_changed_cb); + src_stream_.GetEventHandler().SetOnStateChangedCb( + std::bind(OnSrcStreamStateChanged, std::placeholders::_1, std::ref(src_stream_), this)); - auto on_src_signaling_state_changed_cb = - std::bind(OnSrcSignalingStateNotify, std::placeholders::_1, std::ref(src_stream_), this); - src_stream_.GetEventHandler().SetOnSignalingStateNotifyCb(on_src_signaling_state_changed_cb); + src_stream_.GetEventHandler().SetOnSignalingStateNotifyCb( + std::bind(OnSrcSignalingStateNotify, std::placeholders::_1, std::ref(src_stream_), this)); - auto on_src_ice_gathering_state_changed_cb = std::bind(OnSrcIceGatheringStateNotify, - std::placeholders::_1, std::ref(src_stream_), this); - src_stream_.GetEventHandler().SetOnIceGatheringStateNotifyCb( - on_src_ice_gathering_state_changed_cb); + src_stream_.GetEventHandler().SetOnIceGatheringStateNotifyCb(std::bind( + OnSrcIceGatheringStateNotify, std::placeholders::_1, std::ref(src_stream_), this)); src_stream_.Start(); EXPECT_EQ(true, sink_stream_.Create(false, false)) << "Failed to create sink stream"; - auto on_sink_stream_state_changed_cb = - std::bind(OnSinkStreamStateChanged, std::placeholders::_1, std::ref(sink_stream_), this); - sink_stream_.GetEventHandler().SetOnStateChangedCb(on_sink_stream_state_changed_cb); + sink_stream_.GetEventHandler().SetOnStateChangedCb( + std::bind(OnSinkStreamStateChanged, std::placeholders::_1, std::ref(sink_stream_), this)); - auto on_sink_signaling_state_changed_cb = std::bind(OnSinkSignalingStateNotify, - std::placeholders::_1, std::ref(sink_stream_), this); - sink_stream_.GetEventHandler().SetOnSignalingStateNotifyCb(on_sink_signaling_state_changed_cb); + sink_stream_.GetEventHandler().SetOnSignalingStateNotifyCb(std::bind(OnSinkSignalingStateNotify, + std::placeholders::_1, std::ref(sink_stream_), this)); - auto on_sink_ice_gathering_state_changed_cb = std::bind(OnSinkIceGatheringStateNotify, - std::placeholders::_1, std::ref(sink_stream_), this); - sink_stream_.GetEventHandler().SetOnIceGatheringStateNotifyCb( - on_sink_ice_gathering_state_changed_cb); + sink_stream_.GetEventHandler().SetOnIceGatheringStateNotifyCb(std::bind( + OnSinkIceGatheringStateNotify, std::placeholders::_1, std::ref(sink_stream_), this)); - auto on_sink_encoded_frame_cb = std::bind(OnSinkStreamEncodedFrame, this); - sink_stream_.GetEventHandler().SetOnEncodedFrameCb(on_sink_encoded_frame_cb); + sink_stream_.GetEventHandler().SetOnEncodedFrameCb(std::bind(OnSinkStreamEncodedFrame, this)); sink_stream_.Start(); IterateEventLoop(); } @@ -277,11 +261,9 @@ class WebRtcSinkOffererTest : public testing::Test { WebRtcSinkOffererTest *test) { DBG("OnSinkStreamStateChanged: %s", WebRtcState::StreamToStr(state).c_str()); - if (state == WebRtcState::Stream::NEGOTIATING) { - auto on_offer_created = - std::bind(OnOfferCreated, std::placeholders::_1, std::ref(stream), test); - stream.CreateOfferAsync(on_offer_created); - } + if (state == WebRtcState::Stream::NEGOTIATING) + stream.CreateOfferAsync( + std::bind(OnOfferCreated, std::placeholders::_1, std::ref(stream), test)); } static void OnOfferCreated(std::string sdp, WebRtcStream &stream, WebRtcSinkOffererTest *test) @@ -326,11 +308,9 @@ class WebRtcSinkOffererTest : public testing::Test { WebRtcSinkOffererTest *test) { DBG("OnSrcSignalingStateNotify: %s", WebRtcState::SignalingToStr(state).c_str()); - if (state == WebRtcState::Signaling::HAVE_REMOTE_OFFER) { - auto on_answer_created = - std::bind(OnAnswerCreated, std::placeholders::_1, std::ref(stream), test); - stream.CreateAnswerAsync(on_answer_created); - } + if (state == WebRtcState::Signaling::HAVE_REMOTE_OFFER) + stream.CreateAnswerAsync( + std::bind(OnAnswerCreated, std::placeholders::_1, std::ref(stream), test)); } static void OnSrcIceGatheringStateNotify(WebRtcState::IceGathering state, WebRtcStream &stream, @@ -365,32 +345,24 @@ TEST_F(WebRtcSinkOffererTest, test_Start_WebRtcStream_OnDevice) std::bind(OnSrcStreamStateChanged, std::placeholders::_1, std::ref(src_stream_), this); src_stream_.GetEventHandler().SetOnStateChangedCb(on_src_stream_state_changed_cb); - auto on_src_signaling_state_changed_cb = - std::bind(OnSrcSignalingStateNotify, std::placeholders::_1, std::ref(src_stream_), this); - src_stream_.GetEventHandler().SetOnSignalingStateNotifyCb(on_src_signaling_state_changed_cb); + src_stream_.GetEventHandler().SetOnSignalingStateNotifyCb( + std::bind(OnSrcSignalingStateNotify, std::placeholders::_1, std::ref(src_stream_), this)); - auto on_src_ice_gathering_state_changed_cb = std::bind(OnSrcIceGatheringStateNotify, - std::placeholders::_1, std::ref(src_stream_), this); - src_stream_.GetEventHandler().SetOnIceGatheringStateNotifyCb( - on_src_ice_gathering_state_changed_cb); + src_stream_.GetEventHandler().SetOnIceGatheringStateNotifyCb(std::bind( + OnSrcIceGatheringStateNotify, std::placeholders::_1, std::ref(src_stream_), this)); src_stream_.Start(); EXPECT_EQ(true, sink_stream_.Create(false, false)) << "Failed to create sink stream"; - auto on_sink_stream_state_changed_cb = - std::bind(OnSinkStreamStateChanged, std::placeholders::_1, std::ref(sink_stream_), this); - sink_stream_.GetEventHandler().SetOnStateChangedCb(on_sink_stream_state_changed_cb); + sink_stream_.GetEventHandler().SetOnStateChangedCb( + std::bind(OnSinkStreamStateChanged, std::placeholders::_1, std::ref(sink_stream_), this)); - auto on_sink_signaling_state_changed_cb = std::bind(OnSinkSignalingStateNotify, - std::placeholders::_1, std::ref(sink_stream_), this); - sink_stream_.GetEventHandler().SetOnSignalingStateNotifyCb(on_sink_signaling_state_changed_cb); + sink_stream_.GetEventHandler().SetOnSignalingStateNotifyCb(std::bind(OnSinkSignalingStateNotify, + std::placeholders::_1, std::ref(sink_stream_), this)); - auto on_sink_ice_gathering_state_changed_cb = std::bind(OnSinkIceGatheringStateNotify, - std::placeholders::_1, std::ref(sink_stream_), this); - sink_stream_.GetEventHandler().SetOnIceGatheringStateNotifyCb( - on_sink_ice_gathering_state_changed_cb); + sink_stream_.GetEventHandler().SetOnIceGatheringStateNotifyCb(std::bind( + OnSinkIceGatheringStateNotify, std::placeholders::_1, std::ref(sink_stream_), this)); - auto on_sink_encoded_frame_cb = std::bind(OnSinkStreamEncodedFrame, this); - sink_stream_.GetEventHandler().SetOnEncodedFrameCb(on_sink_encoded_frame_cb); + sink_stream_.GetEventHandler().SetOnEncodedFrameCb(std::bind(OnSinkStreamEncodedFrame, this)); sink_stream_.Start(); IterateEventLoop(); } @@ -509,13 +481,10 @@ TEST_F(SrcStreamManagerTest, test_SrcStreamManager_Start_OnDevice) // Src should subscribe WEBRTC_SINK_TOPIC but this is for test discovery_cb_ = discovery_engine_.AddDiscoveryCB(src_manager->GetTopic(), on_discovered); - auto on_stream_started = - std::bind(OnStreamStarted, this, static_cast(src_manager.get())); - src_manager->SetStreamStartCallback(on_stream_started); - - auto on_stream_stopped = - std::bind(OnStreamStopped, this, static_cast(src_manager.get())); - src_manager->SetStreamStopCallback(on_stream_stopped); + src_manager->SetStreamStartCallback( + std::bind(OnStreamStarted, this, static_cast(src_manager.get()))); + src_manager->SetStreamStopCallback( + std::bind(OnStreamStopped, this, static_cast(src_manager.get()))); src_manager->Start(); IterateEventLoop(); @@ -558,8 +527,7 @@ class SinkStreamManagerTest : public testing::Test { g_main_loop_quit(test->mainLoop_); } - static void OnIceCandidateAdded(WebRtcStream &stream, SinkStreamManager *sink_mgr, - SinkStreamManagerTest *test) + static void OnIceCandidateAdded(SinkStreamManager *sink_mgr, SinkStreamManagerTest *test) { DBG("OnIceCandidateAdded"); auto discovery_message = sink_mgr->GetDiscoveryMessage(); @@ -567,15 +535,6 @@ class SinkStreamManagerTest : public testing::Test { discovery_message.size()); } - static void OnStreamReady(WebRtcStream &stream, SinkStreamManager *sink_mgr, - SinkStreamManagerTest *test) - { - DBG("OnStreamReady"); - auto discovery_message = sink_mgr->GetDiscoveryMessage(); - test->discovery_engine_.UpdateDiscoveryMsg(sink_mgr->GetTopic(), discovery_message.data(), - discovery_message.size()); - } - int discovery_cb_; std::string test_topic_; aitt::AittDiscovery discovery_engine_; @@ -595,13 +554,8 @@ TEST_F(SinkStreamManagerTest, test_SinkStreamManager_Start_OnDevice) // Sink should subscribe WEBRTC_SRC_TOPIC but this is for test discovery_cb_ = discovery_engine_.AddDiscoveryCB(sink_manager->GetTopic(), on_discovered); - auto on_ice_candidate_added = std::bind(OnIceCandidateAdded, std::placeholders::_1, - static_cast(sink_manager.get()), this); - sink_manager->SetIceCandidateAddedCallback(on_ice_candidate_added); - - auto on_stream_ready = std::bind(OnStreamReady, std::placeholders::_1, - static_cast(sink_manager.get()), this); - sink_manager->SetStreamReadyCallback(on_stream_ready); + sink_manager->SetIceCandidateAddedCallback(std::bind(OnIceCandidateAdded, + static_cast(sink_manager.get()), this)); sink_manager->Start(); IterateEventLoop(); @@ -723,18 +677,12 @@ class SinkSrcStreamManagerTest : public testing::Test { sink_manager_->GetWatchingTopic(), discovered_at_sink); sink_discovery_engine_.Restart(); - auto on_stream_stopped = std::bind(OnSinkStreamStopped, this); - sink_manager_->SetStreamStopCallback(on_stream_stopped); + sink_manager_->SetStreamStopCallback(std::bind(OnSinkStreamStopped, this)); + sink_manager_->SetIceCandidateAddedCallback( + std::bind(OnSinkIceCandidate, this)); - auto on_ice_candidate = std::bind(OnSinkIceCandidate, std::placeholders::_1, this); - sink_manager_->SetIceCandidateAddedCallback(on_ice_candidate); - - auto on_sink_stream_ready = std::bind(OnSinkStreamReady, std::placeholders::_1, this); - sink_manager_->SetStreamReadyCallback(on_sink_stream_ready); - - auto on_encoded_frame = std::bind(OnEncodedFrame, std::placeholders::_1, this); static_cast(sink_manager_.get()) - ->SetOnEncodedFrameCallback(on_encoded_frame); + ->SetOnEncodedFrameCallback(std::bind(OnEncodedFrame, this)); sink_manager_->Start(); } @@ -752,7 +700,7 @@ class SinkSrcStreamManagerTest : public testing::Test { msg.size()); } - static void OnSinkIceCandidate(WebRtcStream &stream, SinkSrcStreamManagerTest *test) + static void OnSinkIceCandidate(SinkSrcStreamManagerTest *test) { DBG("OnSinkIceCandidate"); auto discovery_message = test->sink_manager_->GetDiscoveryMessage(); @@ -760,18 +708,10 @@ class SinkSrcStreamManagerTest : public testing::Test { discovery_message.data(), discovery_message.size()); } - static void OnSinkStreamReady(WebRtcStream &stream, SinkSrcStreamManagerTest *test) - { - DBG("OnSinkStreamReady"); - - auto discovery_message = test->sink_manager_->GetDiscoveryMessage(); - test->sink_discovery_engine_.UpdateDiscoveryMsg(test->sink_manager_->GetTopic(), - discovery_message.data(), discovery_message.size()); - } - - static void OnEncodedFrame(WebRtcStream &stream, SinkSrcStreamManagerTest *test) + static void OnEncodedFrame(SinkSrcStreamManagerTest *test) { - static_cast(test->sink_manager_.get())->SetOnEncodedFrameCallback(nullptr); + static_cast(test->sink_manager_.get()) + ->SetOnEncodedFrameCallback(nullptr); if (test->stop_sink_first_) test->AddIdleStopSinkStream(); @@ -787,15 +727,10 @@ class SinkSrcStreamManagerTest : public testing::Test { discovered_at_src); src_discovery_engine_.Restart(); - auto on_stream_started = std::bind(OnStreamStarted, this); - src_manager_->SetStreamStartCallback(on_stream_started); - auto on_stream_stopped = std::bind(OnSrcStreamStopped, this); - src_manager_->SetStreamStopCallback(on_stream_stopped); - auto on_ice_candidate = std::bind(OnSrcIceCandidate, std::placeholders::_1, this); - src_manager_->SetIceCandidateAddedCallback(on_ice_candidate); - - auto on_src_stream_ready = std::bind(OnSrcStreamReady, std::placeholders::_1, this); - src_manager_->SetStreamReadyCallback(on_src_stream_ready); + src_manager_->SetStreamStartCallback(std::bind(OnStreamStarted, this)); + src_manager_->SetStreamStopCallback(std::bind(OnSrcStreamStopped, this)); + src_manager_->SetIceCandidateAddedCallback( + std::bind(OnSrcIceCandidate, this)); src_manager_->Start(); } @@ -827,7 +762,7 @@ class SinkSrcStreamManagerTest : public testing::Test { msg.size()); } - static void OnSrcIceCandidate(WebRtcStream &stream, SinkSrcStreamManagerTest *test) + static void OnSrcIceCandidate(SinkSrcStreamManagerTest *test) { DBG("OnIceCandidateAdded"); auto discovery_message = test->src_manager_->GetDiscoveryMessage(); @@ -835,15 +770,6 @@ class SinkSrcStreamManagerTest : public testing::Test { discovery_message.data(), discovery_message.size()); } - static void OnSrcStreamReady(WebRtcStream &stream, SinkSrcStreamManagerTest *test) - { - DBG("OnSrcStreamReady"); - - auto discovery_message = test->src_manager_->GetDiscoveryMessage(); - test->src_discovery_engine_.UpdateDiscoveryMsg(test->src_manager_->GetTopic(), - discovery_message.data(), discovery_message.size()); - } - void StartSinkFirst(void) { start_sink_id_ = g_timeout_add_seconds(1, GSourceStartSink, this); -- 2.7.4