SinkStreamManager::~SinkStreamManager()
{
- for (auto itr = streams_.begin(); itr != streams_.end(); ++itr)
- itr->second->Destroy();
- streams_.clear();
}
void SinkStreamManager::SetOnEncodedFrameCallback(EncodedFrameCallabck cb)
stream.GetEventHandler().SetOnIceCandidateCb(
std::bind(&SinkStreamManager::OnIceCandidate, this));
- stream.GetEventHandler().SetOnEncodedFrameCb(std::bind(&SinkStreamManager::OnEncodedFrame, this));
+ stream.GetEventHandler().SetOnEncodedFrameCb(
+ std::bind(&SinkStreamManager::OnEncodedFrame, this));
}
void SinkStreamManager::OnStreamStateChanged(WebRtcState::Stream state, WebRtcStream &stream)
else if (src_state.compare("STOP") == 0)
HandleRemovedClient(discovery_id);
else
- DBG("Invalid message %s", src_state);
+ ERR("Invalid message %s", src_state);
}
void SinkStreamManager::HandleStartStream(const std::string &discovery_id)
{
DBG("%s", __func__);
- auto src_stream = streams_.find(discovery_id);
- if (src_stream != streams_.end()) {
- DBG("There's stream already");
+ if (!peer_aitt_id_.empty()) {
+ ERR("There's stream already");
return;
}
void SinkStreamManager::AddStream(const std::string &discovery_id)
{
- auto stream = new WebRtcStream();
- SetWebRtcStreamCallbacks(*stream);
- stream->Create(false, false);
- stream->Start();
+ SetWebRtcStreamCallbacks(stream_);
+ stream_.Create(false, false);
+ stream_.Start();
std::stringstream s_stream;
- s_stream << static_cast<void *>(stream);
+ s_stream << static_cast<void *>(&stream_);
- stream->SetStreamId(std::string(thread_id_ + s_stream.str()));
- streams_[discovery_id] = stream;
+ stream_.SetStreamId(std::string(thread_id_ + s_stream.str()));
+ peer_aitt_id_ = discovery_id;
}
void SinkStreamManager::HandleStreamInfo(const std::string &discovery_id,
const std::vector<uint8_t> &message)
{
if (!WebRtcMessage::IsValidStreamInfo(message)) {
- DBG("Invalid streams info");
+ ERR("Invalid streams info");
return;
}
- // sink_streams have a stream at normal situation
- auto src_streams = flexbuffers::GetRoot(message).AsVector();
- for (size_t stream_idx = 0; stream_idx < src_streams.size(); ++stream_idx) {
- auto stream = src_streams[stream_idx].AsMap();
- auto id = stream["id"].AsString().str();
- auto peer_id = stream["peer_id"].AsString().str();
- auto sdp = stream["sdp"].AsString().str();
- std::vector<std::string> ice_candidates;
- auto ice_info = stream["ice_candidates"].AsVector();
- for (size_t ice_idx = 0; ice_idx < ice_info.size(); ++ice_idx)
- ice_candidates.push_back(ice_info[ice_idx].AsString().str());
- UpdateStreamInfo(discovery_id, id, peer_id, sdp, ice_candidates);
- }
+ // have a stream at Current status
+ auto stream = flexbuffers::GetRoot(message).AsMap();
+ auto id = stream["id"].AsString().str();
+ auto peer_id = stream["peer_id"].AsString().str();
+ auto sdp = stream["sdp"].AsString().str();
+ std::vector<std::string> ice_candidates;
+ auto ice_info = stream["ice_candidates"].AsVector();
+ for (size_t ice_idx = 0; ice_idx < ice_info.size(); ++ice_idx)
+ ice_candidates.push_back(ice_info[ice_idx].AsString().str());
+ UpdateStreamInfo(discovery_id, id, peer_id, sdp, ice_candidates);
}
void SinkStreamManager::UpdateStreamInfo(const std::string &discovery_id, const std::string &id,
const std::string &peer_id, const std::string &sdp,
const std::vector<std::string> &ice_candidates)
{
- auto src_stream = streams_.find(discovery_id);
- if (src_stream == streams_.end()) {
- DBG("No matching stream");
+ //There's only one stream for a aitt ID
+ if (peer_aitt_id_ != discovery_id) {
+ ERR("No matching stream");
return;
}
- if (peer_id.compare(src_stream->second->GetStreamId()) != 0) {
- DBG("Different ID");
+ if (peer_id.compare(stream_.GetStreamId()) != 0) {
+ ERR("Different ID");
return;
}
- if (src_stream->second->GetPeerId().size() == 0) {
+ if (stream_.GetPeerId().size() == 0) {
DBG("first update");
- src_stream->second->SetPeerId(id);
- src_stream->second->AddPeerInformation(sdp, ice_candidates);
+ stream_.SetPeerId(id);
+ stream_.AddPeerInformation(sdp, ice_candidates);
} else
- src_stream->second->UpdatePeerInformation(ice_candidates);
+ stream_.UpdatePeerInformation(ice_candidates);
}
std::vector<uint8_t> SinkStreamManager::GetDiscoveryMessage(void)
std::vector<uint8_t> message;
flexbuffers::Builder fbb;
- fbb.Vector([&] {
- for (auto itr = streams_.begin(); itr != streams_.end(); ++itr) {
- fbb.Map([&] {
- fbb.String("id", itr->second->GetStreamId());
- fbb.String("peer_id", itr->second->GetPeerId());
- fbb.String("sdp", itr->second->GetLocalDescription());
- fbb.Vector("ice_candidates", [&]() {
- for (const auto &candidate : itr->second->GetIceCandidates()) {
- fbb.String(candidate);
- }
- });
- });
- }
+ fbb.Map([&] {
+ fbb.String("id", stream_.GetStreamId());
+ fbb.String("peer_id", stream_.GetPeerId());
+ fbb.String("sdp", stream_.GetLocalDescription());
+ fbb.Vector("ice_candidates", [&]() {
+ for (const auto &candidate : stream_.GetIceCandidates()) {
+ fbb.String(candidate);
+ }
+ });
});
fbb.Finish();
SrcStreamManager::~SrcStreamManager()
{
- // TODO: You should take care about stream resource
- for (auto itr = streams_.begin(); itr != streams_.end(); ++itr)
- itr->second->Destroy();
- streams_.clear();
}
void SrcStreamManager::SetWebRtcStreamCallbacks(WebRtcStream &stream)
if (sink_state.compare("STOP") == 0)
HandleRemovedClient(discovery_id);
else
- DBG("Invalid message %s", sink_state);
+ ERR("Invalid message %s", sink_state);
}
void SrcStreamManager::HandleStreamInfo(const std::string &discovery_id,
{
DBG("%s", __func__);
if (!WebRtcMessage::IsValidStreamInfo(message)) {
- DBG("Invalid streams info");
+ ERR("Invalid streams info");
return;
}
- // sink_streams have a stream at normal situation
- auto sink_streams = flexbuffers::GetRoot(message).AsVector();
- for (size_t stream_idx = 0; stream_idx < sink_streams.size(); ++stream_idx) {
- auto stream = sink_streams[stream_idx].AsMap();
- auto id = stream["id"].AsString().str();
- auto peer_id = stream["peer_id"].AsString().str();
- auto sdp = stream["sdp"].AsString().str();
- std::vector<std::string> ice_candidates;
- auto ice_info = stream["ice_candidates"].AsVector();
- for (size_t ice_idx = 0; ice_idx < ice_info.size(); ++ice_idx)
- ice_candidates.push_back(ice_info[ice_idx].AsString().str());
- UpdateStreamInfo(discovery_id, id, peer_id, sdp, ice_candidates);
- }
+ // have a stream at Current status
+ auto stream = flexbuffers::GetRoot(message).AsMap();
+ auto id = stream["id"].AsString().str();
+ auto peer_id = stream["peer_id"].AsString().str();
+ auto sdp = stream["sdp"].AsString().str();
+ std::vector<std::string> ice_candidates;
+ auto ice_info = stream["ice_candidates"].AsVector();
+ for (size_t ice_idx = 0; ice_idx < ice_info.size(); ++ice_idx)
+ ice_candidates.push_back(ice_info[ice_idx].AsString().str());
+ UpdateStreamInfo(discovery_id, id, peer_id, sdp, ice_candidates);
}
void SrcStreamManager::UpdateStreamInfo(const std::string &discovery_id, const std::string &id,
const std::string &peer_id, const std::string &sdp,
const std::vector<std::string> &ice_candidates)
{
- auto sink_stream = streams_.find(discovery_id);
- if (sink_stream == streams_.end())
+ // There's only one stream for a aitt ID
+ if (peer_aitt_id_.empty())
AddStream(discovery_id, id, sdp, ice_candidates);
+ else if (peer_aitt_id_ == discovery_id && peer_id.compare(stream_.GetStreamId()) != 0)
+ stream_.UpdatePeerInformation(ice_candidates);
else
- sink_stream->second->UpdatePeerInformation(ice_candidates);
+ ERR("Invalid peer ID");
}
void SrcStreamManager::AddStream(const std::string &discovery_id, const std::string &id,
const std::string &sdp, const std::vector<std::string> &ice_candidates)
{
- auto stream = new WebRtcStream();
- SetWebRtcStreamCallbacks(*stream);
- stream->Create(true, false);
- stream->AttachCameraSource();
- stream->Start();
+ SetWebRtcStreamCallbacks(stream_);
+ stream_.Create(true, false);
+ stream_.AttachCameraSource();
+ stream_.Start();
std::stringstream s_stream;
- s_stream << static_cast<void *>(stream);
-
- stream->SetStreamId(std::string(thread_id_ + s_stream.str()));
- stream->SetPeerId(id);
- stream->AddPeerInformation(sdp, ice_candidates);
+ s_stream << static_cast<void *>(&stream_);
- streams_[discovery_id] = stream;
+ stream_.SetStreamId(std::string(thread_id_ + s_stream.str()));
+ stream_.SetPeerId(id);
+ peer_aitt_id_ = discovery_id;
+ stream_.AddPeerInformation(sdp, ice_candidates);
return;
}
std::vector<uint8_t> message;
flexbuffers::Builder fbb;
- fbb.Vector([&] {
- for (auto itr = streams_.begin(); itr != streams_.end(); ++itr) {
- fbb.Map([&] {
- fbb.String("id", itr->second->GetStreamId());
- fbb.String("peer_id", itr->second->GetPeerId());
- fbb.String("sdp", itr->second->GetLocalDescription());
- fbb.Vector("ice_candidates", [&]() {
- for (const auto &candidate : itr->second->GetIceCandidates()) {
- fbb.String(candidate);
- }
- });
- });
- }
+ fbb.Map([&] {
+ fbb.String("id", stream_.GetStreamId());
+ fbb.String("peer_id", stream_.GetPeerId());
+ fbb.String("sdp", stream_.GetLocalDescription());
+ fbb.Vector("ice_candidates", [&]() {
+ for (const auto &candidate : stream_.GetIceCandidates()) {
+ fbb.String(candidate);
+ }
+ });
});
fbb.Finish();
void StreamManager::Stop(void)
{
DBG("%s %s", __func__, GetTopic().c_str());
- // TODO: You should take care about stream resource
- for (auto itr = streams_.begin(); itr != streams_.end(); ++itr)
- itr->second->Destroy();
- streams_.clear();
+ peer_aitt_id_.clear();
+ stream_.Destroy();
if (stream_stop_cb_)
stream_stop_cb_();
void StreamManager::HandleRemovedClient(const std::string &discovery_id)
{
- auto stream_itr = streams_.find(discovery_id);
- if (stream_itr == streams_.end()) {
- DBG("There's no stream %s", discovery_id.c_str());
+ if (peer_aitt_id_ != discovery_id){
+ ERR("There's no stream %s", discovery_id.c_str());
return;
}
- // TODO: You should take care about stream resource
- stream_itr->second->Destroy();
- streams_.erase(stream_itr);
+ stream_.Destroy();
return;
}
#include <string>
#include <thread>
#include <vector>
+#include <memory>
namespace AittWebRTCNamespace {
// TODO: why dont' we remove below
std::string aitt_id_;
std::string thread_id_;
- // TODO: What if user copies the module?
- // Think about that case with destructor
- std::map<std::string /* Peer Aitt Discovery ID */, WebRtcStream *> streams_;
+ //We assume Module class can't be copyable
+ std::string peer_aitt_id_;
+ WebRtcStream stream_;
StreamStartCallback stream_start_cb_;
StreamStopCallback stream_stop_cb_;
IceCandidateAddedCallback ice_candidate_added_cb_;
bool WebRtcMessage::IsValidStreamInfo(const std::vector<uint8_t> &message)
{
- if (!flexbuffers::GetRoot(message).IsVector())
+ if (!flexbuffers::GetRoot(message).IsMap())
return false;
- auto streams = flexbuffers::GetRoot(message).AsVector();
- for (size_t idx = 0; idx < streams.size(); ++idx) {
- auto stream = streams[idx].AsMap();
- if (!IsValidStream(stream))
- return false;
- }
+ auto stream_info = flexbuffers::GetRoot(message).AsMap();
- return true;
+ return IsValidStream(stream_info);
}
bool WebRtcMessage::IsValidStream(const flexbuffers::Map &info)