summaryrefslogtreecommitdiff
path: root/modules/webrtc/SrcStreamManager.cc
diff options
context:
space:
mode:
authorJiung Yu <jiung.yu@samsung.com>2022-11-08 09:50:10 +0900
committerYoungjae Shin <yj99.shin@samsung.com>2022-11-09 17:27:45 +0900
commit8b5b71d1edc17d7acb62a1d06a364d849c920004 (patch)
tree51bbd7c7cf94f40d347920f8e3dbdac62764b4d3 /modules/webrtc/SrcStreamManager.cc
parent2942726fec194e1bd8f39a497e09a69bcb4d53c3 (diff)
downloadaitt-8b5b71d1edc17d7acb62a1d06a364d849c920004.tar.gz
aitt-8b5b71d1edc17d7acb62a1d06a364d849c920004.tar.bz2
aitt-8b5b71d1edc17d7acb62a1d06a364d849c920004.zip
Add WebRTC Discovery using AITT Discovery
Diffstat (limited to 'modules/webrtc/SrcStreamManager.cc')
-rw-r--r--modules/webrtc/SrcStreamManager.cc223
1 files changed, 166 insertions, 57 deletions
diff --git a/modules/webrtc/SrcStreamManager.cc b/modules/webrtc/SrcStreamManager.cc
index 4170443..be11c96 100644
--- a/modules/webrtc/SrcStreamManager.cc
+++ b/modules/webrtc/SrcStreamManager.cc
@@ -13,8 +13,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
#include "SrcStreamManager.h"
+#include <flatbuffers/flexbuffers.h>
+
#include <sstream>
#include "aitt_internal.h"
@@ -23,13 +26,40 @@ namespace AittWebRTCNamespace {
SrcStreamManager::SrcStreamManager(const std::string &topic, const std::string &aitt_id,
const std::string &thread_id)
- : StreamManager(topic, aitt_id, thread_id), stream_(nullptr), stream_ready_cb_(nullptr)
+ : StreamManager(topic + "/SRC", aitt_id, thread_id), watching_topic_(topic + "/SINK")
{
}
SrcStreamManager::~SrcStreamManager()
{
- Stop();
+ // TODO: You should take care about stream resource
+ for (auto itr = sink_streams_.begin(); itr != sink_streams_.end(); ++itr)
+ itr->second->Destroy();
+ sink_streams_.clear();
+}
+
+void SrcStreamManager::Start(void)
+{
+ DBG("%s %s", __func__, GetTopic().c_str());
+ if (stream_start_cb_)
+ stream_start_cb_();
+}
+
+void SrcStreamManager::Stop(void)
+{
+ DBG("%s %s", __func__, GetTopic().c_str());
+ // TODO: You should take care about stream resource
+ for (auto itr = sink_streams_.begin(); itr != sink_streams_.end(); ++itr)
+ itr->second->Destroy();
+ sink_streams_.clear();
+
+ if (stream_stop_cb_)
+ stream_stop_cb_();
+}
+
+void SrcStreamManager::SetIceCandidateAddedCallback(IceCandidateAddedCallback cb)
+{
+ ice_candidate_added_cb_ = cb;
}
void SrcStreamManager::SetStreamReadyCallback(StreamReadyCallback cb)
@@ -37,18 +67,32 @@ void SrcStreamManager::SetStreamReadyCallback(StreamReadyCallback cb)
stream_ready_cb_ = cb;
}
+void SrcStreamManager::SetStreamStartCallback(StreamStartCallback cb)
+{
+ stream_start_cb_ = cb;
+}
+
+void SrcStreamManager::SetStreamStopCallback(StreamStopCallback cb)
+{
+ stream_stop_cb_ = cb;
+}
+
void SrcStreamManager::SetWebRtcStreamCallbacks(WebRtcStream &stream)
{
- auto on_stream_state_changed_cb =
- std::bind(OnStreamStateChanged, std::placeholders::_1, std::ref(stream));
+ auto on_stream_state_changed_cb = std::bind(&SrcStreamManager::OnStreamStateChanged, this,
+ std::placeholders::_1, std::ref(stream));
stream.GetEventHandler().SetOnStateChangedCb(on_stream_state_changed_cb);
- auto on_signaling_state_notify_cb =
- std::bind(OnSignalingStateNotify, std::placeholders::_1, std::ref(stream));
+ auto on_ice_candidate_added_cb = std::bind(&SrcStreamManager::OnIceCandidate, this,
+ std::placeholders::_1, std::ref(stream));
+ stream.GetEventHandler().SetOnIceCandidateCb(on_ice_candidate_added_cb);
+
+ auto on_signaling_state_notify_cb = std::bind(&SrcStreamManager::OnSignalingStateNotify, this,
+ std::placeholders::_1, std::ref(stream));
stream.GetEventHandler().SetOnSignalingStateNotifyCb(on_signaling_state_notify_cb);
- auto on_ice_gathering_state_changed_cb =
- std::bind(OnIceGatheringStateNotify, std::placeholders::_1, std::ref(stream), this);
+ auto on_ice_gathering_state_changed_cb = std::bind(&SrcStreamManager::OnIceGatheringStateNotify,
+ this, std::placeholders::_1, std::ref(stream));
stream.GetEventHandler().SetOnIceGatheringStateNotifyCb(on_ice_gathering_state_changed_cb);
}
@@ -57,6 +101,16 @@ void SrcStreamManager::OnStreamStateChanged(WebRtcState::Stream state, WebRtcStr
DBG("OnSrcStreamStateChanged: %s", WebRtcState::StreamToStr(state).c_str());
}
+void SrcStreamManager::OnSignalingStateNotify(WebRtcState::Signaling state, WebRtcStream &stream)
+{
+ DBG("OnSignalingStateNotify: %s", WebRtcState::SignalingToStr(state).c_str());
+ if (state == WebRtcState::Signaling::HAVE_REMOTE_OFFER) {
+ auto on_answer_created = std::bind(&SrcStreamManager::OnAnswerCreated, this,
+ std::placeholders::_1, std::ref(stream));
+ stream.CreateAnswerAsync(on_answer_created);
+ }
+}
+
void SrcStreamManager::OnAnswerCreated(std::string sdp, WebRtcStream &stream)
{
DBG("%s", __func__);
@@ -64,85 +118,140 @@ void SrcStreamManager::OnAnswerCreated(std::string sdp, WebRtcStream &stream)
stream.SetLocalDescription(sdp);
}
-void SrcStreamManager::OnSignalingStateNotify(WebRtcState::Signaling state, WebRtcStream &stream)
+void SrcStreamManager::OnIceCandidate(const std::string &candidate, WebRtcStream &stream)
{
- DBG("OnSignalingStateNotify: %s", WebRtcState::SignalingToStr(state).c_str());
- if (state == WebRtcState::Signaling::HAVE_REMOTE_OFFER) {
- auto on_answer_created =
- std::bind(OnAnswerCreated, std::placeholders::_1, std::ref(stream));
- stream.CreateAnswerAsync(on_answer_created);
- }
+ if (ice_candidate_added_cb_)
+ ice_candidate_added_cb_(stream);
}
void SrcStreamManager::OnIceGatheringStateNotify(WebRtcState::IceGathering state,
- WebRtcStream &stream, SrcStreamManager *manager)
+ WebRtcStream &stream)
{
DBG("Src IceGathering State: %s", WebRtcState::IceGatheringToStr(state).c_str());
if (state == WebRtcState::IceGathering::COMPLETE) {
- if (manager && manager->stream_ready_cb_)
- manager->stream_ready_cb_(stream);
+ if (stream_ready_cb_)
+ stream_ready_cb_(stream);
}
}
-void SrcStreamManager::Start(void)
+void SrcStreamManager::HandleRemovedClient(const std::string &discovery_id)
{
- // TODO: What'll be done in start Src Stream Manager?
-}
-
-void SrcStreamManager::AddStream(const std::string &id, const std::vector<uint8_t> &message)
-{
- // TODO Add more streams on same topic
- if (stream_)
+ auto sink_stream_itr = sink_streams_.find(discovery_id);
+ if (sink_stream_itr == sink_streams_.end()) {
+ DBG("There's no sink stream %s", discovery_id.c_str());
return;
+ }
- stream_ = new WebRtcStream();
- SetWebRtcStreamCallbacks(*stream_);
- stream_->Create(true, false);
- stream_->AttachCameraSource();
- stream_->Start();
-
- std::stringstream s_stream;
- s_stream << static_cast<void *>(&stream_);
-
- stream_->SetStreamId(std::string(aitt_id_ + thread_id_ + s_stream.str()));
- stream_->SetPeerId(id);
- stream_->AddDiscoveryInformation(message);
+ // TODO: You should take care about stream resource
+ sink_stream_itr->second->Destroy();
+ sink_streams_.erase(sink_stream_itr);
return;
}
-void SrcStreamManager::Stop(void)
+void SrcStreamManager::HandleMsg(const std::string &discovery_id,
+ const std::vector<uint8_t> &message)
{
- // TODO: Ad-hoc method
- if (stream_) {
- delete stream_;
- stream_ = nullptr;
- }
+ if (flexbuffers::GetRoot(message).IsString())
+ HandleStreamState(discovery_id, message);
+ else if (flexbuffers::GetRoot(message).IsVector())
+ HandleStreamInfo(discovery_id, message);
}
-void SrcStreamManager::HandleRemovedClient(const std::string &id)
+void SrcStreamManager::HandleStreamState(const std::string &discovery_id,
+ const std::vector<uint8_t> &message)
{
- if (stream_ && stream_->GetPeerId().compare(id) == 0) {
- delete stream_;
- stream_ = nullptr;
- }
+ auto sink_state = flexbuffers::GetRoot(message).ToString();
+
+ if (sink_state.compare("STOP") == 0)
+ HandleRemovedClient(discovery_id);
+ else
+ DBG("Invalid message %s", sink_state);
}
-void SrcStreamManager::HandleDiscoveredStream(const std::string &id,
+void SrcStreamManager::HandleStreamInfo(const std::string &discovery_id,
const std::vector<uint8_t> &message)
{
- auto info = WebRtcMessage::ParseDiscoveryMessage(message);
- if (info.is_src_ || info.topic_.compare(topic_)) {
- DBG("Is src or topic not matched");
+ if (!WebRtcMessage::IsValidStreamInfo(message)) {
+ DBG("Invalid streams info");
return;
}
- if (stream_) {
- DBG("Supports one stream at once currently");
- return;
+ // sink_streams have a stream at normal situation
+ auto sink_streams = flexbuffers::GetRoot(message).AsVector();
+ for (size_t stream_idx = 0; stream_idx < sink_streams.size(); ++stream_idx) {
+ auto stream = sink_streams[stream_idx].AsMap();
+ auto id = stream["id"].AsString().str();
+ auto peer_id = stream["peer_id"].AsString().str();
+ auto sdp = stream["sdp"].AsString().str();
+ std::vector<std::string> ice_candidates;
+ auto ice_info = stream["ice_candidates"].AsVector();
+ for (size_t ice_idx = 0; ice_idx < ice_info.size(); ++ice_idx)
+ ice_candidates.push_back(ice_info[ice_idx].AsString().str());
+ UpdateStreamInfo(discovery_id, id, peer_id, sdp, ice_candidates);
}
+}
+
+void SrcStreamManager::UpdateStreamInfo(const std::string &discovery_id, const std::string &id,
+ const std::string &peer_id, const std::string &sdp,
+ const std::vector<std::string> &ice_candidates)
+{
+ auto sink_stream = sink_streams_.find(discovery_id);
+ if (sink_stream == sink_streams_.end())
+ AddStream(discovery_id, id, sdp, ice_candidates);
+ else
+ sink_stream->second->UpdatePeerInformation(ice_candidates);
+}
- AddStream(id, message);
+void SrcStreamManager::AddStream(const std::string &discovery_id, const std::string &id,
+ const std::string &sdp, const std::vector<std::string> &ice_candidates)
+{
+ auto stream = new WebRtcStream();
+ SetWebRtcStreamCallbacks(*stream);
+ stream->Create(true, false);
+ stream->AttachCameraSource();
+ stream->Start();
+
+ std::stringstream s_stream;
+ s_stream << static_cast<void *>(stream);
+
+ stream->SetStreamId(std::string(thread_id_ + s_stream.str()));
+ stream->SetPeerId(id);
+ stream->AddPeerInformation(sdp, ice_candidates);
+
+ sink_streams_[discovery_id] = stream;
+
+ return;
+}
+
+std::vector<uint8_t> SrcStreamManager::GetDiscoveryMessage(void)
+{
+ std::vector<uint8_t> message;
+
+ flexbuffers::Builder fbb;
+ fbb.Vector([&] {
+ for (auto itr = sink_streams_.begin(); itr != sink_streams_.end(); ++itr) {
+ fbb.Map([&] {
+ fbb.String("id", itr->second->GetStreamId());
+ fbb.String("peer_id", itr->second->GetPeerId());
+ fbb.String("sdp", itr->second->GetLocalDescription());
+ fbb.Vector("ice_candidates", [&]() {
+ for (const auto &candidate : itr->second->GetIceCandidates()) {
+ fbb.String(candidate);
+ }
+ });
+ });
+ }
+ });
+ fbb.Finish();
+
+ message = fbb.GetBuffer();
+ return message;
+}
+
+std::string SrcStreamManager::GetWatchingTopic(void)
+{
+ return watching_topic_;
}
} // namespace AittWebRTCNamespace