diff options
author | Jiung Yu <jiung.yu@samsung.com> | 2022-11-08 09:50:10 +0900 |
---|---|---|
committer | Youngjae Shin <yj99.shin@samsung.com> | 2022-11-09 17:27:45 +0900 |
commit | 8b5b71d1edc17d7acb62a1d06a364d849c920004 (patch) | |
tree | 51bbd7c7cf94f40d347920f8e3dbdac62764b4d3 /modules/webrtc/SrcStreamManager.cc | |
parent | 2942726fec194e1bd8f39a497e09a69bcb4d53c3 (diff) | |
download | aitt-8b5b71d1edc17d7acb62a1d06a364d849c920004.tar.gz aitt-8b5b71d1edc17d7acb62a1d06a364d849c920004.tar.bz2 aitt-8b5b71d1edc17d7acb62a1d06a364d849c920004.zip |
Add WebRTC Discovery using AITT Discovery
Diffstat (limited to 'modules/webrtc/SrcStreamManager.cc')
-rw-r--r-- | modules/webrtc/SrcStreamManager.cc | 223 |
1 files changed, 166 insertions, 57 deletions
diff --git a/modules/webrtc/SrcStreamManager.cc b/modules/webrtc/SrcStreamManager.cc index 4170443..be11c96 100644 --- a/modules/webrtc/SrcStreamManager.cc +++ b/modules/webrtc/SrcStreamManager.cc @@ -13,8 +13,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + #include "SrcStreamManager.h" +#include <flatbuffers/flexbuffers.h> + #include <sstream> #include "aitt_internal.h" @@ -23,13 +26,40 @@ namespace AittWebRTCNamespace { SrcStreamManager::SrcStreamManager(const std::string &topic, const std::string &aitt_id, const std::string &thread_id) - : StreamManager(topic, aitt_id, thread_id), stream_(nullptr), stream_ready_cb_(nullptr) + : StreamManager(topic + "/SRC", aitt_id, thread_id), watching_topic_(topic + "/SINK") { } SrcStreamManager::~SrcStreamManager() { - Stop(); + // TODO: You should take care about stream resource + for (auto itr = sink_streams_.begin(); itr != sink_streams_.end(); ++itr) + itr->second->Destroy(); + sink_streams_.clear(); +} + +void SrcStreamManager::Start(void) +{ + DBG("%s %s", __func__, GetTopic().c_str()); + if (stream_start_cb_) + stream_start_cb_(); +} + +void SrcStreamManager::Stop(void) +{ + DBG("%s %s", __func__, GetTopic().c_str()); + // TODO: You should take care about stream resource + for (auto itr = sink_streams_.begin(); itr != sink_streams_.end(); ++itr) + itr->second->Destroy(); + sink_streams_.clear(); + + if (stream_stop_cb_) + stream_stop_cb_(); +} + +void SrcStreamManager::SetIceCandidateAddedCallback(IceCandidateAddedCallback cb) +{ + ice_candidate_added_cb_ = cb; } void SrcStreamManager::SetStreamReadyCallback(StreamReadyCallback cb) @@ -37,18 +67,32 @@ void SrcStreamManager::SetStreamReadyCallback(StreamReadyCallback cb) stream_ready_cb_ = cb; } +void SrcStreamManager::SetStreamStartCallback(StreamStartCallback cb) +{ + stream_start_cb_ = cb; +} + +void SrcStreamManager::SetStreamStopCallback(StreamStopCallback cb) +{ + stream_stop_cb_ = cb; +} + void SrcStreamManager::SetWebRtcStreamCallbacks(WebRtcStream &stream) { - auto on_stream_state_changed_cb = - std::bind(OnStreamStateChanged, std::placeholders::_1, std::ref(stream)); + auto on_stream_state_changed_cb = std::bind(&SrcStreamManager::OnStreamStateChanged, this, + std::placeholders::_1, std::ref(stream)); stream.GetEventHandler().SetOnStateChangedCb(on_stream_state_changed_cb); - auto on_signaling_state_notify_cb = - std::bind(OnSignalingStateNotify, std::placeholders::_1, std::ref(stream)); + auto on_ice_candidate_added_cb = std::bind(&SrcStreamManager::OnIceCandidate, this, + std::placeholders::_1, std::ref(stream)); + stream.GetEventHandler().SetOnIceCandidateCb(on_ice_candidate_added_cb); + + auto on_signaling_state_notify_cb = std::bind(&SrcStreamManager::OnSignalingStateNotify, this, + std::placeholders::_1, std::ref(stream)); stream.GetEventHandler().SetOnSignalingStateNotifyCb(on_signaling_state_notify_cb); - auto on_ice_gathering_state_changed_cb = - std::bind(OnIceGatheringStateNotify, std::placeholders::_1, std::ref(stream), this); + auto on_ice_gathering_state_changed_cb = std::bind(&SrcStreamManager::OnIceGatheringStateNotify, + this, std::placeholders::_1, std::ref(stream)); stream.GetEventHandler().SetOnIceGatheringStateNotifyCb(on_ice_gathering_state_changed_cb); } @@ -57,6 +101,16 @@ void SrcStreamManager::OnStreamStateChanged(WebRtcState::Stream state, WebRtcStr DBG("OnSrcStreamStateChanged: %s", WebRtcState::StreamToStr(state).c_str()); } +void SrcStreamManager::OnSignalingStateNotify(WebRtcState::Signaling state, WebRtcStream &stream) +{ + DBG("OnSignalingStateNotify: %s", WebRtcState::SignalingToStr(state).c_str()); + if (state == WebRtcState::Signaling::HAVE_REMOTE_OFFER) { + auto on_answer_created = std::bind(&SrcStreamManager::OnAnswerCreated, this, + std::placeholders::_1, std::ref(stream)); + stream.CreateAnswerAsync(on_answer_created); + } +} + void SrcStreamManager::OnAnswerCreated(std::string sdp, WebRtcStream &stream) { DBG("%s", __func__); @@ -64,85 +118,140 @@ void SrcStreamManager::OnAnswerCreated(std::string sdp, WebRtcStream &stream) stream.SetLocalDescription(sdp); } -void SrcStreamManager::OnSignalingStateNotify(WebRtcState::Signaling state, WebRtcStream &stream) +void SrcStreamManager::OnIceCandidate(const std::string &candidate, WebRtcStream &stream) { - DBG("OnSignalingStateNotify: %s", WebRtcState::SignalingToStr(state).c_str()); - if (state == WebRtcState::Signaling::HAVE_REMOTE_OFFER) { - auto on_answer_created = - std::bind(OnAnswerCreated, std::placeholders::_1, std::ref(stream)); - stream.CreateAnswerAsync(on_answer_created); - } + if (ice_candidate_added_cb_) + ice_candidate_added_cb_(stream); } void SrcStreamManager::OnIceGatheringStateNotify(WebRtcState::IceGathering state, - WebRtcStream &stream, SrcStreamManager *manager) + WebRtcStream &stream) { DBG("Src IceGathering State: %s", WebRtcState::IceGatheringToStr(state).c_str()); if (state == WebRtcState::IceGathering::COMPLETE) { - if (manager && manager->stream_ready_cb_) - manager->stream_ready_cb_(stream); + if (stream_ready_cb_) + stream_ready_cb_(stream); } } -void SrcStreamManager::Start(void) +void SrcStreamManager::HandleRemovedClient(const std::string &discovery_id) { - // TODO: What'll be done in start Src Stream Manager? -} - -void SrcStreamManager::AddStream(const std::string &id, const std::vector<uint8_t> &message) -{ - // TODO Add more streams on same topic - if (stream_) + auto sink_stream_itr = sink_streams_.find(discovery_id); + if (sink_stream_itr == sink_streams_.end()) { + DBG("There's no sink stream %s", discovery_id.c_str()); return; + } - stream_ = new WebRtcStream(); - SetWebRtcStreamCallbacks(*stream_); - stream_->Create(true, false); - stream_->AttachCameraSource(); - stream_->Start(); - - std::stringstream s_stream; - s_stream << static_cast<void *>(&stream_); - - stream_->SetStreamId(std::string(aitt_id_ + thread_id_ + s_stream.str())); - stream_->SetPeerId(id); - stream_->AddDiscoveryInformation(message); + // TODO: You should take care about stream resource + sink_stream_itr->second->Destroy(); + sink_streams_.erase(sink_stream_itr); return; } -void SrcStreamManager::Stop(void) +void SrcStreamManager::HandleMsg(const std::string &discovery_id, + const std::vector<uint8_t> &message) { - // TODO: Ad-hoc method - if (stream_) { - delete stream_; - stream_ = nullptr; - } + if (flexbuffers::GetRoot(message).IsString()) + HandleStreamState(discovery_id, message); + else if (flexbuffers::GetRoot(message).IsVector()) + HandleStreamInfo(discovery_id, message); } -void SrcStreamManager::HandleRemovedClient(const std::string &id) +void SrcStreamManager::HandleStreamState(const std::string &discovery_id, + const std::vector<uint8_t> &message) { - if (stream_ && stream_->GetPeerId().compare(id) == 0) { - delete stream_; - stream_ = nullptr; - } + auto sink_state = flexbuffers::GetRoot(message).ToString(); + + if (sink_state.compare("STOP") == 0) + HandleRemovedClient(discovery_id); + else + DBG("Invalid message %s", sink_state); } -void SrcStreamManager::HandleDiscoveredStream(const std::string &id, +void SrcStreamManager::HandleStreamInfo(const std::string &discovery_id, const std::vector<uint8_t> &message) { - auto info = WebRtcMessage::ParseDiscoveryMessage(message); - if (info.is_src_ || info.topic_.compare(topic_)) { - DBG("Is src or topic not matched"); + if (!WebRtcMessage::IsValidStreamInfo(message)) { + DBG("Invalid streams info"); return; } - if (stream_) { - DBG("Supports one stream at once currently"); - return; + // sink_streams have a stream at normal situation + auto sink_streams = flexbuffers::GetRoot(message).AsVector(); + for (size_t stream_idx = 0; stream_idx < sink_streams.size(); ++stream_idx) { + auto stream = sink_streams[stream_idx].AsMap(); + auto id = stream["id"].AsString().str(); + auto peer_id = stream["peer_id"].AsString().str(); + auto sdp = stream["sdp"].AsString().str(); + std::vector<std::string> ice_candidates; + auto ice_info = stream["ice_candidates"].AsVector(); + for (size_t ice_idx = 0; ice_idx < ice_info.size(); ++ice_idx) + ice_candidates.push_back(ice_info[ice_idx].AsString().str()); + UpdateStreamInfo(discovery_id, id, peer_id, sdp, ice_candidates); } +} + +void SrcStreamManager::UpdateStreamInfo(const std::string &discovery_id, const std::string &id, + const std::string &peer_id, const std::string &sdp, + const std::vector<std::string> &ice_candidates) +{ + auto sink_stream = sink_streams_.find(discovery_id); + if (sink_stream == sink_streams_.end()) + AddStream(discovery_id, id, sdp, ice_candidates); + else + sink_stream->second->UpdatePeerInformation(ice_candidates); +} - AddStream(id, message); +void SrcStreamManager::AddStream(const std::string &discovery_id, const std::string &id, + const std::string &sdp, const std::vector<std::string> &ice_candidates) +{ + auto stream = new WebRtcStream(); + SetWebRtcStreamCallbacks(*stream); + stream->Create(true, false); + stream->AttachCameraSource(); + stream->Start(); + + std::stringstream s_stream; + s_stream << static_cast<void *>(stream); + + stream->SetStreamId(std::string(thread_id_ + s_stream.str())); + stream->SetPeerId(id); + stream->AddPeerInformation(sdp, ice_candidates); + + sink_streams_[discovery_id] = stream; + + return; +} + +std::vector<uint8_t> SrcStreamManager::GetDiscoveryMessage(void) +{ + std::vector<uint8_t> message; + + flexbuffers::Builder fbb; + fbb.Vector([&] { + for (auto itr = sink_streams_.begin(); itr != sink_streams_.end(); ++itr) { + fbb.Map([&] { + fbb.String("id", itr->second->GetStreamId()); + fbb.String("peer_id", itr->second->GetPeerId()); + fbb.String("sdp", itr->second->GetLocalDescription()); + fbb.Vector("ice_candidates", [&]() { + for (const auto &candidate : itr->second->GetIceCandidates()) { + fbb.String(candidate); + } + }); + }); + } + }); + fbb.Finish(); + + message = fbb.GetBuffer(); + return message; +} + +std::string SrcStreamManager::GetWatchingTopic(void) +{ + return watching_topic_; } } // namespace AittWebRTCNamespace |