void AittDiscovery::Restart()
{
- RET_IF(callback_handle);
+ RET_IF(callback_handle == nullptr);
discovery_mq->Unsubscribe(callback_handle);
callback_handle = discovery_mq->Subscribe(DISCOVERY_TOPIC_BASE + "+", DiscoveryMessageCallback,
static_cast<void *>(this), AITT_QOS_EXACTLY_ONCE);
namespace AittWebRTCNamespace {
Module::Module(AittDiscovery &discovery, const std::string &topic, AittStreamRole role)
- : is_source_(IsSource(role)), topic_(topic), discovery_(discovery)
+ : is_source_(IsSource(role)), discovery_(discovery), state_cb_user_data_(nullptr), receive_cb_user_data_(nullptr)
{
- discovery_cb_ = discovery_.AddDiscoveryCB(topic,
- std::bind(&Module::DiscoveryMessageCallback, this, std::placeholders::_1,
- std::placeholders::_2, std::placeholders::_3, std::placeholders::_4));
-
std::stringstream s_stream;
s_stream << std::this_thread::get_id();
- if (is_source_) {
- stream_manager_ = new SrcStreamManager(topic_, discovery_.GetId(), s_stream.str());
- } else {
- stream_manager_ = new SinkStreamManager(topic_, discovery_.GetId(), s_stream.str());
- }
+ if (is_source_)
+ stream_manager_ = new SrcStreamManager(topic, discovery_.GetId(), s_stream.str());
+ else
+ stream_manager_ = new SinkStreamManager(topic, discovery_.GetId(), s_stream.str());
+
+ discovery_cb_ = discovery_.AddDiscoveryCB(stream_manager_->GetWatchingTopic(),
+ std::bind(&Module::DiscoveryMessageCallback, this, std::placeholders::_1,
+ std::placeholders::_2, std::placeholders::_3, std::placeholders::_4));
}
Module::~Module(void)
{
}
+
void Module::Start(void)
{
+ auto on_ice_candidate_added =
+ std::bind(&Module::OnIceCandidateAdded, this, std::placeholders::_1);
+ stream_manager_->SetIceCandidateAddedCallback(on_ice_candidate_added);
+
auto on_stream_ready = std::bind(&Module::OnStreamReady, this, std::placeholders::_1);
stream_manager_->SetStreamReadyCallback(on_stream_ready);
+
+ auto on_stream_started = std::bind(&Module::OnStreamStarted, this);
+ stream_manager_->SetStreamStartCallback(on_stream_started);
+
+ auto on_stream_stopped = std::bind(&Module::OnStreamStopped, this);
+ stream_manager_->SetStreamStopCallback(on_stream_stopped);
+
stream_manager_->Start();
}
{
}
+void Module::OnIceCandidateAdded(WebRtcStream &stream)
+{
+ DBG("OnIceCandidateAdded");
+ auto msg = stream_manager_->GetDiscoveryMessage();
+ discovery_.UpdateDiscoveryMsg(stream_manager_->GetTopic(), msg.data(), msg.size());
+}
+
void Module::OnStreamReady(WebRtcStream &stream)
{
DBG("OnStreamReady");
- auto discovery_message = WebRtcMessage::GenerateDiscoveryMessage(topic_, is_source_,
- stream.GetLocalDescription(), stream.GetIceCandidates());
- discovery_.UpdateDiscoveryMsg(topic_, discovery_message.data(), discovery_message.size());
+ auto msg = stream_manager_->GetDiscoveryMessage();
+ discovery_.UpdateDiscoveryMsg(stream_manager_->GetTopic(), msg.data(), msg.size());
+}
+
+void Module::OnStreamStarted(void)
+{
+ std::vector<uint8_t> msg;
+
+ flexbuffers::Builder fbb;
+ fbb.String("START");
+ fbb.Finish();
+
+ msg = fbb.GetBuffer();
+ discovery_.UpdateDiscoveryMsg(stream_manager_->GetTopic(), msg.data(), msg.size());
+}
+
+void Module::OnStreamStopped(void)
+{
+ std::vector<uint8_t> msg;
+
+ flexbuffers::Builder fbb;
+ fbb.String("STOP");
+ fbb.Finish();
+
+ msg = fbb.GetBuffer();
+ discovery_.UpdateDiscoveryMsg(stream_manager_->GetTopic(), msg.data(), msg.size());
}
void Module::SetStateCallback(StateCallback cb, void *user_data)
void Module::DiscoveryMessageCallback(const std::string &clientId, const std::string &status,
const void *msg, const int szmsg)
{
- if (!clientId.compare(discovery_.GetId()))
- return;
-
if (!status.compare(AittDiscovery::WILL_LEAVE_NETWORK)) {
stream_manager_->HandleRemovedClient(clientId);
return;
}
- stream_manager_->HandleDiscoveredStream(clientId,
+ stream_manager_->HandleMsg(clientId,
std::vector<uint8_t>(static_cast<const uint8_t *>(msg),
static_cast<const uint8_t *>(msg) + szmsg));
}
static bool IsSource(AittStreamRole role);
private:
+ //TODO: Update Ice Candidates when stream add candidates.
+ void OnIceCandidateAdded(WebRtcStream &stream);
void OnStreamReady(WebRtcStream &stream);
+ void OnStreamStarted(void);
+ void OnStreamStopped(void);
void DiscoveryMessageCallback(const std::string &clientId, const std::string &status,
const void *msg, const int szmsg);
bool is_source_;
- std::string topic_;
AittDiscovery &discovery_;
int discovery_cb_;
*/
#include "SinkStreamManager.h"
+#include <flatbuffers/flexbuffers.h>
+
#include <sstream>
#include "aitt_internal.h"
namespace AittWebRTCNamespace {
SinkStreamManager::SinkStreamManager(const std::string &topic, const std::string &aitt_id,
const std::string &thread_id)
- : StreamManager(topic, aitt_id, thread_id)
+ : StreamManager(topic + "/SINK", aitt_id, thread_id), watching_topic_(topic + "/SRC")
{
- std::stringstream s_stream;
- s_stream << static_cast<void *>(&stream_);
- stream_.SetStreamId(aitt_id + thread_id + s_stream.str());
}
+
SinkStreamManager::~SinkStreamManager()
{
- Stop();
+ for (auto itr = src_stream_.begin(); itr != src_stream_.end(); ++itr)
+ itr->second->Destroy();
+ src_stream_.clear();
+}
+
+void SinkStreamManager::Start()
+{
+ DBG("%s %s", __func__, GetTopic().c_str());
+ if (stream_start_cb_)
+ stream_start_cb_();
+}
+
+void SinkStreamManager::Stop()
+{
+ DBG("%s %s", __func__, GetTopic().c_str());
+ // TODO: You should take care about stream resource
+ for (auto itr = src_stream_.begin(); itr != src_stream_.end(); ++itr)
+ itr->second->Destroy();
+ src_stream_.clear();
+
+ if (stream_stop_cb_)
+ stream_stop_cb_();
+}
+
+void SinkStreamManager::OnEncodedFrame(WebRtcStream &stream)
+{
+ if (encoded_frame_cb_)
+ encoded_frame_cb_(stream);
+}
+
+void SinkStreamManager::SetIceCandidateAddedCallback(IceCandidateAddedCallback cb)
+{
+ ice_candidate_added_cb_ = cb;
+}
+
+void SinkStreamManager::SetStreamReadyCallback(StreamReadyCallback cb)
+{
+ stream_ready_cb_ = cb;
+}
+
+void SinkStreamManager::SetStreamStartCallback(StreamStartCallback cb)
+{
+ stream_start_cb_ = cb;
+}
+
+void SinkStreamManager::SetStreamStopCallback(StreamStopCallback cb)
+{
+ stream_stop_cb_ = cb;
+}
+
+void SinkStreamManager::SetOnEncodedFrameCallback(EncodedFrameCallabck cb)
+{
+ encoded_frame_cb_ = cb;
}
void SinkStreamManager::SetWebRtcStreamCallbacks(WebRtcStream &stream)
{
- auto on_stream_state_changed_cb =
- std::bind(OnStreamStateChanged, std::placeholders::_1, std::ref(stream));
+ auto on_stream_state_changed_cb = std::bind(&SinkStreamManager::OnStreamStateChanged, this,
+ std::placeholders::_1, std::ref(stream));
stream.GetEventHandler().SetOnStateChangedCb(on_stream_state_changed_cb);
+ auto on_ice_candidate_added_cb = std::bind(&SinkStreamManager::OnIceCandidate, this,
+ std::placeholders::_1, std::ref(stream));
+ stream.GetEventHandler().SetOnIceCandidateCb(on_ice_candidate_added_cb);
+
auto on_ice_gathering_state_changed_cb =
- std::bind(OnIceGatheringStateNotify, std::placeholders::_1, std::ref(stream), this);
+ std::bind(&SinkStreamManager::OnIceGatheringStateNotify, this, std::placeholders::_1,
+ std::ref(stream));
stream.GetEventHandler().SetOnIceGatheringStateNotifyCb(on_ice_gathering_state_changed_cb);
- auto on_encoded_frame_cb = std::bind(OnEncodedFrame, std::ref(stream), this);
+ auto on_encoded_frame_cb =
+ std::bind(&SinkStreamManager::OnEncodedFrame, this, std::ref(stream));
stream.GetEventHandler().SetOnEncodedFrameCb(on_encoded_frame_cb);
}
{
DBG("OnSinkStreamStateChanged: %s", WebRtcState::StreamToStr(state).c_str());
if (state == WebRtcState::Stream::NEGOTIATING) {
- auto on_offer_created = std::bind(OnOfferCreated, std::placeholders::_1, std::ref(stream));
+ auto on_offer_created = std::bind(&SinkStreamManager::OnOfferCreated, this,
+ std::placeholders::_1, std::ref(stream));
stream.CreateOfferAsync(on_offer_created);
}
}
stream.SetLocalDescription(sdp);
}
+void SinkStreamManager::OnIceCandidate(const std::string &candidate, WebRtcStream &stream)
+{
+ if (ice_candidate_added_cb_)
+ ice_candidate_added_cb_(stream);
+}
+
void SinkStreamManager::OnIceGatheringStateNotify(WebRtcState::IceGathering state,
- WebRtcStream &stream, SinkStreamManager *manager)
+ WebRtcStream &stream)
{
DBG("Sink IceGathering State: %s", WebRtcState::IceGatheringToStr(state).c_str());
if (state == WebRtcState::IceGathering::COMPLETE) {
- if (manager && manager->stream_ready_cb_)
- manager->stream_ready_cb_(stream);
+ if (stream_ready_cb_)
+ stream_ready_cb_(stream);
}
}
-void SinkStreamManager::OnEncodedFrame(WebRtcStream &stream, SinkStreamManager *manager)
+void SinkStreamManager::HandleRemovedClient(const std::string &discovery_id)
{
- if (manager && manager->encoded_frame_cb_)
- manager->encoded_frame_cb_(stream);
+ auto src_stream_itr = src_stream_.find(discovery_id);
+ if (src_stream_itr == src_stream_.end()) {
+ DBG("There's no source stream %s", discovery_id.c_str());
+ return;
+ }
+
+ // TODO: You should take care about stream resource
+ src_stream_itr->second->Destroy();
+ src_stream_.erase(src_stream_itr);
+
+ return;
}
-void SinkStreamManager::SetStreamReadyCallback(StreamReadyCallback cb)
+void SinkStreamManager::HandleMsg(const std::string &discovery_id,
+ const std::vector<uint8_t> &message)
{
- stream_ready_cb_ = cb;
+ if (flexbuffers::GetRoot(message).IsString())
+ HandleStreamState(discovery_id, message);
+
+ else if (flexbuffers::GetRoot(message).IsVector())
+ HandleStreamInfo(discovery_id, message);
}
-void SinkStreamManager::SetOnEncodedFrameCallback(EncodedFrameCallabck cb)
+void SinkStreamManager::HandleStreamState(const std::string &discovery_id,
+ const std::vector<uint8_t> &message)
{
- encoded_frame_cb_ = cb;
+ auto src_state = flexbuffers::GetRoot(message).ToString();
+
+ if (src_state.compare("START") == 0)
+ HandleStartStream(discovery_id);
+ else if (src_state.compare("STOP") == 0)
+ HandleRemovedClient(discovery_id);
+ else
+ DBG("Invalid message %s", src_state);
}
-void SinkStreamManager::Start()
+void SinkStreamManager::HandleStartStream(const std::string &discovery_id)
{
- // TODO: Handle failures on create and start
- SetWebRtcStreamCallbacks(stream_);
- stream_.Create(false, false);
- stream_.Start();
+ auto src_stream = src_stream_.find(discovery_id);
+ if (src_stream != src_stream_.end()) {
+ DBG("There's stream already");
+ return;
+ }
+
+ DBG("Src Stream Started");
+ AddStream(discovery_id);
}
-void SinkStreamManager::Stop()
+void SinkStreamManager::AddStream(const std::string &discovery_id)
{
- stream_.Destroy();
+ auto stream = new WebRtcStream();
+ SetWebRtcStreamCallbacks(*stream);
+ stream->Create(false, false);
+ stream->Start();
+
+ std::stringstream s_stream;
+ s_stream << static_cast<void *>(stream);
+
+ stream->SetStreamId(std::string(thread_id_ + s_stream.str()));
+ src_stream_[discovery_id] = stream;
}
-void SinkStreamManager::HandleRemovedClient(const std::string &id)
+void SinkStreamManager::HandleStreamInfo(const std::string &discovery_id,
+ const std::vector<uint8_t> &message)
{
- if (id.compare(stream_.GetPeerId()) == 0) {
- stream_.Destroy();
- stream_.Create(false, false);
- stream_.Start();
+ if (!WebRtcMessage::IsValidStreamInfo(message)) {
+ DBG("Invalid streams info");
+ return;
}
- return;
+ // sink_streams have a stream at normal situation
+ auto src_streams = flexbuffers::GetRoot(message).AsVector();
+ for (size_t stream_idx = 0; stream_idx < src_streams.size(); ++stream_idx) {
+ auto stream = src_streams[stream_idx].AsMap();
+ auto id = stream["id"].AsString().str();
+ auto peer_id = stream["peer_id"].AsString().str();
+ auto sdp = stream["sdp"].AsString().str();
+ std::vector<std::string> ice_candidates;
+ auto ice_info = stream["ice_candidates"].AsVector();
+ for (size_t ice_idx = 0; ice_idx < ice_info.size(); ++ice_idx)
+ ice_candidates.push_back(ice_info[ice_idx].AsString().str());
+ UpdateStreamInfo(discovery_id, id, peer_id, sdp, ice_candidates);
+ }
}
-void SinkStreamManager::HandleDiscoveredStream(const std::string &id,
- const std::vector<uint8_t> &message)
+void SinkStreamManager::UpdateStreamInfo(const std::string &discovery_id, const std::string &id,
+ const std::string &peer_id, const std::string &sdp,
+ const std::vector<std::string> &ice_candidates)
{
- auto info = WebRtcMessage::ParseDiscoveryMessage(message);
- if (!info.is_src_ || info.topic_.compare(topic_)) {
- DBG("Is sink or topic not matched");
+ auto src_stream = src_stream_.find(discovery_id);
+ if (src_stream == src_stream_.end()) {
+ DBG("No matching stream");
+ return;
+ }
+
+ if (peer_id.compare(src_stream->second->GetStreamId()) != 0) {
+ DBG("Different ID");
return;
}
- // Sink'll update offer if WebRTC state is ice candidate compelete
- // Src create answer after it receives the offer.
- // So, We don't need to worry about state.
- stream_.AddDiscoveryInformation(message);
+ if (src_stream->second->GetPeerId().size() == 0) {
+ DBG("first update");
+ src_stream->second->SetPeerId(id);
+ src_stream->second->AddPeerInformation(sdp, ice_candidates);
+ } else
+ src_stream->second->UpdatePeerInformation(ice_candidates);
+
+}
+
+std::vector<uint8_t> SinkStreamManager::GetDiscoveryMessage(void)
+{
+ std::vector<uint8_t> message;
+
+ flexbuffers::Builder fbb;
+ fbb.Vector([&] {
+ for (auto itr = src_stream_.begin(); itr != src_stream_.end(); ++itr) {
+ fbb.Map([&] {
+ fbb.String("id", itr->second->GetStreamId());
+ fbb.String("peer_id", itr->second->GetPeerId());
+ fbb.String("sdp", itr->second->GetLocalDescription());
+ fbb.Vector("ice_candidates", [&]() {
+ for (const auto &candidate : itr->second->GetIceCandidates()) {
+ fbb.String(candidate);
+ }
+ });
+ });
+ }
+ });
+ fbb.Finish();
+
+ message = fbb.GetBuffer();
+ return message;
+}
+
+std::string SinkStreamManager::GetWatchingTopic(void)
+{
+ return watching_topic_;
}
} // namespace AittWebRTCNamespace
#pragma once
+#include <map>
+
#include "StreamManager.h"
namespace AittWebRTCNamespace {
using EncodedFrameCallabck = std::function<void(WebRtcStream &stream)>;
explicit SinkStreamManager(const std::string &topic, const std::string &aitt_id,
const std::string &thread_id);
- ~SinkStreamManager();
+ virtual ~SinkStreamManager();
void Start(void) override;
void Stop(void) override;
- void HandleRemovedClient(const std::string &id) override;
- void HandleDiscoveredStream(const std::string &id, const std::vector<uint8_t> &message) override;
+ void SetIceCandidateAddedCallback(IceCandidateAddedCallback cb) override;
void SetStreamReadyCallback(StreamReadyCallback cb) override;
+ void SetStreamStartCallback(StreamStartCallback cb) override;
+ void SetStreamStopCallback(StreamStopCallback cb) override;
+ std::vector<uint8_t> GetDiscoveryMessage(void) override;
+ std::string GetWatchingTopic(void) override;
// TODO: WebRTC CAPI doesn't allow destroy webrtc handle at callback.
// We need to avoid that situation
void SetOnEncodedFrameCallback(EncodedFrameCallabck cb);
- WebRtcStream &GetStream(void) { return stream_; };
+ void HandleRemovedClient(const std::string &discovery_id) override;
+ void HandleMsg(const std::string &discovery_id, const std::vector<uint8_t> &message) override;
private:
void SetWebRtcStreamCallbacks(WebRtcStream &stream) override;
- static void OnStreamStateChanged(WebRtcState::Stream state, WebRtcStream &stream);
- static void OnOfferCreated(std::string sdp, WebRtcStream &stream);
- static void OnIceGatheringStateNotify(WebRtcState::IceGathering state, WebRtcStream &stream,
- SinkStreamManager *manager);
- static void OnEncodedFrame(WebRtcStream &stream, SinkStreamManager *manager);
+ void OnStreamStateChanged(WebRtcState::Stream state, WebRtcStream &stream);
+ void OnOfferCreated(std::string sdp, WebRtcStream &stream);
+ void OnIceCandidate(const std::string &candidate, WebRtcStream &stream);
+ void OnIceGatheringStateNotify(WebRtcState::IceGathering state, WebRtcStream &stream);
+ void OnEncodedFrame(WebRtcStream &stream);
+ void HandleStreamState(const std::string &discovery_id, const std::vector<uint8_t> &message);
+ void HandleStartStream(const std::string &discovery_id);
+ void HandleStreamInfo(const std::string &discovery_id, const std::vector<uint8_t> &message);
+ void AddStream(const std::string &discovery_id);
+ void UpdateStreamInfo(const std::string &discovery_id, const std::string &id,
+ const std::string &peer_id, const std::string &sdp,
+ const std::vector<std::string> &ice_candidates);
+
+ std::string watching_topic_;
// TODO: What if user copies the module?
// Think about that case with destructor
- WebRtcStream stream_;
+ std::map<std::string /* Peer Aitt Discovery ID */, WebRtcStream *> src_stream_;
+ IceCandidateAddedCallback ice_candidate_added_cb_;
StreamReadyCallback stream_ready_cb_;
+ StreamStartCallback stream_start_cb_;
+ StreamStopCallback stream_stop_cb_;
EncodedFrameCallabck encoded_frame_cb_;
};
} // namespace AittWebRTCNamespace
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
#include "SrcStreamManager.h"
+#include <flatbuffers/flexbuffers.h>
+
#include <sstream>
#include "aitt_internal.h"
SrcStreamManager::SrcStreamManager(const std::string &topic, const std::string &aitt_id,
const std::string &thread_id)
- : StreamManager(topic, aitt_id, thread_id), stream_(nullptr), stream_ready_cb_(nullptr)
+ : StreamManager(topic + "/SRC", aitt_id, thread_id), watching_topic_(topic + "/SINK")
{
}
SrcStreamManager::~SrcStreamManager()
{
- Stop();
+ // TODO: You should take care about stream resource
+ for (auto itr = sink_streams_.begin(); itr != sink_streams_.end(); ++itr)
+ itr->second->Destroy();
+ sink_streams_.clear();
+}
+
+void SrcStreamManager::Start(void)
+{
+ DBG("%s %s", __func__, GetTopic().c_str());
+ if (stream_start_cb_)
+ stream_start_cb_();
+}
+
+void SrcStreamManager::Stop(void)
+{
+ DBG("%s %s", __func__, GetTopic().c_str());
+ // TODO: You should take care about stream resource
+ for (auto itr = sink_streams_.begin(); itr != sink_streams_.end(); ++itr)
+ itr->second->Destroy();
+ sink_streams_.clear();
+
+ if (stream_stop_cb_)
+ stream_stop_cb_();
+}
+
+void SrcStreamManager::SetIceCandidateAddedCallback(IceCandidateAddedCallback cb)
+{
+ ice_candidate_added_cb_ = cb;
}
void SrcStreamManager::SetStreamReadyCallback(StreamReadyCallback cb)
stream_ready_cb_ = cb;
}
+void SrcStreamManager::SetStreamStartCallback(StreamStartCallback cb)
+{
+ stream_start_cb_ = cb;
+}
+
+void SrcStreamManager::SetStreamStopCallback(StreamStopCallback cb)
+{
+ stream_stop_cb_ = cb;
+}
+
void SrcStreamManager::SetWebRtcStreamCallbacks(WebRtcStream &stream)
{
- auto on_stream_state_changed_cb =
- std::bind(OnStreamStateChanged, std::placeholders::_1, std::ref(stream));
+ auto on_stream_state_changed_cb = std::bind(&SrcStreamManager::OnStreamStateChanged, this,
+ std::placeholders::_1, std::ref(stream));
stream.GetEventHandler().SetOnStateChangedCb(on_stream_state_changed_cb);
- auto on_signaling_state_notify_cb =
- std::bind(OnSignalingStateNotify, std::placeholders::_1, std::ref(stream));
+ auto on_ice_candidate_added_cb = std::bind(&SrcStreamManager::OnIceCandidate, this,
+ std::placeholders::_1, std::ref(stream));
+ stream.GetEventHandler().SetOnIceCandidateCb(on_ice_candidate_added_cb);
+
+ auto on_signaling_state_notify_cb = std::bind(&SrcStreamManager::OnSignalingStateNotify, this,
+ std::placeholders::_1, std::ref(stream));
stream.GetEventHandler().SetOnSignalingStateNotifyCb(on_signaling_state_notify_cb);
- auto on_ice_gathering_state_changed_cb =
- std::bind(OnIceGatheringStateNotify, std::placeholders::_1, std::ref(stream), this);
+ auto on_ice_gathering_state_changed_cb = std::bind(&SrcStreamManager::OnIceGatheringStateNotify,
+ this, std::placeholders::_1, std::ref(stream));
stream.GetEventHandler().SetOnIceGatheringStateNotifyCb(on_ice_gathering_state_changed_cb);
}
DBG("OnSrcStreamStateChanged: %s", WebRtcState::StreamToStr(state).c_str());
}
+void SrcStreamManager::OnSignalingStateNotify(WebRtcState::Signaling state, WebRtcStream &stream)
+{
+ DBG("OnSignalingStateNotify: %s", WebRtcState::SignalingToStr(state).c_str());
+ if (state == WebRtcState::Signaling::HAVE_REMOTE_OFFER) {
+ auto on_answer_created = std::bind(&SrcStreamManager::OnAnswerCreated, this,
+ std::placeholders::_1, std::ref(stream));
+ stream.CreateAnswerAsync(on_answer_created);
+ }
+}
+
void SrcStreamManager::OnAnswerCreated(std::string sdp, WebRtcStream &stream)
{
DBG("%s", __func__);
stream.SetLocalDescription(sdp);
}
-void SrcStreamManager::OnSignalingStateNotify(WebRtcState::Signaling state, WebRtcStream &stream)
+void SrcStreamManager::OnIceCandidate(const std::string &candidate, WebRtcStream &stream)
{
- DBG("OnSignalingStateNotify: %s", WebRtcState::SignalingToStr(state).c_str());
- if (state == WebRtcState::Signaling::HAVE_REMOTE_OFFER) {
- auto on_answer_created =
- std::bind(OnAnswerCreated, std::placeholders::_1, std::ref(stream));
- stream.CreateAnswerAsync(on_answer_created);
- }
+ if (ice_candidate_added_cb_)
+ ice_candidate_added_cb_(stream);
}
void SrcStreamManager::OnIceGatheringStateNotify(WebRtcState::IceGathering state,
- WebRtcStream &stream, SrcStreamManager *manager)
+ WebRtcStream &stream)
{
DBG("Src IceGathering State: %s", WebRtcState::IceGatheringToStr(state).c_str());
if (state == WebRtcState::IceGathering::COMPLETE) {
- if (manager && manager->stream_ready_cb_)
- manager->stream_ready_cb_(stream);
+ if (stream_ready_cb_)
+ stream_ready_cb_(stream);
}
}
-void SrcStreamManager::Start(void)
+void SrcStreamManager::HandleRemovedClient(const std::string &discovery_id)
{
- // TODO: What'll be done in start Src Stream Manager?
-}
-
-void SrcStreamManager::AddStream(const std::string &id, const std::vector<uint8_t> &message)
-{
- // TODO Add more streams on same topic
- if (stream_)
+ auto sink_stream_itr = sink_streams_.find(discovery_id);
+ if (sink_stream_itr == sink_streams_.end()) {
+ DBG("There's no sink stream %s", discovery_id.c_str());
return;
+ }
- stream_ = new WebRtcStream();
- SetWebRtcStreamCallbacks(*stream_);
- stream_->Create(true, false);
- stream_->AttachCameraSource();
- stream_->Start();
-
- std::stringstream s_stream;
- s_stream << static_cast<void *>(&stream_);
-
- stream_->SetStreamId(std::string(aitt_id_ + thread_id_ + s_stream.str()));
- stream_->SetPeerId(id);
- stream_->AddDiscoveryInformation(message);
+ // TODO: You should take care about stream resource
+ sink_stream_itr->second->Destroy();
+ sink_streams_.erase(sink_stream_itr);
return;
}
-void SrcStreamManager::Stop(void)
+void SrcStreamManager::HandleMsg(const std::string &discovery_id,
+ const std::vector<uint8_t> &message)
{
- // TODO: Ad-hoc method
- if (stream_) {
- delete stream_;
- stream_ = nullptr;
- }
+ if (flexbuffers::GetRoot(message).IsString())
+ HandleStreamState(discovery_id, message);
+ else if (flexbuffers::GetRoot(message).IsVector())
+ HandleStreamInfo(discovery_id, message);
}
-void SrcStreamManager::HandleRemovedClient(const std::string &id)
+void SrcStreamManager::HandleStreamState(const std::string &discovery_id,
+ const std::vector<uint8_t> &message)
{
- if (stream_ && stream_->GetPeerId().compare(id) == 0) {
- delete stream_;
- stream_ = nullptr;
- }
+ auto sink_state = flexbuffers::GetRoot(message).ToString();
+
+ if (sink_state.compare("STOP") == 0)
+ HandleRemovedClient(discovery_id);
+ else
+ DBG("Invalid message %s", sink_state);
}
-void SrcStreamManager::HandleDiscoveredStream(const std::string &id,
+void SrcStreamManager::HandleStreamInfo(const std::string &discovery_id,
const std::vector<uint8_t> &message)
{
- auto info = WebRtcMessage::ParseDiscoveryMessage(message);
- if (info.is_src_ || info.topic_.compare(topic_)) {
- DBG("Is src or topic not matched");
+ if (!WebRtcMessage::IsValidStreamInfo(message)) {
+ DBG("Invalid streams info");
return;
}
- if (stream_) {
- DBG("Supports one stream at once currently");
- return;
+ // sink_streams have a stream at normal situation
+ auto sink_streams = flexbuffers::GetRoot(message).AsVector();
+ for (size_t stream_idx = 0; stream_idx < sink_streams.size(); ++stream_idx) {
+ auto stream = sink_streams[stream_idx].AsMap();
+ auto id = stream["id"].AsString().str();
+ auto peer_id = stream["peer_id"].AsString().str();
+ auto sdp = stream["sdp"].AsString().str();
+ std::vector<std::string> ice_candidates;
+ auto ice_info = stream["ice_candidates"].AsVector();
+ for (size_t ice_idx = 0; ice_idx < ice_info.size(); ++ice_idx)
+ ice_candidates.push_back(ice_info[ice_idx].AsString().str());
+ UpdateStreamInfo(discovery_id, id, peer_id, sdp, ice_candidates);
}
+}
+
+void SrcStreamManager::UpdateStreamInfo(const std::string &discovery_id, const std::string &id,
+ const std::string &peer_id, const std::string &sdp,
+ const std::vector<std::string> &ice_candidates)
+{
+ auto sink_stream = sink_streams_.find(discovery_id);
+ if (sink_stream == sink_streams_.end())
+ AddStream(discovery_id, id, sdp, ice_candidates);
+ else
+ sink_stream->second->UpdatePeerInformation(ice_candidates);
+}
- AddStream(id, message);
+void SrcStreamManager::AddStream(const std::string &discovery_id, const std::string &id,
+ const std::string &sdp, const std::vector<std::string> &ice_candidates)
+{
+ auto stream = new WebRtcStream();
+ SetWebRtcStreamCallbacks(*stream);
+ stream->Create(true, false);
+ stream->AttachCameraSource();
+ stream->Start();
+
+ std::stringstream s_stream;
+ s_stream << static_cast<void *>(stream);
+
+ stream->SetStreamId(std::string(thread_id_ + s_stream.str()));
+ stream->SetPeerId(id);
+ stream->AddPeerInformation(sdp, ice_candidates);
+
+ sink_streams_[discovery_id] = stream;
+
+ return;
+}
+
+std::vector<uint8_t> SrcStreamManager::GetDiscoveryMessage(void)
+{
+ std::vector<uint8_t> message;
+
+ flexbuffers::Builder fbb;
+ fbb.Vector([&] {
+ for (auto itr = sink_streams_.begin(); itr != sink_streams_.end(); ++itr) {
+ fbb.Map([&] {
+ fbb.String("id", itr->second->GetStreamId());
+ fbb.String("peer_id", itr->second->GetPeerId());
+ fbb.String("sdp", itr->second->GetLocalDescription());
+ fbb.Vector("ice_candidates", [&]() {
+ for (const auto &candidate : itr->second->GetIceCandidates()) {
+ fbb.String(candidate);
+ }
+ });
+ });
+ }
+ });
+ fbb.Finish();
+
+ message = fbb.GetBuffer();
+ return message;
+}
+
+std::string SrcStreamManager::GetWatchingTopic(void)
+{
+ return watching_topic_;
}
} // namespace AittWebRTCNamespace
#pragma once
+#include <map>
+
#include "StreamManager.h"
namespace AittWebRTCNamespace {
public:
explicit SrcStreamManager(const std::string &topic, const std::string &aitt_id,
const std::string &thread_id);
- ~SrcStreamManager();
+ virtual ~SrcStreamManager();
void Start(void) override;
- void SetStreamReadyCallback(StreamReadyCallback cb) override;
- //TODO: What's the best way to shutdown all?
+ // TODO: What's the best way to shutdown all?
void Stop(void) override;
- void HandleRemovedClient(const std::string &id) override;
- void HandleDiscoveredStream(const std::string &id, const std::vector<uint8_t> &message) override;
+ void SetIceCandidateAddedCallback(IceCandidateAddedCallback cb) override;
+ void SetStreamReadyCallback(StreamReadyCallback cb) override;
+ void SetStreamStartCallback(StreamStartCallback cb) override;
+ void SetStreamStopCallback(StreamStopCallback cb) override;
+ std::vector<uint8_t> GetDiscoveryMessage(void) override;
+ std::string GetWatchingTopic(void) override;
+ void HandleRemovedClient(const std::string &discovery_id) override;
+ void HandleMsg(const std::string &discovery_id, const std::vector<uint8_t> &message) override;
private:
void SetWebRtcStreamCallbacks(WebRtcStream &stream) override;
- void AddStream(const std::string &id, const std::vector<uint8_t> &message);
- static void OnStreamStateChanged(WebRtcState::Stream state, WebRtcStream &stream);
- static void OnAnswerCreated(std::string sdp, WebRtcStream &stream);
- static void OnSignalingStateNotify(WebRtcState::Signaling state, WebRtcStream &stream);
- static void OnIceGatheringStateNotify(WebRtcState::IceGathering state, WebRtcStream &stream,
- SrcStreamManager *manager);
+ void OnStreamStateChanged(WebRtcState::Stream state, WebRtcStream &stream);
+ void OnAnswerCreated(std::string sdp, WebRtcStream &stream);
+ void OnIceCandidate(const std::string &candidate, WebRtcStream &stream);
+ void OnSignalingStateNotify(WebRtcState::Signaling state, WebRtcStream &stream);
+ void OnIceGatheringStateNotify(WebRtcState::IceGathering state, WebRtcStream &stream);
+ void HandleStreamState(const std::string &discovery_id, const std::vector<uint8_t> &message);
+ void HandleStreamInfo(const std::string &discovery_id, const std::vector<uint8_t> &message);
+ void AddStream(const std::string &discovery_id, const std::string &id, const std::string &sdp,
+ const std::vector<std::string> &ice_candidates);
+ void UpdateStreamInfo(const std::string &discovery_id, const std::string &id,
+ const std::string &peer_id, const std::string &sdp,
+ const std::vector<std::string> &ice_candidates);
+
+ std::string watching_topic_;
// TODO: What if user copies the module?
// Think about that case with destructor
- WebRtcStream *stream_;
+ std::map<std::string /* Peer Aitt Discovery ID */, WebRtcStream *> sink_streams_;
+ IceCandidateAddedCallback ice_candidate_added_cb_;
StreamReadyCallback stream_ready_cb_;
+ StreamStartCallback stream_start_cb_;
+ StreamStopCallback stream_stop_cb_;
};
} // namespace AittWebRTCNamespace
#include <functional>
#include <string>
#include <thread>
+#include <vector>
namespace AittWebRTCNamespace {
class StreamManager {
public:
+ using IceCandidateAddedCallback = std::function<void(WebRtcStream &stream)>;
using StreamReadyCallback = std::function<void(WebRtcStream &stream)>;
+ using StreamStartCallback = std::function<void(void)>;
+ using StreamStopCallback = std::function<void(void)>;
explicit StreamManager(const std::string &topic, const std::string &aitt_id,
const std::string &thread_id)
: topic_(topic), aitt_id_(aitt_id), thread_id_(thread_id){};
virtual ~StreamManager() = default;
std::string GetTopic(void) const { return topic_; };
std::string GetClientId(void) const { return aitt_id_; };
- virtual void HandleRemovedClient(const std::string &id) = 0;
- virtual void HandleDiscoveredStream(const std::string &id,
+ virtual void HandleRemovedClient(const std::string &discovery_id) = 0;
+ virtual void HandleMsg(const std::string &discovery_id,
const std::vector<uint8_t> &message) = 0;
+
virtual void Start(void) = 0;
virtual void Stop(void) = 0;
+ virtual void SetIceCandidateAddedCallback(IceCandidateAddedCallback cb) = 0;
virtual void SetStreamReadyCallback(StreamReadyCallback cb) = 0;
+ virtual void SetStreamStartCallback(StreamStartCallback cb) = 0;
+ virtual void SetStreamStopCallback(StreamStopCallback cb) = 0;
+ virtual std::vector<uint8_t> GetDiscoveryMessage(void) = 0;
+ virtual std::string GetWatchingTopic(void) = 0;
protected:
std::string topic_;
};
void UnsetOnSignalingStateNotifyCb(void) { on_signaling_state_notify_cb_ = nullptr; };
+ void SetOnIceCandidateCb(
+ std::function<void(std::string)> on_ice_candidate_cb)
+ {
+ on_ice_candidate_cb_ = on_ice_candidate_cb;
+ };
+ void CallOnIceCandidateCb(std::string candidate) const
+ {
+ if (on_ice_candidate_cb_)
+ on_ice_candidate_cb_(candidate);
+ };
+ void UnsetOnIceCandidateCb(void) { on_ice_candidate_cb_ = nullptr; };
+
void SetOnIceGatheringStateNotifyCb(
std::function<void(WebRtcState::IceGathering)> on_ice_gathering_state_notify_cb)
{
std::function<void(void)> on_negotiation_needed_cb_;
std::function<void(WebRtcState::Stream)> on_state_changed_cb_;
std::function<void(WebRtcState::Signaling)> on_signaling_state_notify_cb_;
+ std::function<void(std::string)> on_ice_candidate_cb_;
std::function<void(WebRtcState::IceGathering)> on_ice_gathering_state_notify_cb_;
std::function<void(WebRtcState::IceConnection)> on_ice_connection_state_notify_cb_;
std::function<void(void)> on_encoded_frame_cb_;
#include "WebRtcMessage.h"
-#include <flatbuffers/flexbuffers.h>
#include <json-glib/json-glib.h>
#include "aitt_internal.h"
return type;
}
-std::vector<uint8_t> WebRtcMessage::GenerateDiscoveryMessage(const std::string &topic, bool is_src,
- const std::string &sdp, const std::vector<std::string> &ice_candidates)
+bool WebRtcMessage::IsValidStreamInfo(const std::vector<uint8_t> &message)
{
- std::vector<uint8_t> message;
-
- flexbuffers::Builder fbb;
- fbb.Map([=, &fbb]() {
- fbb.String("topic", topic);
- fbb.Bool("is_src", is_src);
- fbb.String("sdp", sdp);
- fbb.Vector("ice_candidates", [&]() {
- for (const auto &candidate : ice_candidates) {
- fbb.String(candidate);
- }
- });
- });
- fbb.Finish();
-
- message = fbb.GetBuffer();
-
- return message;
-}
-
-bool WebRtcMessage::IsValidDiscoveryMessage(const std::vector<uint8_t> &discovery_message)
-{
- if (!flexbuffers::GetRoot(discovery_message).IsMap())
+ if (!flexbuffers::GetRoot(message).IsVector())
return false;
- auto discovery_info = flexbuffers::GetRoot(discovery_message).AsMap();
- bool topic_exists{discovery_info["topic"].IsString()};
- bool is_source_exists{discovery_info["is_src"].IsBool()};
- bool sdp_exists{discovery_info["sdp"].IsString()};
- bool ice_candidates_exists{discovery_info["ice_candidates"].IsVector()};
+ auto streams = flexbuffers::GetRoot(message).AsVector();
+ for (size_t idx = 0; idx < streams.size(); ++idx) {
+ auto stream = streams[idx].AsMap();
+ if (!IsValidStream(stream))
+ return false;
+ }
- return topic_exists && is_source_exists && sdp_exists && ice_candidates_exists;
+ return true;
}
-WebRtcMessage::DiscoveryInfo WebRtcMessage::ParseDiscoveryMessage(
- const std::vector<uint8_t> &discovery_message)
+bool WebRtcMessage::IsValidStream(const flexbuffers::Map &info)
{
- WebRtcMessage::DiscoveryInfo info;
- if (!IsValidDiscoveryMessage(discovery_message)) {
- DBG("Invalid info");
- return info;
- }
-
- auto discovery_info_map = flexbuffers::GetRoot(discovery_message).AsMap();
- info.topic_ = discovery_info_map["topic"].AsString().str();
- info.is_src_ = discovery_info_map["is_src"].AsBool();
- info.sdp_ = discovery_info_map["sdp"].AsString().str();
- auto ice_candidates_info = discovery_info_map["ice_candidates"].AsVector();
- for (size_t idx = 0; idx < ice_candidates_info.size(); ++idx) {
- info.ice_candidates_.push_back(ice_candidates_info[idx].AsString().str());
- }
+ bool id_exist{info["id"].IsString()};
+ bool peer_id_exist{info["peer_id"].IsString()};
+ bool sdp_exist{info["sdp"].IsString()};
+ bool ice_candidates_exist{info["ice_candidates"].IsVector()};
- return info;
+ return id_exist && peer_id_exist && sdp_exist && ice_candidates_exist;
}
} // namespace AittWebRTCNamespace
#pragma once
+#include <flatbuffers/flexbuffers.h>
+
#include <string>
#include <vector>
class DiscoveryInfo {
public:
DiscoveryInfo() = default;
- DiscoveryInfo(const std::string &topic, bool is_src, const std::string &sdp,
+ DiscoveryInfo(const std::string &id, const std::string &peer_id, const std::string &sdp,
const std::vector<std::string> &ice_candidates)
- : topic_(topic), is_src_(is_src), sdp_(sdp), ice_candidates_(ice_candidates)
+ : id_(id), peer_id_(peer_id), sdp_(sdp), ice_candidates_(ice_candidates)
{
}
- std::string topic_;
- bool is_src_;
+ std::string id_;
+ std::string peer_id_;
std::string sdp_;
std::vector<std::string> ice_candidates_;
};
static WebRtcMessage::Type getMessageType(const std::string &message);
- static std::vector<uint8_t> GenerateDiscoveryMessage(const std::string &topic, bool is_src,
- const std::string &sdp, const std::vector<std::string> &ice_candidates);
- static bool IsValidDiscoveryMessage(const std::vector<uint8_t> &discovery_message);
- static DiscoveryInfo ParseDiscoveryMessage(const std::vector<uint8_t> &discovery_message);
+ static bool IsValidStreamInfo(const std::vector<uint8_t> &message);
+ static bool IsValidStream(const flexbuffers::Map &info);
static constexpr int DISCOVERY_MESSAGE_KEY_SIZE = 4;
};
if (ret != WEBRTC_ERROR_NONE)
ERR("Failed to destroy webrtc handle");
webrtc_handle_ = nullptr;
+
+ // TODO what should be initialized?
}
bool WebRtcStream::Start(void)
if (ret != WEBRTC_ERROR_NONE)
ERR("Failed to stop webrtc handle");
+ // TODO what should be initialized?
return ret == WEBRTC_ERROR_NONE;
}
return ret == WEBRTC_ERROR_NONE;
}
-bool WebRtcStream::AddDiscoveryInformation(const std::vector<uint8_t> &discovery_message)
+bool WebRtcStream::AddPeerInformation(const std::string &sdp,
+ const std::vector<std::string> &ice_candidates)
{
- ERR("%s", __func__);
-
- peer_info_ = WebRtcMessage::ParseDiscoveryMessage(discovery_message);
+ remote_description_ = sdp;
+ peer_ice_candidates_ = ice_candidates;
if (!IsNegotiatingState()) {
+ DBG("Not negotiable");
return false;
}
-
return SetPeerInformation();
}
+void WebRtcStream::UpdatePeerInformation(const std::vector<std::string> &ice_candidates)
+{
+ if (IsPlayingState()) {
+ remote_description_.clear();
+ peer_ice_candidates_.clear();
+ stored_peer_ice_candidates_.clear();
+ DBG("Now Playing");
+ return;
+ }
+
+ peer_ice_candidates_ = ice_candidates;
+ if (!IsNegotiatingState()) {
+ DBG("Not negotiable");
+ return;
+ }
+
+ SetPeerInformation();
+ return;
+}
+
bool WebRtcStream::IsNegotiatingState(void)
{
webrtc_state_e state;
return state == WEBRTC_STATE_NEGOTIATING;
}
+bool WebRtcStream::IsPlayingState(void)
+{
+ webrtc_state_e state;
+ auto get_state_ret = webrtc_get_state(webrtc_handle_, &state);
+ if (get_state_ret != WEBRTC_ERROR_NONE) {
+ ERR("Failed to get state");
+ return false;
+ }
+
+ return state == WEBRTC_STATE_PLAYING;
+}
+
bool WebRtcStream::SetPeerInformation(void)
{
bool res = true;
}
bool WebRtcStream::SetPeerSDP(void)
{
+ ERR("%s", __func__);
bool res = true;
- if (peer_info_.sdp_.size() == 0)
+ if (remote_description_.size() == 0) {
+ DBG("Peer SDP empty");
return res;
+ }
- if (SetRemoteDescription(peer_info_.sdp_))
- peer_info_.sdp_.clear();
+ if (SetRemoteDescription(remote_description_))
+ remote_description_.clear();
else
res = false;
bool WebRtcStream::SetPeerIceCandidates(void)
{
- bool res = true;
- for (auto it = peer_info_.ice_candidates_.begin(); it != peer_info_.ice_candidates_.end();) {
- if (AddIceCandidateFromMessage(*it))
- it = peer_info_.ice_candidates_.erase(it);
+ ERR("%s", __func__);
+ bool res{true};
+ for (auto itr = peer_ice_candidates_.begin(); itr != peer_ice_candidates_.end(); ++itr) {
+ if (IsRedundantCandidate(*itr))
+ continue;
+
+ if (AddIceCandidateFromMessage(*itr))
+ stored_peer_ice_candidates_.push_back(*itr);
else
res = false;
}
return res;
}
+bool WebRtcStream::IsRedundantCandidate(const std::string &candidate)
+{
+ for (const auto &stored_ice_candidate : stored_peer_ice_candidates_) {
+ if (stored_ice_candidate.compare(candidate) == 0)
+ return true;
+ }
+ return false;
+}
void WebRtcStream::AttachSignals(bool is_source, bool need_display)
{
ERR("%s", __func__);
auto webrtc_stream = static_cast<WebRtcStream *>(user_data);
webrtc_stream->ice_candidates_.push_back(candidate);
+ webrtc_stream->GetEventHandler().CallOnIceCandidateCb(candidate);
}
void WebRtcStream::OnEncodedFrame(webrtc_h webrtc, webrtc_media_type_e type, unsigned int track_id,
std::string &GetPeerId(void) { return peer_id_; };
bool AddIceCandidateFromMessage(const std::string &ice_message);
- bool AddDiscoveryInformation(const std::vector<uint8_t> &discovery_message);
+ bool AddPeerInformation(const std::string &sdp, const std::vector<std::string> &ice_candidates);
+ void UpdatePeerInformation(const std::vector<std::string> &ice_candidates);
bool SetPeerInformation(void);
bool SetPeerSDP(void);
bool SetPeerIceCandidates(void);
void *user_data);
static void OnDataChannelOpen(webrtc_data_channel_h channel, void *user_data);
bool IsNegotiatingState(void);
+ bool IsPlayingState(void);
+ bool IsRedundantCandidate(const std::string &candidate);
private:
webrtc_h webrtc_handle_;
webrtc_data_channel_h channel_;
unsigned int source_id_;
std::string local_description_;
+ std::string remote_description_;
std::string id_;
std::string peer_id_;
std::vector<std::string> ice_candidates_;
+ std::vector<std::string> peer_ice_candidates_;
+ std::vector<std::string> stored_peer_ice_candidates_;
std::function<void(std::string)> on_offer_created_cb_;
std::function<void(std::string)> on_answer_created_cb_;
WebRtcEventHandler event_handler_;
- WebRtcMessage::DiscoveryInfo peer_info_;
};
} // namespace AittWebRTCNamespace
*/
#include <AittDiscovery.h>
+#include <flatbuffers/flexbuffers.h>
#include <glib.h>
#include <gtest/gtest.h>
#define LOCAL_IP "127.0.0.1"
#define DEFAULT_BROKER_PORT 1883
#define WEBRTC_TOPIC_PREFIX "WEBRTC_"
+#define WEBRTC_SRC_TOPIC "/src"
+#define WEBRTC_SINK_TOPIC "/sink"
#define TEST_TOPIC "TEST_TOPIC"
#define TEST_SRC_CLIENT_ID "TEST_SRC_CLIENT_ID"
#define TEST_SINK_CLIENT_ID "TEST_SINK_CLIENT_ID"
{
// TODO
}
-class WebRtcSrcStreamTest : public testing::Test {
+class WebRtcStreamTest : public testing::Test {
protected:
void SetUp() override { mainLoop_ = g_main_loop_new(nullptr, FALSE); }
void TearDown() override { g_main_loop_unref(mainLoop_); }
g_main_loop_run(mainLoop_);
DBG("Go forward");
}
+
static void OnStreamStateChanged(WebRtcState::Stream state, WebRtcStream &stream,
- WebRtcSrcStreamTest *test)
+ WebRtcStreamTest *test)
{
DBG("OnStreamStateChanged");
if (state == WebRtcState::Stream::NEGOTIATING) {
stream.CreateOfferAsync(on_offer_created);
}
}
- static void OnOfferCreated(std::string sdp, WebRtcStream &stream, WebRtcSrcStreamTest *test)
+ static void OnOfferCreated(std::string sdp, WebRtcStream &stream, WebRtcStreamTest *test)
{
DBG("%s", __func__);
stream.SetLocalDescription(sdp);
}
- static void OnSignalingStateNotify(WebRtcState::Signaling state, WebRtcStream &stream,
- WebRtcSrcStreamTest *test)
- {
- DBG("Singaling State: %s", WebRtcState::SignalingToStr(state).c_str());
- }
-
static void OnIceGatheringStateNotify(WebRtcState::IceGathering state, WebRtcStream &stream,
- WebRtcSrcStreamTest *test)
+ WebRtcStreamTest *test)
{
DBG("IceGathering State: %s", WebRtcState::IceGatheringToStr(state).c_str());
g_main_loop_quit(test->mainLoop_);
std::string local_description_;
};
-TEST_F(WebRtcSrcStreamTest, test_Create_WebRtcSrcStream_OnDevice)
-{
- WebRtcStream stream{};
- EXPECT_EQ(true, stream.Create(true, false)) << "Failed to create source stream";
-}
-
-TEST_F(WebRtcSrcStreamTest, test_Start_WebRtcSrcStream_OnDevice)
+TEST_F(WebRtcStreamTest, test_Create_WebRtcStream_OnDevice)
{
- WebRtcStream stream{};
- EXPECT_EQ(true, stream.Create(true, false)) << "Failed to create source stream";
- EXPECT_EQ(true, stream.AttachCameraSource()) << "Failed to attach camera source";
- auto on_stream_state_changed_cb =
- std::bind(OnStreamStateChanged, std::placeholders::_1, std::ref(stream), this);
- stream.GetEventHandler().SetOnStateChangedCb(on_stream_state_changed_cb);
-
- auto on_signaling_state_changed_cb =
- std::bind(OnSignalingStateNotify, std::placeholders::_1, std::ref(stream), this);
- stream.GetEventHandler().SetOnSignalingStateNotifyCb(on_signaling_state_changed_cb);
-
- auto on_ice_gathering_state_changed_cb =
- std::bind(OnIceGatheringStateNotify, std::placeholders::_1, std::ref(stream), this);
- stream.GetEventHandler().SetOnIceGatheringStateNotifyCb(on_ice_gathering_state_changed_cb);
- stream.Start();
- IterateEventLoop();
+ WebRtcStream src_stream{};
+ EXPECT_EQ(true, src_stream.Create(true, false)) << "Failed to create source stream";
}
-TEST_F(WebRtcSrcStreamTest, test_Validate_WebRtcSrcStream_Discovery_Message_OnDevice)
+TEST_F(WebRtcStreamTest, test_Start_WebRtcSrcStream_OnDevice)
{
WebRtcStream stream{};
EXPECT_EQ(true, stream.Create(true, false)) << "Failed to create source stream";
std::bind(OnStreamStateChanged, std::placeholders::_1, std::ref(stream), this);
stream.GetEventHandler().SetOnStateChangedCb(on_stream_state_changed_cb);
- auto on_signaling_state_changed_cb =
- std::bind(OnSignalingStateNotify, std::placeholders::_1, std::ref(stream), this);
- stream.GetEventHandler().SetOnSignalingStateNotifyCb(on_signaling_state_changed_cb);
-
auto on_ice_gathering_state_changed_cb =
std::bind(OnIceGatheringStateNotify, std::placeholders::_1, std::ref(stream), this);
stream.GetEventHandler().SetOnIceGatheringStateNotifyCb(on_ice_gathering_state_changed_cb);
for (const auto &ice_candidate : ice_candidates) {
EXPECT_EQ(WebRtcMessage::Type::ICE, WebRtcMessage::getMessageType(ice_candidate));
}
-
- auto discovery_message = WebRtcMessage::GenerateDiscoveryMessage(TEST_TOPIC, true,
- local_description_, stream.GetIceCandidates());
-
- auto discovery_info = WebRtcMessage::ParseDiscoveryMessage(discovery_message);
-
- EXPECT_EQ(0, discovery_info.topic_.compare(TEST_TOPIC));
- EXPECT_EQ(true, discovery_info.is_src_);
- EXPECT_EQ(0, discovery_info.sdp_.compare(local_description_));
- for (const auto ice_candidate : ice_candidates) {
- bool is_ice_candidate_exists{false};
-
- for (const auto &discovered_ice_candidate : discovery_info.ice_candidates_) {
- if (discovered_ice_candidate.compare(ice_candidate) == 0)
- is_ice_candidate_exists = true;
- }
- EXPECT_EQ(true, is_ice_candidate_exists);
- }
}
class WebRtcSourceOffererTest : public testing::Test {
{
DBG("Source IceGathering State: %s", WebRtcState::IceGatheringToStr(state).c_str());
if (state == WebRtcState::IceGathering::COMPLETE) {
- test->sink_stream_.AddDiscoveryInformation(WebRtcMessage::GenerateDiscoveryMessage(
- "TEST_TOPIC", true, test->src_description_, stream.GetIceCandidates()));
+ test->sink_stream_.AddPeerInformation(test->src_description_,
+ stream.GetIceCandidates());
}
}
{
DBG("Sink IceGathering State: %s", WebRtcState::IceGatheringToStr(state).c_str());
if (WebRtcState::IceGathering::COMPLETE == state) {
- test->src_stream_.AddDiscoveryInformation(WebRtcMessage::GenerateDiscoveryMessage(
- "TEST_TOPIC", true, test->sink_description_, stream.GetIceCandidates()));
+ test->src_stream_.AddPeerInformation(test->sink_description_,
+ stream.GetIceCandidates());
}
}
{
DBG("Sink IceGathering State: %s", WebRtcState::IceGatheringToStr(state).c_str());
if (state == WebRtcState::IceGathering::COMPLETE) {
- test->src_stream_.AddDiscoveryInformation(WebRtcMessage::GenerateDiscoveryMessage(
- "TEST_TOPIC", true, test->sink_description_, stream.GetIceCandidates()));
+ test->src_stream_.AddPeerInformation(test->sink_description_,
+ stream.GetIceCandidates());
}
}
{
DBG("Src IceGathering State: %s", WebRtcState::IceGatheringToStr(state).c_str());
if (WebRtcState::IceGathering::COMPLETE == state) {
- test->sink_stream_.AddDiscoveryInformation(WebRtcMessage::GenerateDiscoveryMessage(
- "TEST_TOPIC", true, test->src_description_, stream.GetIceCandidates()));
+ test->sink_stream_.AddPeerInformation(test->src_description_,
+ stream.GetIceCandidates());
}
}
new SrcStreamManager(TEST_TOPIC, TEST_SRC_CLIENT_ID, s_stream.str()));
}
+class SrcStreamManagerTest : public testing::Test {
+ protected:
+ SrcStreamManagerTest()
+ : discovery_cb_(-1),
+ test_topic_(std::string(WEBRTC_TOPIC_PREFIX) + std::string(TEST_TOPIC)),
+ discovery_engine_(TEST_SRC_CLIENT_ID),
+ mainLoop_(nullptr){};
+ void SetUp() override
+ {
+ mainLoop_ = g_main_loop_new(nullptr, FALSE);
+ discovery_engine_.SetMQ(std::unique_ptr<aitt::MQ>(
+ new aitt::MosquittoMQ(std::string(TEST_SRC_CLIENT_ID) + 'd', false)));
+ discovery_engine_.Start(LOCAL_IP, DEFAULT_BROKER_PORT, std::string(), std::string());
+ }
+
+ void TearDown() override
+ {
+ discovery_engine_.SetMQ(nullptr);
+ g_main_loop_unref(mainLoop_);
+ }
+ void IterateEventLoop(void)
+ {
+ g_main_loop_run(mainLoop_);
+ DBG("Go forward");
+ }
+
+ static void OnDiscoveredSrc(const std::string &clientId, const std::string &status,
+ const void *msg, const int szmsg, SrcStreamManagerTest *test,
+ SrcStreamManager *src_manager)
+ {
+ DBG("OnDiscoveredSrc");
+
+ auto discovered_msg = std::vector<uint8_t>(static_cast<const uint8_t *>(msg),
+ static_cast<const uint8_t *>(msg) + szmsg);
+ if (!flexbuffers::GetRoot(discovered_msg).IsString()) {
+ DBG("Invalid message type");
+ return;
+ }
+
+ auto src_cmd = flexbuffers::GetRoot(discovered_msg).ToString();
+ if (src_cmd.compare("START") == 0) {
+ DBG("Start Received");
+ src_manager->Stop();
+ } else if (src_cmd.compare("STOP") == 0) {
+ DBG("Stop Received");
+ if (g_main_loop_is_running(test->mainLoop_))
+ g_main_loop_quit(test->mainLoop_);
+ } else {
+ DBG("Invalid message");
+ return;
+ }
+ }
+
+ static void OnStreamStarted(SrcStreamManagerTest *test, SrcStreamManager *src_manager)
+ {
+ std::vector<uint8_t> msg;
+
+ flexbuffers::Builder fbb;
+ fbb.String("START");
+ fbb.Finish();
+
+ msg = fbb.GetBuffer();
+ test->discovery_engine_.UpdateDiscoveryMsg(src_manager->GetTopic(), msg.data(), msg.size());
+ }
+
+ static void OnStreamStopped(SrcStreamManagerTest *test, SrcStreamManager *src_manager)
+ {
+ std::vector<uint8_t> msg;
+
+ flexbuffers::Builder fbb;
+ fbb.String("STOP");
+ fbb.Finish();
+
+ msg = fbb.GetBuffer();
+ test->discovery_engine_.UpdateDiscoveryMsg(src_manager->GetTopic(), msg.data(), msg.size());
+ }
+
+ int discovery_cb_;
+ std::string test_topic_;
+ aitt::AittDiscovery discovery_engine_;
+ GMainLoop *mainLoop_;
+};
+
+TEST_F(SrcStreamManagerTest, test_SrcStreamManager_Start_OnDevice)
+{
+ std::stringstream s_stream;
+ s_stream << std::this_thread::get_id();
+ std::unique_ptr<StreamManager> src_manager(
+ new SrcStreamManager(test_topic_, TEST_SRC_CLIENT_ID, s_stream.str()));
+
+ auto on_discovered = std::bind(OnDiscoveredSrc, std::placeholders::_1, std::placeholders::_2,
+ std::placeholders::_3, std::placeholders::_4, this,
+ static_cast<SrcStreamManager *>(src_manager.get()));
+
+ // Src should subscribe WEBRTC_SINK_TOPIC but this is for test
+ discovery_cb_ = discovery_engine_.AddDiscoveryCB(src_manager->GetTopic(), on_discovered);
+
+ auto on_stream_started =
+ std::bind(OnStreamStarted, this, static_cast<SrcStreamManager *>(src_manager.get()));
+ src_manager->SetStreamStartCallback(on_stream_started);
+
+ auto on_stream_stopped =
+ std::bind(OnStreamStopped, this, static_cast<SrcStreamManager *>(src_manager.get()));
+ src_manager->SetStreamStopCallback(on_stream_stopped);
+
+ src_manager->Start();
+ IterateEventLoop();
+ discovery_engine_.RemoveDiscoveryCB(discovery_cb_);
+}
+
class SinkStreamManagerTest : public testing::Test {
protected:
SinkStreamManagerTest()
: discovery_cb_(-1),
test_topic_(std::string(WEBRTC_TOPIC_PREFIX) + std::string(TEST_TOPIC)),
discovery_engine_(TEST_SINK_CLIENT_ID),
- mainLoop_(nullptr) {};
+ mainLoop_(nullptr){};
void SetUp() override
{
mainLoop_ = g_main_loop_new(nullptr, FALSE);
DBG("Go forward");
}
- static void OnDiscovered(const std::string &clientId, const std::string &status,
+ static void OnDiscoveredSink(const std::string &clientId, const std::string &status,
const void *msg, const int szmsg, SinkStreamManagerTest *test)
{
- DBG("OnDiscovered");
- if (g_main_loop_is_running(test->mainLoop_))
+ DBG("OnDiscoveredSink");
+ if (WebRtcMessage::IsValidStreamInfo(std::vector<uint8_t>(static_cast<const uint8_t *>(msg),
+ static_cast<const uint8_t *>(msg) + szmsg))
+ && g_main_loop_is_running(test->mainLoop_))
g_main_loop_quit(test->mainLoop_);
}
- static void OnStreamReady(WebRtcStream &stream, SinkStreamManagerTest *test)
+ static void OnIceCandidateAdded(WebRtcStream &stream, SinkStreamManager *sink_mgr,
+ SinkStreamManagerTest *test)
+ {
+ DBG("OnIceCandidateAdded");
+ auto discovery_message = sink_mgr->GetDiscoveryMessage();
+ test->discovery_engine_.UpdateDiscoveryMsg(sink_mgr->GetTopic(), discovery_message.data(),
+ discovery_message.size());
+ }
+
+ static void OnStreamReady(WebRtcStream &stream, SinkStreamManager *sink_mgr,
+ SinkStreamManagerTest *test)
{
DBG("OnStreamReady");
- auto discovery_message = WebRtcMessage::GenerateDiscoveryMessage(test->test_topic_, false,
- stream.GetLocalDescription(), stream.GetIceCandidates());
- test->discovery_engine_.UpdateDiscoveryMsg(test->test_topic_, discovery_message.data(),
+ auto discovery_message = sink_mgr->GetDiscoveryMessage();
+ test->discovery_engine_.UpdateDiscoveryMsg(sink_mgr->GetTopic(), discovery_message.data(),
discovery_message.size());
}
std::unique_ptr<StreamManager> sink_manager(
new SinkStreamManager(test_topic_, TEST_SINK_CLIENT_ID, s_stream.str()));
- auto on_discovered = std::bind(OnDiscovered, std::placeholders::_1, std::placeholders::_2,
+ auto on_discovered = std::bind(OnDiscoveredSink, std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3, std::placeholders::_4, this);
- discovery_cb_ = discovery_engine_.AddDiscoveryCB(test_topic_, on_discovered);
+ // Sink should subscribe WEBRTC_SRC_TOPIC but this is for test
+ discovery_cb_ = discovery_engine_.AddDiscoveryCB(sink_manager->GetTopic(), on_discovered);
+
+ auto on_ice_candidate_added = std::bind(OnIceCandidateAdded, std::placeholders::_1,
+ static_cast<SinkStreamManager *>(sink_manager.get()), this);
+ sink_manager->SetIceCandidateAddedCallback(on_ice_candidate_added);
- auto on_stream_ready = std::bind(OnStreamReady, std::placeholders::_1, this);
+ auto on_stream_ready = std::bind(OnStreamReady, std::placeholders::_1,
+ static_cast<SinkStreamManager *>(sink_manager.get()), this);
sink_manager->SetStreamReadyCallback(on_stream_ready);
sink_manager->Start();
new aitt::MosquittoMQ(std::string(TEST_SINK_CLIENT_ID) + 'd', false)));
sink_discovery_engine_.Start(LOCAL_IP, DEFAULT_BROKER_PORT, std::string(), std::string());
- auto discovered_at_sink = std::bind(DiscoveredAtSink, std::placeholders::_1,
- std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, this);
- sink_discovery_cb_ = sink_discovery_engine_.AddDiscoveryCB(test_topic_, discovered_at_sink);
- sink_discovery_engine_.Restart();
-
src_discovery_engine_.SetMQ(std::unique_ptr<aitt::MQ>(
new aitt::MosquittoMQ(std::string(TEST_SRC_CLIENT_ID) + 'd', false)));
src_discovery_engine_.Start(LOCAL_IP, DEFAULT_BROKER_PORT, std::string(), std::string());
+ }
- auto discovered_at_src = std::bind(DiscoveredAtSrc, std::placeholders::_1,
- std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, this);
- src_discovery_cb_ = src_discovery_engine_.AddDiscoveryCB(test_topic_, discovered_at_src);
- src_discovery_engine_.Restart();
+ static void DiscoveredAtSink(const std::string &clientId, const std::string &status,
+ const void *msg, const int szmsg, SinkSrcStreamManagerTest *test)
+ {
+ DBG("DiscoveredAtSink");
+
+ auto sink_manager = static_cast<SinkStreamManager *>(test->sink_manager_.get());
+ if (!status.compare(aitt::AittDiscovery::WILL_LEAVE_NETWORK)) {
+ sink_manager->HandleRemovedClient(clientId);
+ return;
+ }
+
+ sink_manager->HandleMsg(clientId, std::vector<uint8_t>(static_cast<const uint8_t *>(msg),
+ static_cast<const uint8_t *>(msg) + szmsg));
+ }
+
+ static void DiscoveredAtSrc(const std::string &clientId, const std::string &status,
+ const void *msg, const int szmsg, SinkSrcStreamManagerTest *test)
+ {
+ DBG("DiscoveredAtSrc");
+
+ auto src_manager = static_cast<SrcStreamManager *>(test->src_manager_.get());
+ if (!status.compare(aitt::AittDiscovery::WILL_LEAVE_NETWORK)) {
+ src_manager->HandleRemovedClient(clientId);
+ return;
+ }
+ src_manager->HandleMsg(clientId, std::vector<uint8_t>(static_cast<const uint8_t *>(msg),
+ static_cast<const uint8_t *>(msg) + szmsg));
}
void TearDown() override
src_discovery_engine_.SetMQ(nullptr);
}
- sink_manager_->Stop();
- src_manager_->Stop();
-
g_main_loop_unref(mainLoop_);
}
void StartSinkStream()
{
+ auto discovered_at_sink = std::bind(DiscoveredAtSink, std::placeholders::_1,
+ std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, this);
+ sink_discovery_cb_ = sink_discovery_engine_.AddDiscoveryCB(
+ sink_manager_->GetWatchingTopic(), discovered_at_sink);
+ sink_discovery_engine_.Restart();
+
+ auto on_stream_stopped = std::bind(OnSinkStreamStopped, this);
+ sink_manager_->SetStreamStopCallback(on_stream_stopped);
+
+ auto on_ice_candidate = std::bind(OnSinkIceCandidate, std::placeholders::_1, this);
+ sink_manager_->SetIceCandidateAddedCallback(on_ice_candidate);
+
auto on_sink_stream_ready = std::bind(OnSinkStreamReady, std::placeholders::_1, this);
sink_manager_->SetStreamReadyCallback(on_sink_stream_ready);
sink_manager_->Start();
}
+ static void OnSinkStreamStopped(SinkSrcStreamManagerTest *test)
+ {
+ std::vector<uint8_t> msg;
+
+ flexbuffers::Builder fbb;
+ fbb.String("STOP");
+ fbb.Finish();
+
+ msg = fbb.GetBuffer();
+ test->sink_discovery_engine_.UpdateDiscoveryMsg(test->sink_manager_->GetTopic(), msg.data(),
+ msg.size());
+ }
+
+ static void OnSinkIceCandidate(WebRtcStream &stream, SinkSrcStreamManagerTest *test)
+ {
+ DBG("OnSinkIceCandidate");
+ auto discovery_message = test->sink_manager_->GetDiscoveryMessage();
+ test->sink_discovery_engine_.UpdateDiscoveryMsg(test->sink_manager_->GetTopic(),
+ discovery_message.data(), discovery_message.size());
+ }
+
+ static void OnSinkStreamReady(WebRtcStream &stream, SinkSrcStreamManagerTest *test)
+ {
+ DBG("OnSinkStreamReady");
+
+ auto discovery_message = test->sink_manager_->GetDiscoveryMessage();
+ test->sink_discovery_engine_.UpdateDiscoveryMsg(test->sink_manager_->GetTopic(),
+ discovery_message.data(), discovery_message.size());
+ }
+
+ static void OnEncodedFrame(WebRtcStream &stream, SinkSrcStreamManagerTest *test)
+ {
+ static_cast<SinkStreamManager *>(test->sink_manager_.get())->SetOnEncodedFrameCallback(nullptr);
+
+ if (test->stop_sink_first_)
+ test->AddIdleStopSinkStream();
+ else
+ test->AddIdleStopSrcStream();
+ }
+
void StartSrcStream()
{
+ auto discovered_at_src = std::bind(DiscoveredAtSrc, std::placeholders::_1,
+ std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, this);
+ src_discovery_cb_ = src_discovery_engine_.AddDiscoveryCB(src_manager_->GetWatchingTopic(),
+ discovered_at_src);
+ src_discovery_engine_.Restart();
+
+ auto on_stream_started = std::bind(OnStreamStarted, this);
+ src_manager_->SetStreamStartCallback(on_stream_started);
+ auto on_stream_stopped = std::bind(OnSrcStreamStopped, this);
+ src_manager_->SetStreamStopCallback(on_stream_stopped);
+ auto on_ice_candidate = std::bind(OnSrcIceCandidate, std::placeholders::_1, this);
+ src_manager_->SetIceCandidateAddedCallback(on_ice_candidate);
+
auto on_src_stream_ready = std::bind(OnSrcStreamReady, std::placeholders::_1, this);
src_manager_->SetStreamReadyCallback(on_src_stream_ready);
src_manager_->Start();
}
- static void DiscoveredAtSink(const std::string &clientId, const std::string &status,
- const void *msg, const int szmsg, SinkSrcStreamManagerTest *test)
+ static void OnStreamStarted(SinkSrcStreamManagerTest *test)
{
- // TODO: Rearrange below
- DBG("DiscoveredAtSink");
- if (!clientId.compare(test->sink_manager_->GetClientId())) {
- DBG("but has same ID");
- return;
- }
+ DBG("Send Start");
+ std::vector<uint8_t> msg;
- auto sink_manager = static_cast<SinkStreamManager *>(test->sink_manager_.get());
- if (!status.compare(aitt::AittDiscovery::WILL_LEAVE_NETWORK)) {
- sink_manager->HandleRemovedClient(clientId);
- return;
- }
+ flexbuffers::Builder fbb;
+ fbb.String("START");
+ fbb.Finish();
- sink_manager->HandleDiscoveredStream(clientId,
- std::vector<uint8_t>(static_cast<const uint8_t *>(msg),
- static_cast<const uint8_t *>(msg) + szmsg));
+ msg = fbb.GetBuffer();
+ test->src_discovery_engine_.UpdateDiscoveryMsg(test->src_manager_->GetTopic(), msg.data(),
+ msg.size());
}
- static void DiscoveredAtSrc(const std::string &clientId, const std::string &status,
- const void *msg, const int szmsg, SinkSrcStreamManagerTest *test)
+ static void OnSrcStreamStopped(SinkSrcStreamManagerTest *test)
{
- // TODO: Rearrange below
- DBG("DiscoveredAtSrc");
- if (!clientId.compare(test->src_manager_->GetClientId())) {
- DBG("but has same ID");
- return;
- }
-
- auto src_manager = static_cast<SrcStreamManager *>(test->src_manager_.get());
+ std::vector<uint8_t> msg;
- if (!status.compare(aitt::AittDiscovery::WILL_LEAVE_NETWORK)) {
- src_manager->HandleRemovedClient(clientId);
- return;
- }
+ flexbuffers::Builder fbb;
+ fbb.String("STOP");
+ fbb.Finish();
- src_manager->HandleDiscoveredStream(clientId,
- std::vector<uint8_t>(static_cast<const uint8_t *>(msg),
- static_cast<const uint8_t *>(msg) + szmsg));
+ msg = fbb.GetBuffer();
+ test->src_discovery_engine_.UpdateDiscoveryMsg(test->src_manager_->GetTopic(), msg.data(),
+ msg.size());
}
- static void OnSinkStreamReady(WebRtcStream &stream, SinkSrcStreamManagerTest *test)
+ static void OnSrcIceCandidate(WebRtcStream &stream, SinkSrcStreamManagerTest *test)
{
- DBG("OnSinkStreamReady");
-
- auto discovery_message = WebRtcMessage::GenerateDiscoveryMessage(test->test_topic_, false,
- stream.GetLocalDescription(), stream.GetIceCandidates());
- test->sink_discovery_engine_.UpdateDiscoveryMsg(test->test_topic_, discovery_message.data(),
- discovery_message.size());
+ DBG("OnIceCandidateAdded");
+ auto discovery_message = test->src_manager_->GetDiscoveryMessage();
+ test->src_discovery_engine_.UpdateDiscoveryMsg(test->src_manager_->GetTopic(),
+ discovery_message.data(), discovery_message.size());
}
static void OnSrcStreamReady(WebRtcStream &stream, SinkSrcStreamManagerTest *test)
{
DBG("OnSrcStreamReady");
- auto discovery_message = WebRtcMessage::GenerateDiscoveryMessage(test->test_topic_, true,
- stream.GetLocalDescription(), stream.GetIceCandidates());
- test->src_discovery_engine_.UpdateDiscoveryMsg(test->test_topic_, discovery_message.data(),
- discovery_message.size());
- }
-
- static void OnEncodedFrame(WebRtcStream &stream, SinkSrcStreamManagerTest *test)
- {
- if (test->stop_sink_first_)
- test->AddIdleStopSinkStream();
- else
- test->AddIdleStopSrcStream();
+ auto discovery_message = test->src_manager_->GetDiscoveryMessage();
+ test->src_discovery_engine_.UpdateDiscoveryMsg(test->src_manager_->GetTopic(),
+ discovery_message.data(), discovery_message.size());
}
void StartSinkFirst(void)
{
start_sink_id_ = g_timeout_add_seconds(1, GSourceStartSink, this);
- start_src_id_ = g_timeout_add_seconds(4, GSourceStartSrc, this);
+ start_src_id_ = g_timeout_add_seconds(2, GSourceStartSrc, this);
}
void StartSrcFirst(void)
return FALSE;
static_cast<SinkStreamManager *>(test->sink_manager_.get())->Stop();
- if (test->sink_discovery_engine_.HasValidMQ()) {
- test->sink_discovery_engine_.RemoveDiscoveryCB(test->sink_discovery_cb_);
- test->sink_discovery_engine_.Stop();
- test->sink_discovery_engine_.SetMQ(nullptr);
- }
return FALSE;
}
return FALSE;
static_cast<SrcStreamManager *>(test->src_manager_.get())->Stop();
- if (test->src_discovery_engine_.HasValidMQ()) {
- test->src_discovery_engine_.RemoveDiscoveryCB(test->src_discovery_cb_);
- test->src_discovery_engine_.Stop();
- test->src_discovery_engine_.SetMQ(nullptr);
- }
return FALSE;
}
AddStopEventLoop();
IterateEventLoop();
}
-
-class ModuleTest : public testing::Test {
- protected:
- ModuleTest()
- : test_topic_(std::string(WEBRTC_TOPIC_PREFIX) + std::string(TEST_TOPIC)),
- sink_discovery_engine_(std::string(TEST_SINK_CLIENT_ID)),
- src_discovery_engine_(std::string(TEST_SRC_CLIENT_ID)),
- sink_module_(nullptr),
- src_module_(nullptr),
- mainLoop_(nullptr){};
-
- void SetUp() override
- {
- // create g_main_loop & connect broker
- mainLoop_ = g_main_loop_new(nullptr, FALSE);
-
- sink_discovery_engine_.SetMQ(std::unique_ptr<aitt::MQ>(
- new aitt::MosquittoMQ(std::string(TEST_SINK_CLIENT_ID) + 'd', false)));
- sink_discovery_engine_.Start(LOCAL_IP, DEFAULT_BROKER_PORT, std::string(), std::string());
- sink_discovery_engine_.Restart();
- sink_module_ = std::unique_ptr<Module>(new Module(sink_discovery_engine_, test_topic_,
- AittStreamRole::AITT_STREAM_ROLE_SUBSCRIBER));
-
- src_discovery_engine_.SetMQ(std::unique_ptr<aitt::MQ>(
- new aitt::MosquittoMQ(std::string(TEST_SRC_CLIENT_ID) + 'd', false)));
- src_discovery_engine_.Start(LOCAL_IP, DEFAULT_BROKER_PORT, std::string(), std::string());
- src_discovery_engine_.Restart();
- src_module_ = std::unique_ptr<Module>(new Module(src_discovery_engine_, test_topic_,
- AittStreamRole::AITT_STREAM_ROLE_PUBLISHER));
- }
-
- void TearDown() override
- {
- // disconnect broker & destroy g_main_loop
-
- if (sink_discovery_engine_.HasValidMQ()) {
- sink_discovery_engine_.Stop();
- sink_discovery_engine_.SetMQ(nullptr);
- }
-
- if (src_discovery_engine_.HasValidMQ()) {
- src_discovery_engine_.Stop();
- src_discovery_engine_.SetMQ(nullptr);
- }
-
- g_main_loop_unref(mainLoop_);
- }
-
- void IterateEventLoop(void)
- {
- DBG("Go forward");
- g_main_loop_run(mainLoop_);
- }
-
- void AddStopEventLoop(void) { g_timeout_add_seconds(15, GSourceStopEventLoop, this); }
-
- static gboolean GSourceStopEventLoop(gpointer data)
- {
- DBG("GSourceStopEventLoop");
- auto test = static_cast<ModuleTest *>(data);
- if (!test)
- return FALSE;
-
- if (g_main_loop_is_running(test->mainLoop_))
- g_main_loop_quit(test->mainLoop_);
-
- return FALSE;
- }
-
- std::string test_topic_;
- aitt::AittDiscovery sink_discovery_engine_;
- aitt::AittDiscovery src_discovery_engine_;
- std::unique_ptr<Module> sink_module_;
- std::unique_ptr<Module> src_module_;
- GMainLoop *mainLoop_;
-};
-
-TEST_F(ModuleTest, test_OnDevice)
-{
- sink_module_->Start();
- src_module_->Start();
- AddStopEventLoop();
- IterateEventLoop();
-}