LINK_DIRECTORIES(${AITT_WEBRTC_NEEDS_LIBRARY_DIRS})
ADD_LIBRARY(WEBRTC_OBJ OBJECT
+ StreamManager.cc
SrcStreamManager.cc
SinkStreamManager.cc
WebRtcMessage.cc
namespace AittWebRTCNamespace {
Module::Module(AittDiscovery &discovery, const std::string &topic, AittStreamRole role)
- : is_source_(IsSource(role)), discovery_(discovery), state_cb_user_data_(nullptr), receive_cb_user_data_(nullptr)
+ : is_source_(role == AittStreamRole::AITT_STREAM_ROLE_PUBLISHER),
+ discovery_(discovery),
+ state_cb_user_data_(nullptr),
+ receive_cb_user_data_(nullptr)
{
std::stringstream s_stream;
s_stream << std::this_thread::get_id();
{
}
-
void Module::Start(void)
{
- auto on_ice_candidate_added =
- std::bind(&Module::OnIceCandidateAdded, this, std::placeholders::_1);
- stream_manager_->SetIceCandidateAddedCallback(on_ice_candidate_added);
-
- auto on_stream_ready = std::bind(&Module::OnStreamReady, this, std::placeholders::_1);
- stream_manager_->SetStreamReadyCallback(on_stream_ready);
-
- auto on_stream_started = std::bind(&Module::OnStreamStarted, this);
- stream_manager_->SetStreamStartCallback(on_stream_started);
-
- auto on_stream_stopped = std::bind(&Module::OnStreamStopped, this);
- stream_manager_->SetStreamStopCallback(on_stream_stopped);
+ stream_manager_->SetIceCandidateAddedCallback(std::bind(&Module::OnIceCandidateAdded, this));
+ stream_manager_->SetStreamStartCallback(std::bind(&Module::OnStreamStarted, this));
+ stream_manager_->SetStreamStopCallback(std::bind(&Module::OnStreamStopped, this));
stream_manager_->Start();
}
{
}
-void Module::OnIceCandidateAdded(WebRtcStream &stream)
+void Module::OnIceCandidateAdded(void)
{
DBG("OnIceCandidateAdded");
auto msg = stream_manager_->GetDiscoveryMessage();
discovery_.UpdateDiscoveryMsg(stream_manager_->GetTopic(), msg.data(), msg.size());
}
-void Module::OnStreamReady(WebRtcStream &stream)
-{
- DBG("OnStreamReady");
-
- auto msg = stream_manager_->GetDiscoveryMessage();
- discovery_.UpdateDiscoveryMsg(stream_manager_->GetTopic(), msg.data(), msg.size());
-}
-
void Module::OnStreamStarted(void)
{
std::vector<uint8_t> msg;
receive_callback_ = cb;
receive_cb_user_data_ = user_data;
}
-bool Module::IsSource(AittStreamRole role)
-{
- return role == AittStreamRole::AITT_STREAM_ROLE_PUBLISHER;
-}
void Module::DiscoveryMessageCallback(const std::string &clientId, const std::string &status,
const void *msg, const int szmsg)
return;
}
- stream_manager_->HandleMsg(clientId,
- std::vector<uint8_t>(static_cast<const uint8_t *>(msg),
- static_cast<const uint8_t *>(msg) + szmsg));
+ stream_manager_->HandleMsg(clientId, std::vector<uint8_t>(static_cast<const uint8_t *>(msg),
+ static_cast<const uint8_t *>(msg) + szmsg));
}
} // namespace AittWebRTCNamespace
#include <AittDiscovery.h>
#include <AittStreamModule.h>
-#include "StreamManager.h"
#include <map>
#include <memory>
#include <mutex>
#include <string>
+#include "StreamManager.h"
+
using AittDiscovery = aitt::AittDiscovery;
using AittStreamModule = aitt::AittStreamModule;
void Stop(void) override;
void SetStateCallback(StateCallback cb, void *user_data) override;
void SetReceiveCallback(ReceiveCallback cb, void *user_data) override;
- static bool IsSource(AittStreamRole role);
private:
- //TODO: Update Ice Candidates when stream add candidates.
- void OnIceCandidateAdded(WebRtcStream &stream);
- void OnStreamReady(WebRtcStream &stream);
+ // TODO: Update Ice Candidates when stream add candidates.
+ void OnIceCandidateAdded(void);
void OnStreamStarted(void);
void OnStreamStopped(void);
void DiscoveryMessageCallback(const std::string &clientId, const std::string &status,
AittDiscovery &discovery_;
int discovery_cb_;
- //TODO: What if user copies the module?
- //Think about that case with destructor
+ // TODO: What if user copies the module?
+ // Think about that case with destructor
StreamManager *stream_manager_;
StateCallback state_callback_;
void *state_cb_user_data_;
namespace AittWebRTCNamespace {
SinkStreamManager::SinkStreamManager(const std::string &topic, const std::string &aitt_id,
const std::string &thread_id)
- : StreamManager(topic + "/SINK", aitt_id, thread_id), watching_topic_(topic + "/SRC")
+ : StreamManager(topic + "/SINK", topic + "/SRC", aitt_id, thread_id)
{
}
SinkStreamManager::~SinkStreamManager()
{
- for (auto itr = src_stream_.begin(); itr != src_stream_.end(); ++itr)
+ for (auto itr = streams_.begin(); itr != streams_.end(); ++itr)
itr->second->Destroy();
- src_stream_.clear();
-}
-
-void SinkStreamManager::Start()
-{
- DBG("%s %s", __func__, GetTopic().c_str());
- if (stream_start_cb_)
- stream_start_cb_();
-}
-
-void SinkStreamManager::Stop()
-{
- DBG("%s %s", __func__, GetTopic().c_str());
- // TODO: You should take care about stream resource
- for (auto itr = src_stream_.begin(); itr != src_stream_.end(); ++itr)
- itr->second->Destroy();
- src_stream_.clear();
-
- if (stream_stop_cb_)
- stream_stop_cb_();
-}
-
-void SinkStreamManager::OnEncodedFrame(WebRtcStream &stream)
-{
- if (encoded_frame_cb_)
- encoded_frame_cb_(stream);
-}
-
-void SinkStreamManager::SetIceCandidateAddedCallback(IceCandidateAddedCallback cb)
-{
- ice_candidate_added_cb_ = cb;
-}
-
-void SinkStreamManager::SetStreamReadyCallback(StreamReadyCallback cb)
-{
- stream_ready_cb_ = cb;
-}
-
-void SinkStreamManager::SetStreamStartCallback(StreamStartCallback cb)
-{
- stream_start_cb_ = cb;
-}
-
-void SinkStreamManager::SetStreamStopCallback(StreamStopCallback cb)
-{
- stream_stop_cb_ = cb;
+ streams_.clear();
}
void SinkStreamManager::SetOnEncodedFrameCallback(EncodedFrameCallabck cb)
void SinkStreamManager::SetWebRtcStreamCallbacks(WebRtcStream &stream)
{
- auto on_stream_state_changed_cb = std::bind(&SinkStreamManager::OnStreamStateChanged, this,
- std::placeholders::_1, std::ref(stream));
- stream.GetEventHandler().SetOnStateChangedCb(on_stream_state_changed_cb);
-
- auto on_ice_candidate_added_cb = std::bind(&SinkStreamManager::OnIceCandidate, this,
- std::placeholders::_1, std::ref(stream));
- stream.GetEventHandler().SetOnIceCandidateCb(on_ice_candidate_added_cb);
+ stream.GetEventHandler().SetOnStateChangedCb(std::bind(&SinkStreamManager::OnStreamStateChanged,
+ this, std::placeholders::_1, std::ref(stream)));
- auto on_ice_gathering_state_changed_cb =
- std::bind(&SinkStreamManager::OnIceGatheringStateNotify, this, std::placeholders::_1,
- std::ref(stream));
- stream.GetEventHandler().SetOnIceGatheringStateNotifyCb(on_ice_gathering_state_changed_cb);
+ stream.GetEventHandler().SetOnIceCandidateCb(
+ std::bind(&SinkStreamManager::OnIceCandidate, this));
- auto on_encoded_frame_cb =
- std::bind(&SinkStreamManager::OnEncodedFrame, this, std::ref(stream));
- stream.GetEventHandler().SetOnEncodedFrameCb(on_encoded_frame_cb);
+ stream.GetEventHandler().SetOnEncodedFrameCb(std::bind(&SinkStreamManager::OnEncodedFrame, this));
}
void SinkStreamManager::OnStreamStateChanged(WebRtcState::Stream state, WebRtcStream &stream)
{
DBG("OnSinkStreamStateChanged: %s", WebRtcState::StreamToStr(state).c_str());
if (state == WebRtcState::Stream::NEGOTIATING) {
- auto on_offer_created = std::bind(&SinkStreamManager::OnOfferCreated, this,
- std::placeholders::_1, std::ref(stream));
- stream.CreateOfferAsync(on_offer_created);
+ stream.CreateOfferAsync(std::bind(&SinkStreamManager::OnOfferCreated, this,
+ std::placeholders::_1, std::ref(stream)));
}
}
stream.SetLocalDescription(sdp);
}
-void SinkStreamManager::OnIceCandidate(const std::string &candidate, WebRtcStream &stream)
+void SinkStreamManager::OnIceCandidate(void)
{
if (ice_candidate_added_cb_)
- ice_candidate_added_cb_(stream);
-}
-
-void SinkStreamManager::OnIceGatheringStateNotify(WebRtcState::IceGathering state,
- WebRtcStream &stream)
-{
- DBG("Sink IceGathering State: %s", WebRtcState::IceGatheringToStr(state).c_str());
- if (state == WebRtcState::IceGathering::COMPLETE) {
- if (stream_ready_cb_)
- stream_ready_cb_(stream);
- }
-}
-
-void SinkStreamManager::HandleRemovedClient(const std::string &discovery_id)
-{
- auto src_stream_itr = src_stream_.find(discovery_id);
- if (src_stream_itr == src_stream_.end()) {
- DBG("There's no source stream %s", discovery_id.c_str());
- return;
- }
-
- // TODO: You should take care about stream resource
- src_stream_itr->second->Destroy();
- src_stream_.erase(src_stream_itr);
-
- return;
+ ice_candidate_added_cb_();
}
-void SinkStreamManager::HandleMsg(const std::string &discovery_id,
- const std::vector<uint8_t> &message)
+void SinkStreamManager::OnEncodedFrame(void)
{
- if (flexbuffers::GetRoot(message).IsString())
- HandleStreamState(discovery_id, message);
-
- else if (flexbuffers::GetRoot(message).IsVector())
- HandleStreamInfo(discovery_id, message);
+ if (encoded_frame_cb_)
+ encoded_frame_cb_();
}
void SinkStreamManager::HandleStreamState(const std::string &discovery_id,
const std::vector<uint8_t> &message)
{
+ DBG("%s", __func__);
auto src_state = flexbuffers::GetRoot(message).ToString();
if (src_state.compare("START") == 0)
void SinkStreamManager::HandleStartStream(const std::string &discovery_id)
{
- auto src_stream = src_stream_.find(discovery_id);
- if (src_stream != src_stream_.end()) {
+ DBG("%s", __func__);
+ auto src_stream = streams_.find(discovery_id);
+ if (src_stream != streams_.end()) {
DBG("There's stream already");
return;
}
s_stream << static_cast<void *>(stream);
stream->SetStreamId(std::string(thread_id_ + s_stream.str()));
- src_stream_[discovery_id] = stream;
+ streams_[discovery_id] = stream;
}
void SinkStreamManager::HandleStreamInfo(const std::string &discovery_id,
const std::string &peer_id, const std::string &sdp,
const std::vector<std::string> &ice_candidates)
{
- auto src_stream = src_stream_.find(discovery_id);
- if (src_stream == src_stream_.end()) {
+ auto src_stream = streams_.find(discovery_id);
+ if (src_stream == streams_.end()) {
DBG("No matching stream");
return;
}
src_stream->second->AddPeerInformation(sdp, ice_candidates);
} else
src_stream->second->UpdatePeerInformation(ice_candidates);
-
}
std::vector<uint8_t> SinkStreamManager::GetDiscoveryMessage(void)
flexbuffers::Builder fbb;
fbb.Vector([&] {
- for (auto itr = src_stream_.begin(); itr != src_stream_.end(); ++itr) {
+ for (auto itr = streams_.begin(); itr != streams_.end(); ++itr) {
fbb.Map([&] {
fbb.String("id", itr->second->GetStreamId());
fbb.String("peer_id", itr->second->GetPeerId());
return message;
}
-std::string SinkStreamManager::GetWatchingTopic(void)
-{
- return watching_topic_;
-}
-
} // namespace AittWebRTCNamespace
class SinkStreamManager : public StreamManager {
public:
- using EncodedFrameCallabck = std::function<void(WebRtcStream &stream)>;
+ using EncodedFrameCallabck = std::function<void(void)>;
explicit SinkStreamManager(const std::string &topic, const std::string &aitt_id,
const std::string &thread_id);
virtual ~SinkStreamManager();
- void Start(void) override;
- void Stop(void) override;
- void SetIceCandidateAddedCallback(IceCandidateAddedCallback cb) override;
- void SetStreamReadyCallback(StreamReadyCallback cb) override;
- void SetStreamStartCallback(StreamStartCallback cb) override;
- void SetStreamStopCallback(StreamStopCallback cb) override;
std::vector<uint8_t> GetDiscoveryMessage(void) override;
- std::string GetWatchingTopic(void) override;
// TODO: WebRTC CAPI doesn't allow destroy webrtc handle at callback.
// We need to avoid that situation
void SetOnEncodedFrameCallback(EncodedFrameCallabck cb);
- void HandleRemovedClient(const std::string &discovery_id) override;
- void HandleMsg(const std::string &discovery_id, const std::vector<uint8_t> &message) override;
private:
void SetWebRtcStreamCallbacks(WebRtcStream &stream) override;
void OnStreamStateChanged(WebRtcState::Stream state, WebRtcStream &stream);
void OnOfferCreated(std::string sdp, WebRtcStream &stream);
- void OnIceCandidate(const std::string &candidate, WebRtcStream &stream);
- void OnIceGatheringStateNotify(WebRtcState::IceGathering state, WebRtcStream &stream);
- void OnEncodedFrame(WebRtcStream &stream);
- void HandleStreamState(const std::string &discovery_id, const std::vector<uint8_t> &message);
+ void OnIceCandidate(void);
+ void OnEncodedFrame(void);
+ void HandleStreamState(const std::string &discovery_id,
+ const std::vector<uint8_t> &message) override;
void HandleStartStream(const std::string &discovery_id);
- void HandleStreamInfo(const std::string &discovery_id, const std::vector<uint8_t> &message);
+ void HandleStreamInfo(const std::string &discovery_id,
+ const std::vector<uint8_t> &message) override;
void AddStream(const std::string &discovery_id);
void UpdateStreamInfo(const std::string &discovery_id, const std::string &id,
const std::string &peer_id, const std::string &sdp,
const std::vector<std::string> &ice_candidates);
- std::string watching_topic_;
- // TODO: What if user copies the module?
- // Think about that case with destructor
- std::map<std::string /* Peer Aitt Discovery ID */, WebRtcStream *> src_stream_;
- IceCandidateAddedCallback ice_candidate_added_cb_;
- StreamReadyCallback stream_ready_cb_;
- StreamStartCallback stream_start_cb_;
- StreamStopCallback stream_stop_cb_;
EncodedFrameCallabck encoded_frame_cb_;
};
} // namespace AittWebRTCNamespace
SrcStreamManager::SrcStreamManager(const std::string &topic, const std::string &aitt_id,
const std::string &thread_id)
- : StreamManager(topic + "/SRC", aitt_id, thread_id), watching_topic_(topic + "/SINK")
+ : StreamManager(topic + "/SRC", topic + "/SINK", aitt_id, thread_id)
{
}
SrcStreamManager::~SrcStreamManager()
{
// TODO: You should take care about stream resource
- for (auto itr = sink_streams_.begin(); itr != sink_streams_.end(); ++itr)
+ for (auto itr = streams_.begin(); itr != streams_.end(); ++itr)
itr->second->Destroy();
- sink_streams_.clear();
-}
-
-void SrcStreamManager::Start(void)
-{
- DBG("%s %s", __func__, GetTopic().c_str());
- if (stream_start_cb_)
- stream_start_cb_();
-}
-
-void SrcStreamManager::Stop(void)
-{
- DBG("%s %s", __func__, GetTopic().c_str());
- // TODO: You should take care about stream resource
- for (auto itr = sink_streams_.begin(); itr != sink_streams_.end(); ++itr)
- itr->second->Destroy();
- sink_streams_.clear();
-
- if (stream_stop_cb_)
- stream_stop_cb_();
-}
-
-void SrcStreamManager::SetIceCandidateAddedCallback(IceCandidateAddedCallback cb)
-{
- ice_candidate_added_cb_ = cb;
-}
-
-void SrcStreamManager::SetStreamReadyCallback(StreamReadyCallback cb)
-{
- stream_ready_cb_ = cb;
-}
-
-void SrcStreamManager::SetStreamStartCallback(StreamStartCallback cb)
-{
- stream_start_cb_ = cb;
-}
-
-void SrcStreamManager::SetStreamStopCallback(StreamStopCallback cb)
-{
- stream_stop_cb_ = cb;
+ streams_.clear();
}
void SrcStreamManager::SetWebRtcStreamCallbacks(WebRtcStream &stream)
{
- auto on_stream_state_changed_cb = std::bind(&SrcStreamManager::OnStreamStateChanged, this,
- std::placeholders::_1, std::ref(stream));
- stream.GetEventHandler().SetOnStateChangedCb(on_stream_state_changed_cb);
-
- auto on_ice_candidate_added_cb = std::bind(&SrcStreamManager::OnIceCandidate, this,
- std::placeholders::_1, std::ref(stream));
- stream.GetEventHandler().SetOnIceCandidateCb(on_ice_candidate_added_cb);
+ stream.GetEventHandler().SetOnStateChangedCb(
+ std::bind(&SrcStreamManager::OnStreamStateChanged, this, std::placeholders::_1));
- auto on_signaling_state_notify_cb = std::bind(&SrcStreamManager::OnSignalingStateNotify, this,
- std::placeholders::_1, std::ref(stream));
- stream.GetEventHandler().SetOnSignalingStateNotifyCb(on_signaling_state_notify_cb);
+ stream.GetEventHandler().SetOnSignalingStateNotifyCb(
+ std::bind(&SrcStreamManager::OnSignalingStateNotify, this, std::placeholders::_1,
+ std::ref(stream)));
- auto on_ice_gathering_state_changed_cb = std::bind(&SrcStreamManager::OnIceGatheringStateNotify,
- this, std::placeholders::_1, std::ref(stream));
- stream.GetEventHandler().SetOnIceGatheringStateNotifyCb(on_ice_gathering_state_changed_cb);
+ stream.GetEventHandler().SetOnIceCandidateCb(
+ std::bind(&SrcStreamManager::OnIceCandidate, this));
}
-void SrcStreamManager::OnStreamStateChanged(WebRtcState::Stream state, WebRtcStream &stream)
+void SrcStreamManager::OnStreamStateChanged(WebRtcState::Stream state)
{
DBG("OnSrcStreamStateChanged: %s", WebRtcState::StreamToStr(state).c_str());
}
{
DBG("OnSignalingStateNotify: %s", WebRtcState::SignalingToStr(state).c_str());
if (state == WebRtcState::Signaling::HAVE_REMOTE_OFFER) {
- auto on_answer_created = std::bind(&SrcStreamManager::OnAnswerCreated, this,
- std::placeholders::_1, std::ref(stream));
- stream.CreateAnswerAsync(on_answer_created);
+ stream.CreateAnswerAsync(std::bind(&SrcStreamManager::OnAnswerCreated, this,
+ std::placeholders::_1, std::ref(stream)));
}
}
stream.SetLocalDescription(sdp);
}
-void SrcStreamManager::OnIceCandidate(const std::string &candidate, WebRtcStream &stream)
+void SrcStreamManager::OnIceCandidate(void)
{
if (ice_candidate_added_cb_)
- ice_candidate_added_cb_(stream);
-}
-
-void SrcStreamManager::OnIceGatheringStateNotify(WebRtcState::IceGathering state,
- WebRtcStream &stream)
-{
- DBG("Src IceGathering State: %s", WebRtcState::IceGatheringToStr(state).c_str());
- if (state == WebRtcState::IceGathering::COMPLETE) {
- if (stream_ready_cb_)
- stream_ready_cb_(stream);
- }
-}
-
-void SrcStreamManager::HandleRemovedClient(const std::string &discovery_id)
-{
- auto sink_stream_itr = sink_streams_.find(discovery_id);
- if (sink_stream_itr == sink_streams_.end()) {
- DBG("There's no sink stream %s", discovery_id.c_str());
- return;
- }
-
- // TODO: You should take care about stream resource
- sink_stream_itr->second->Destroy();
- sink_streams_.erase(sink_stream_itr);
-
- return;
-}
-
-void SrcStreamManager::HandleMsg(const std::string &discovery_id,
- const std::vector<uint8_t> &message)
-{
- if (flexbuffers::GetRoot(message).IsString())
- HandleStreamState(discovery_id, message);
- else if (flexbuffers::GetRoot(message).IsVector())
- HandleStreamInfo(discovery_id, message);
+ ice_candidate_added_cb_();
}
void SrcStreamManager::HandleStreamState(const std::string &discovery_id,
const std::vector<uint8_t> &message)
{
+ DBG("%s", __func__);
auto sink_state = flexbuffers::GetRoot(message).ToString();
if (sink_state.compare("STOP") == 0)
void SrcStreamManager::HandleStreamInfo(const std::string &discovery_id,
const std::vector<uint8_t> &message)
{
+ DBG("%s", __func__);
if (!WebRtcMessage::IsValidStreamInfo(message)) {
DBG("Invalid streams info");
return;
const std::string &peer_id, const std::string &sdp,
const std::vector<std::string> &ice_candidates)
{
- auto sink_stream = sink_streams_.find(discovery_id);
- if (sink_stream == sink_streams_.end())
+ auto sink_stream = streams_.find(discovery_id);
+ if (sink_stream == streams_.end())
AddStream(discovery_id, id, sdp, ice_candidates);
else
sink_stream->second->UpdatePeerInformation(ice_candidates);
stream->SetPeerId(id);
stream->AddPeerInformation(sdp, ice_candidates);
- sink_streams_[discovery_id] = stream;
+ streams_[discovery_id] = stream;
return;
}
flexbuffers::Builder fbb;
fbb.Vector([&] {
- for (auto itr = sink_streams_.begin(); itr != sink_streams_.end(); ++itr) {
+ for (auto itr = streams_.begin(); itr != streams_.end(); ++itr) {
fbb.Map([&] {
fbb.String("id", itr->second->GetStreamId());
fbb.String("peer_id", itr->second->GetPeerId());
return message;
}
-std::string SrcStreamManager::GetWatchingTopic(void)
-{
- return watching_topic_;
-}
-
} // namespace AittWebRTCNamespace
explicit SrcStreamManager(const std::string &topic, const std::string &aitt_id,
const std::string &thread_id);
virtual ~SrcStreamManager();
- void Start(void) override;
- // TODO: What's the best way to shutdown all?
- void Stop(void) override;
- void SetIceCandidateAddedCallback(IceCandidateAddedCallback cb) override;
- void SetStreamReadyCallback(StreamReadyCallback cb) override;
- void SetStreamStartCallback(StreamStartCallback cb) override;
- void SetStreamStopCallback(StreamStopCallback cb) override;
std::vector<uint8_t> GetDiscoveryMessage(void) override;
- std::string GetWatchingTopic(void) override;
- void HandleRemovedClient(const std::string &discovery_id) override;
- void HandleMsg(const std::string &discovery_id, const std::vector<uint8_t> &message) override;
private:
void SetWebRtcStreamCallbacks(WebRtcStream &stream) override;
- void OnStreamStateChanged(WebRtcState::Stream state, WebRtcStream &stream);
+ void OnStreamStateChanged(WebRtcState::Stream state);
void OnAnswerCreated(std::string sdp, WebRtcStream &stream);
- void OnIceCandidate(const std::string &candidate, WebRtcStream &stream);
+ void OnIceCandidate(void);
void OnSignalingStateNotify(WebRtcState::Signaling state, WebRtcStream &stream);
- void OnIceGatheringStateNotify(WebRtcState::IceGathering state, WebRtcStream &stream);
- void HandleStreamState(const std::string &discovery_id, const std::vector<uint8_t> &message);
- void HandleStreamInfo(const std::string &discovery_id, const std::vector<uint8_t> &message);
+ void HandleStreamState(const std::string &discovery_id,
+ const std::vector<uint8_t> &message) override;
+ void HandleStreamInfo(const std::string &discovery_id,
+ const std::vector<uint8_t> &message) override;
void AddStream(const std::string &discovery_id, const std::string &id, const std::string &sdp,
const std::vector<std::string> &ice_candidates);
void UpdateStreamInfo(const std::string &discovery_id, const std::string &id,
const std::string &peer_id, const std::string &sdp,
const std::vector<std::string> &ice_candidates);
-
- std::string watching_topic_;
- // TODO: What if user copies the module?
- // Think about that case with destructor
- std::map<std::string /* Peer Aitt Discovery ID */, WebRtcStream *> sink_streams_;
- IceCandidateAddedCallback ice_candidate_added_cb_;
- StreamReadyCallback stream_ready_cb_;
- StreamStartCallback stream_start_cb_;
- StreamStopCallback stream_stop_cb_;
};
} // namespace AittWebRTCNamespace
--- /dev/null
+/*
+ * Copyright (c) 2022 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "StreamManager.h"
+
+#include "aitt_internal.h"
+
+namespace AittWebRTCNamespace {
+
+StreamManager::StreamManager(const std::string &topic, const std::string &watching_topic,
+ const std::string &aitt_id, const std::string &thread_id)
+ : topic_(topic), watching_topic_(watching_topic), aitt_id_(aitt_id), thread_id_(thread_id)
+{
+}
+
+void StreamManager::Start(void)
+{
+ DBG("%s %s", __func__, GetTopic().c_str());
+ if (stream_start_cb_)
+ stream_start_cb_();
+}
+
+void StreamManager::Stop(void)
+{
+ DBG("%s %s", __func__, GetTopic().c_str());
+ // TODO: You should take care about stream resource
+ for (auto itr = streams_.begin(); itr != streams_.end(); ++itr)
+ itr->second->Destroy();
+ streams_.clear();
+
+ if (stream_stop_cb_)
+ stream_stop_cb_();
+}
+
+void StreamManager::HandleRemovedClient(const std::string &discovery_id)
+{
+ auto stream_itr = streams_.find(discovery_id);
+ if (stream_itr == streams_.end()) {
+ DBG("There's no stream %s", discovery_id.c_str());
+ return;
+ }
+
+ // TODO: You should take care about stream resource
+ stream_itr->second->Destroy();
+ streams_.erase(stream_itr);
+
+ return;
+}
+
+void StreamManager::HandleMsg(const std::string &discovery_id, const std::vector<uint8_t> &message)
+{
+ if (flexbuffers::GetRoot(message).IsString())
+ HandleStreamState(discovery_id, message);
+ else if (flexbuffers::GetRoot(message).IsVector())
+ HandleStreamInfo(discovery_id, message);
+}
+
+std::string StreamManager::GetTopic(void) const
+{
+ return topic_;
+}
+
+std::string StreamManager::GetWatchingTopic(void) const
+{
+ return watching_topic_;
+}
+
+void StreamManager::SetIceCandidateAddedCallback(IceCandidateAddedCallback cb)
+{
+ ice_candidate_added_cb_ = cb;
+}
+
+void StreamManager::SetStreamStartCallback(StreamStartCallback cb)
+{
+ stream_start_cb_ = cb;
+}
+
+void StreamManager::SetStreamStopCallback(StreamStopCallback cb)
+{
+ stream_stop_cb_ = cb;
+}
+
+} // namespace AittWebRTCNamespace
class StreamManager {
public:
- using IceCandidateAddedCallback = std::function<void(WebRtcStream &stream)>;
- using StreamReadyCallback = std::function<void(WebRtcStream &stream)>;
+ using IceCandidateAddedCallback = std::function<void(void)>;
using StreamStartCallback = std::function<void(void)>;
using StreamStopCallback = std::function<void(void)>;
- explicit StreamManager(const std::string &topic, const std::string &aitt_id,
- const std::string &thread_id)
- : topic_(topic), aitt_id_(aitt_id), thread_id_(thread_id){};
+ explicit StreamManager(const std::string &topic, const std::string &watching_topic,
+ const std::string &aitt_id, const std::string &thread_id);
virtual ~StreamManager() = default;
- std::string GetTopic(void) const { return topic_; };
- std::string GetClientId(void) const { return aitt_id_; };
- virtual void HandleRemovedClient(const std::string &discovery_id) = 0;
- virtual void HandleMsg(const std::string &discovery_id,
- const std::vector<uint8_t> &message) = 0;
-
- virtual void Start(void) = 0;
- virtual void Stop(void) = 0;
- virtual void SetIceCandidateAddedCallback(IceCandidateAddedCallback cb) = 0;
- virtual void SetStreamReadyCallback(StreamReadyCallback cb) = 0;
- virtual void SetStreamStartCallback(StreamStartCallback cb) = 0;
- virtual void SetStreamStopCallback(StreamStopCallback cb) = 0;
virtual std::vector<uint8_t> GetDiscoveryMessage(void) = 0;
- virtual std::string GetWatchingTopic(void) = 0;
+
+ void Start(void);
+ void Stop(void);
+ void HandleRemovedClient(const std::string &discovery_id);
+ void HandleMsg(const std::string &discovery_id, const std::vector<uint8_t> &message);
+ void SetIceCandidateAddedCallback(IceCandidateAddedCallback cb);
+ void SetStreamStartCallback(StreamStartCallback cb);
+ void SetStreamStopCallback(StreamStopCallback cb);
+ std::string GetTopic(void) const;
+ std::string GetWatchingTopic(void) const;
protected:
std::string topic_;
+ std::string watching_topic_;
// TODO: why dont' we remove below
std::string aitt_id_;
std::string thread_id_;
+ // TODO: What if user copies the module?
+ // Think about that case with destructor
+ std::map<std::string /* Peer Aitt Discovery ID */, WebRtcStream *> streams_;
+ StreamStartCallback stream_start_cb_;
+ StreamStopCallback stream_stop_cb_;
+ IceCandidateAddedCallback ice_candidate_added_cb_;
private:
virtual void SetWebRtcStreamCallbacks(WebRtcStream &stream) = 0;
+ virtual void HandleStreamState(const std::string &discovery_id,
+ const std::vector<uint8_t> &message) = 0;
+ virtual void HandleStreamInfo(const std::string &discovery_id,
+ const std::vector<uint8_t> &message) = 0;
};
} // namespace AittWebRTCNamespace
};
void UnsetOnSignalingStateNotifyCb(void) { on_signaling_state_notify_cb_ = nullptr; };
- void SetOnIceCandidateCb(
- std::function<void(std::string)> on_ice_candidate_cb)
+ void SetOnIceCandidateCb(std::function<void(std::string)> on_ice_candidate_cb)
{
on_ice_candidate_cb_ = on_ice_candidate_cb;
};
return ret == WEBRTC_ERROR_NONE;
}
+void WebRtcStream::SetStreamId(const std::string &id)
+{
+ id_ = id;
+}
+
+std::string WebRtcStream::GetStreamId(void) const
+{
+ return id_;
+}
+
+void WebRtcStream::SetPeerId(const std::string &id)
+{
+ peer_id_ = id;
+}
+
+std::string &WebRtcStream::GetPeerId(void)
+{
+ return peer_id_;
+}
+
bool WebRtcStream::AddIceCandidateFromMessage(const std::string &ice_message)
{
ERR("%s", __func__);
webrtc_data_channel_unset_open_cb(channel_);
}
+WebRtcEventHandler &WebRtcStream::GetEventHandler(void)
+{
+ return event_handler_;
+}
+
void WebRtcStream::OnError(webrtc_h webrtc, webrtc_error_e error, webrtc_state_e state,
void *user_data)
{
void DetachSignals(void);
// Cautions : Event handler is not a pointer. So, change event_handle after Set Event handler
// doesn't affect event handler which is included in WebRtcStream
- void SetEventHandler(WebRtcEventHandler event_handler) { event_handler_ = event_handler; };
- WebRtcEventHandler &GetEventHandler(void) { return event_handler_; };
+ WebRtcEventHandler &GetEventHandler(void);
bool CreateOfferAsync(std::function<void(std::string)> on_created_cb);
- void CallOnOfferCreatedCb(std::string offer)
- {
- if (on_offer_created_cb_)
- on_offer_created_cb_(offer);
- }
bool CreateAnswerAsync(std::function<void(std::string)> on_created_cb);
- void CallOnAnswerCreatedCb(std::string answer)
- {
- if (on_answer_created_cb_)
- on_answer_created_cb_(answer);
- }
- void SetPreparedLocalDescription(const std::string &description)
- {
- local_description_ = description;
- };
- std::string GetPreparedLocalDescription(void) const { return local_description_; };
bool SetLocalDescription(const std::string &description);
bool SetRemoteDescription(const std::string &description);
- void SetStreamId(const std::string &id) { id_ = id; };
- std::string GetStreamId(void) const { return id_; };
- void SetPeerId(const std::string &id) { peer_id_ = id; };
- std::string &GetPeerId(void) { return peer_id_; };
+ void SetStreamId(const std::string &id);
+ std::string GetStreamId(void) const;
+ void SetPeerId(const std::string &id);
+ std::string &GetPeerId(void);
bool AddIceCandidateFromMessage(const std::string &ice_message);
bool AddPeerInformation(const std::string &sdp, const std::vector<std::string> &ice_candidates);
{
DBG("OnStreamStateChanged");
if (state == WebRtcState::Stream::NEGOTIATING) {
- auto on_offer_created =
- std::bind(OnOfferCreated, std::placeholders::_1, std::ref(stream), test);
- stream.CreateOfferAsync(on_offer_created);
+ stream.CreateOfferAsync(
+ std::bind(OnOfferCreated, std::placeholders::_1, std::ref(stream), test));
}
}
static void OnOfferCreated(std::string sdp, WebRtcStream &stream, WebRtcStreamTest *test)
WebRtcStream stream{};
EXPECT_EQ(true, stream.Create(true, false)) << "Failed to create source stream";
EXPECT_EQ(true, stream.AttachCameraSource()) << "Failed to attach camera source";
- auto on_stream_state_changed_cb =
- std::bind(OnStreamStateChanged, std::placeholders::_1, std::ref(stream), this);
- stream.GetEventHandler().SetOnStateChangedCb(on_stream_state_changed_cb);
+ stream.GetEventHandler().SetOnStateChangedCb(
+ std::bind(OnStreamStateChanged, std::placeholders::_1, std::ref(stream), this));
- auto on_ice_gathering_state_changed_cb =
- std::bind(OnIceGatheringStateNotify, std::placeholders::_1, std::ref(stream), this);
- stream.GetEventHandler().SetOnIceGatheringStateNotifyCb(on_ice_gathering_state_changed_cb);
+ stream.GetEventHandler().SetOnIceGatheringStateNotifyCb(
+ std::bind(OnIceGatheringStateNotify, std::placeholders::_1, std::ref(stream), this));
stream.Start();
IterateEventLoop();
EXPECT_EQ(WebRtcMessage::Type::SDP, WebRtcMessage::getMessageType(local_description_));
WebRtcSourceOffererTest *test)
{
DBG("OnSrcStreamStateChanged: %s", WebRtcState::StreamToStr(state).c_str());
- if (state == WebRtcState::Stream::NEGOTIATING) {
- auto on_offer_created =
- std::bind(OnOfferCreated, std::placeholders::_1, std::ref(stream), test);
- stream.CreateOfferAsync(on_offer_created);
- }
+ if (state == WebRtcState::Stream::NEGOTIATING)
+ stream.CreateOfferAsync(
+ std::bind(OnOfferCreated, std::placeholders::_1, std::ref(stream), test));
}
static void OnOfferCreated(std::string sdp, WebRtcStream &stream, WebRtcSourceOffererTest *test)
WebRtcSourceOffererTest *test)
{
DBG("OnSinkSignalingStateNotify: %s", WebRtcState::SignalingToStr(state).c_str());
- if (state == WebRtcState::Signaling::HAVE_REMOTE_OFFER) {
- auto on_answer_created =
- std::bind(OnAnswerCreated, std::placeholders::_1, std::ref(stream), test);
- stream.CreateAnswerAsync(on_answer_created);
- }
+ if (state == WebRtcState::Signaling::HAVE_REMOTE_OFFER)
+ stream.CreateAnswerAsync(
+ std::bind(OnAnswerCreated, std::placeholders::_1, std::ref(stream), test));
}
static void OnSinkIceGatheringStateNotify(WebRtcState::IceGathering state, WebRtcStream &stream,
{
EXPECT_EQ(true, src_stream_.Create(true, false)) << "Failed to create source stream";
EXPECT_EQ(true, src_stream_.AttachCameraSource()) << "Failed to attach camera source";
- auto on_src_stream_state_changed_cb =
- std::bind(OnSrcStreamStateChanged, std::placeholders::_1, std::ref(src_stream_), this);
- src_stream_.GetEventHandler().SetOnStateChangedCb(on_src_stream_state_changed_cb);
+ src_stream_.GetEventHandler().SetOnStateChangedCb(
+ std::bind(OnSrcStreamStateChanged, std::placeholders::_1, std::ref(src_stream_), this));
- auto on_src_signaling_state_changed_cb =
- std::bind(OnSrcSignalingStateNotify, std::placeholders::_1, std::ref(src_stream_), this);
- src_stream_.GetEventHandler().SetOnSignalingStateNotifyCb(on_src_signaling_state_changed_cb);
+ src_stream_.GetEventHandler().SetOnSignalingStateNotifyCb(
+ std::bind(OnSrcSignalingStateNotify, std::placeholders::_1, std::ref(src_stream_), this));
- auto on_src_ice_gathering_state_changed_cb = std::bind(OnSrcIceGatheringStateNotify,
- std::placeholders::_1, std::ref(src_stream_), this);
- src_stream_.GetEventHandler().SetOnIceGatheringStateNotifyCb(
- on_src_ice_gathering_state_changed_cb);
+ src_stream_.GetEventHandler().SetOnIceGatheringStateNotifyCb(std::bind(
+ OnSrcIceGatheringStateNotify, std::placeholders::_1, std::ref(src_stream_), this));
src_stream_.Start();
EXPECT_EQ(true, sink_stream_.Create(false, false)) << "Failed to create sink stream";
- auto on_sink_stream_state_changed_cb =
- std::bind(OnSinkStreamStateChanged, std::placeholders::_1, std::ref(sink_stream_), this);
- sink_stream_.GetEventHandler().SetOnStateChangedCb(on_sink_stream_state_changed_cb);
+ sink_stream_.GetEventHandler().SetOnStateChangedCb(
+ std::bind(OnSinkStreamStateChanged, std::placeholders::_1, std::ref(sink_stream_), this));
- auto on_sink_signaling_state_changed_cb = std::bind(OnSinkSignalingStateNotify,
- std::placeholders::_1, std::ref(sink_stream_), this);
- sink_stream_.GetEventHandler().SetOnSignalingStateNotifyCb(on_sink_signaling_state_changed_cb);
+ sink_stream_.GetEventHandler().SetOnSignalingStateNotifyCb(std::bind(OnSinkSignalingStateNotify,
+ std::placeholders::_1, std::ref(sink_stream_), this));
- auto on_sink_ice_gathering_state_changed_cb = std::bind(OnSinkIceGatheringStateNotify,
- std::placeholders::_1, std::ref(sink_stream_), this);
- sink_stream_.GetEventHandler().SetOnIceGatheringStateNotifyCb(
- on_sink_ice_gathering_state_changed_cb);
+ sink_stream_.GetEventHandler().SetOnIceGatheringStateNotifyCb(std::bind(
+ OnSinkIceGatheringStateNotify, std::placeholders::_1, std::ref(sink_stream_), this));
- auto on_sink_encoded_frame_cb = std::bind(OnSinkStreamEncodedFrame, this);
- sink_stream_.GetEventHandler().SetOnEncodedFrameCb(on_sink_encoded_frame_cb);
+ sink_stream_.GetEventHandler().SetOnEncodedFrameCb(std::bind(OnSinkStreamEncodedFrame, this));
sink_stream_.Start();
IterateEventLoop();
}
WebRtcSinkOffererTest *test)
{
DBG("OnSinkStreamStateChanged: %s", WebRtcState::StreamToStr(state).c_str());
- if (state == WebRtcState::Stream::NEGOTIATING) {
- auto on_offer_created =
- std::bind(OnOfferCreated, std::placeholders::_1, std::ref(stream), test);
- stream.CreateOfferAsync(on_offer_created);
- }
+ if (state == WebRtcState::Stream::NEGOTIATING)
+ stream.CreateOfferAsync(
+ std::bind(OnOfferCreated, std::placeholders::_1, std::ref(stream), test));
}
static void OnOfferCreated(std::string sdp, WebRtcStream &stream, WebRtcSinkOffererTest *test)
WebRtcSinkOffererTest *test)
{
DBG("OnSrcSignalingStateNotify: %s", WebRtcState::SignalingToStr(state).c_str());
- if (state == WebRtcState::Signaling::HAVE_REMOTE_OFFER) {
- auto on_answer_created =
- std::bind(OnAnswerCreated, std::placeholders::_1, std::ref(stream), test);
- stream.CreateAnswerAsync(on_answer_created);
- }
+ if (state == WebRtcState::Signaling::HAVE_REMOTE_OFFER)
+ stream.CreateAnswerAsync(
+ std::bind(OnAnswerCreated, std::placeholders::_1, std::ref(stream), test));
}
static void OnSrcIceGatheringStateNotify(WebRtcState::IceGathering state, WebRtcStream &stream,
std::bind(OnSrcStreamStateChanged, std::placeholders::_1, std::ref(src_stream_), this);
src_stream_.GetEventHandler().SetOnStateChangedCb(on_src_stream_state_changed_cb);
- auto on_src_signaling_state_changed_cb =
- std::bind(OnSrcSignalingStateNotify, std::placeholders::_1, std::ref(src_stream_), this);
- src_stream_.GetEventHandler().SetOnSignalingStateNotifyCb(on_src_signaling_state_changed_cb);
+ src_stream_.GetEventHandler().SetOnSignalingStateNotifyCb(
+ std::bind(OnSrcSignalingStateNotify, std::placeholders::_1, std::ref(src_stream_), this));
- auto on_src_ice_gathering_state_changed_cb = std::bind(OnSrcIceGatheringStateNotify,
- std::placeholders::_1, std::ref(src_stream_), this);
- src_stream_.GetEventHandler().SetOnIceGatheringStateNotifyCb(
- on_src_ice_gathering_state_changed_cb);
+ src_stream_.GetEventHandler().SetOnIceGatheringStateNotifyCb(std::bind(
+ OnSrcIceGatheringStateNotify, std::placeholders::_1, std::ref(src_stream_), this));
src_stream_.Start();
EXPECT_EQ(true, sink_stream_.Create(false, false)) << "Failed to create sink stream";
- auto on_sink_stream_state_changed_cb =
- std::bind(OnSinkStreamStateChanged, std::placeholders::_1, std::ref(sink_stream_), this);
- sink_stream_.GetEventHandler().SetOnStateChangedCb(on_sink_stream_state_changed_cb);
+ sink_stream_.GetEventHandler().SetOnStateChangedCb(
+ std::bind(OnSinkStreamStateChanged, std::placeholders::_1, std::ref(sink_stream_), this));
- auto on_sink_signaling_state_changed_cb = std::bind(OnSinkSignalingStateNotify,
- std::placeholders::_1, std::ref(sink_stream_), this);
- sink_stream_.GetEventHandler().SetOnSignalingStateNotifyCb(on_sink_signaling_state_changed_cb);
+ sink_stream_.GetEventHandler().SetOnSignalingStateNotifyCb(std::bind(OnSinkSignalingStateNotify,
+ std::placeholders::_1, std::ref(sink_stream_), this));
- auto on_sink_ice_gathering_state_changed_cb = std::bind(OnSinkIceGatheringStateNotify,
- std::placeholders::_1, std::ref(sink_stream_), this);
- sink_stream_.GetEventHandler().SetOnIceGatheringStateNotifyCb(
- on_sink_ice_gathering_state_changed_cb);
+ sink_stream_.GetEventHandler().SetOnIceGatheringStateNotifyCb(std::bind(
+ OnSinkIceGatheringStateNotify, std::placeholders::_1, std::ref(sink_stream_), this));
- auto on_sink_encoded_frame_cb = std::bind(OnSinkStreamEncodedFrame, this);
- sink_stream_.GetEventHandler().SetOnEncodedFrameCb(on_sink_encoded_frame_cb);
+ sink_stream_.GetEventHandler().SetOnEncodedFrameCb(std::bind(OnSinkStreamEncodedFrame, this));
sink_stream_.Start();
IterateEventLoop();
}
// Src should subscribe WEBRTC_SINK_TOPIC but this is for test
discovery_cb_ = discovery_engine_.AddDiscoveryCB(src_manager->GetTopic(), on_discovered);
- auto on_stream_started =
- std::bind(OnStreamStarted, this, static_cast<SrcStreamManager *>(src_manager.get()));
- src_manager->SetStreamStartCallback(on_stream_started);
-
- auto on_stream_stopped =
- std::bind(OnStreamStopped, this, static_cast<SrcStreamManager *>(src_manager.get()));
- src_manager->SetStreamStopCallback(on_stream_stopped);
+ src_manager->SetStreamStartCallback(
+ std::bind(OnStreamStarted, this, static_cast<SrcStreamManager *>(src_manager.get())));
+ src_manager->SetStreamStopCallback(
+ std::bind(OnStreamStopped, this, static_cast<SrcStreamManager *>(src_manager.get())));
src_manager->Start();
IterateEventLoop();
g_main_loop_quit(test->mainLoop_);
}
- static void OnIceCandidateAdded(WebRtcStream &stream, SinkStreamManager *sink_mgr,
- SinkStreamManagerTest *test)
+ static void OnIceCandidateAdded(SinkStreamManager *sink_mgr, SinkStreamManagerTest *test)
{
DBG("OnIceCandidateAdded");
auto discovery_message = sink_mgr->GetDiscoveryMessage();
discovery_message.size());
}
- static void OnStreamReady(WebRtcStream &stream, SinkStreamManager *sink_mgr,
- SinkStreamManagerTest *test)
- {
- DBG("OnStreamReady");
- auto discovery_message = sink_mgr->GetDiscoveryMessage();
- test->discovery_engine_.UpdateDiscoveryMsg(sink_mgr->GetTopic(), discovery_message.data(),
- discovery_message.size());
- }
-
int discovery_cb_;
std::string test_topic_;
aitt::AittDiscovery discovery_engine_;
// Sink should subscribe WEBRTC_SRC_TOPIC but this is for test
discovery_cb_ = discovery_engine_.AddDiscoveryCB(sink_manager->GetTopic(), on_discovered);
- auto on_ice_candidate_added = std::bind(OnIceCandidateAdded, std::placeholders::_1,
- static_cast<SinkStreamManager *>(sink_manager.get()), this);
- sink_manager->SetIceCandidateAddedCallback(on_ice_candidate_added);
-
- auto on_stream_ready = std::bind(OnStreamReady, std::placeholders::_1,
- static_cast<SinkStreamManager *>(sink_manager.get()), this);
- sink_manager->SetStreamReadyCallback(on_stream_ready);
+ sink_manager->SetIceCandidateAddedCallback(std::bind(OnIceCandidateAdded,
+ static_cast<SinkStreamManager *>(sink_manager.get()), this));
sink_manager->Start();
IterateEventLoop();
sink_manager_->GetWatchingTopic(), discovered_at_sink);
sink_discovery_engine_.Restart();
- auto on_stream_stopped = std::bind(OnSinkStreamStopped, this);
- sink_manager_->SetStreamStopCallback(on_stream_stopped);
+ sink_manager_->SetStreamStopCallback(std::bind(OnSinkStreamStopped, this));
+ sink_manager_->SetIceCandidateAddedCallback(
+ std::bind(OnSinkIceCandidate, this));
- auto on_ice_candidate = std::bind(OnSinkIceCandidate, std::placeholders::_1, this);
- sink_manager_->SetIceCandidateAddedCallback(on_ice_candidate);
-
- auto on_sink_stream_ready = std::bind(OnSinkStreamReady, std::placeholders::_1, this);
- sink_manager_->SetStreamReadyCallback(on_sink_stream_ready);
-
- auto on_encoded_frame = std::bind(OnEncodedFrame, std::placeholders::_1, this);
static_cast<SinkStreamManager *>(sink_manager_.get())
- ->SetOnEncodedFrameCallback(on_encoded_frame);
+ ->SetOnEncodedFrameCallback(std::bind(OnEncodedFrame, this));
sink_manager_->Start();
}
msg.size());
}
- static void OnSinkIceCandidate(WebRtcStream &stream, SinkSrcStreamManagerTest *test)
+ static void OnSinkIceCandidate(SinkSrcStreamManagerTest *test)
{
DBG("OnSinkIceCandidate");
auto discovery_message = test->sink_manager_->GetDiscoveryMessage();
discovery_message.data(), discovery_message.size());
}
- static void OnSinkStreamReady(WebRtcStream &stream, SinkSrcStreamManagerTest *test)
- {
- DBG("OnSinkStreamReady");
-
- auto discovery_message = test->sink_manager_->GetDiscoveryMessage();
- test->sink_discovery_engine_.UpdateDiscoveryMsg(test->sink_manager_->GetTopic(),
- discovery_message.data(), discovery_message.size());
- }
-
- static void OnEncodedFrame(WebRtcStream &stream, SinkSrcStreamManagerTest *test)
+ static void OnEncodedFrame(SinkSrcStreamManagerTest *test)
{
- static_cast<SinkStreamManager *>(test->sink_manager_.get())->SetOnEncodedFrameCallback(nullptr);
+ static_cast<SinkStreamManager *>(test->sink_manager_.get())
+ ->SetOnEncodedFrameCallback(nullptr);
if (test->stop_sink_first_)
test->AddIdleStopSinkStream();
discovered_at_src);
src_discovery_engine_.Restart();
- auto on_stream_started = std::bind(OnStreamStarted, this);
- src_manager_->SetStreamStartCallback(on_stream_started);
- auto on_stream_stopped = std::bind(OnSrcStreamStopped, this);
- src_manager_->SetStreamStopCallback(on_stream_stopped);
- auto on_ice_candidate = std::bind(OnSrcIceCandidate, std::placeholders::_1, this);
- src_manager_->SetIceCandidateAddedCallback(on_ice_candidate);
-
- auto on_src_stream_ready = std::bind(OnSrcStreamReady, std::placeholders::_1, this);
- src_manager_->SetStreamReadyCallback(on_src_stream_ready);
+ src_manager_->SetStreamStartCallback(std::bind(OnStreamStarted, this));
+ src_manager_->SetStreamStopCallback(std::bind(OnSrcStreamStopped, this));
+ src_manager_->SetIceCandidateAddedCallback(
+ std::bind(OnSrcIceCandidate, this));
src_manager_->Start();
}
msg.size());
}
- static void OnSrcIceCandidate(WebRtcStream &stream, SinkSrcStreamManagerTest *test)
+ static void OnSrcIceCandidate(SinkSrcStreamManagerTest *test)
{
DBG("OnIceCandidateAdded");
auto discovery_message = test->src_manager_->GetDiscoveryMessage();
discovery_message.data(), discovery_message.size());
}
- static void OnSrcStreamReady(WebRtcStream &stream, SinkSrcStreamManagerTest *test)
- {
- DBG("OnSrcStreamReady");
-
- auto discovery_message = test->src_manager_->GetDiscoveryMessage();
- test->src_discovery_engine_.UpdateDiscoveryMsg(test->src_manager_->GetTopic(),
- discovery_message.data(), discovery_message.size());
- }
-
void StartSinkFirst(void)
{
start_sink_id_ = g_timeout_add_seconds(1, GSourceStartSink, this);