summaryrefslogtreecommitdiff
path: root/modules/tcp
diff options
context:
space:
mode:
Diffstat (limited to 'modules/tcp')
-rw-r--r--modules/tcp/CMakeLists.txt14
-rw-r--r--modules/tcp/Module.cc513
-rw-r--r--modules/tcp/Module.h135
-rw-r--r--modules/tcp/TCP.cc157
-rw-r--r--modules/tcp/TCP.h43
-rw-r--r--modules/tcp/TCPServer.cc132
-rw-r--r--modules/tcp/TCPServer.h37
-rw-r--r--modules/tcp/samples/CMakeLists.txt3
-rw-r--r--modules/tcp/samples/tcp_test.cc235
-rw-r--r--modules/tcp/tests/CMakeLists.txt19
-rw-r--r--modules/tcp/tests/TCPServer_test.cc121
-rw-r--r--modules/tcp/tests/TCP_test.cc149
12 files changed, 1558 insertions, 0 deletions
diff --git a/modules/tcp/CMakeLists.txt b/modules/tcp/CMakeLists.txt
new file mode 100644
index 0000000..edac2fd
--- /dev/null
+++ b/modules/tcp/CMakeLists.txt
@@ -0,0 +1,14 @@
+SET(AITT_TCP aitt-transport-tcp)
+
+INCLUDE_DIRECTORIES(${CMAKE_CURRENT_SOURCE_DIR})
+
+ADD_LIBRARY(TCP_OBJ OBJECT TCP.cc TCPServer.cc)
+ADD_LIBRARY(${AITT_TCP} SHARED ../main.cc Module.cc $<TARGET_OBJECTS:TCP_OBJ>)
+TARGET_LINK_LIBRARIES(${AITT_TCP} ${AITT_TCP_NEEDS_LIBRARIES} Threads::Threads ${AITT_COMMON})
+
+INSTALL(TARGETS ${AITT_TCP} DESTINATION ${CMAKE_INSTALL_LIBDIR})
+
+IF(BUILD_TESTING)
+ ADD_SUBDIRECTORY(samples)
+ ADD_SUBDIRECTORY(tests)
+ENDIF(BUILD_TESTING)
diff --git a/modules/tcp/Module.cc b/modules/tcp/Module.cc
new file mode 100644
index 0000000..bc50d7d
--- /dev/null
+++ b/modules/tcp/Module.cc
@@ -0,0 +1,513 @@
+/*
+ * Copyright (c) 2021-2022 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "Module.h"
+
+#include <MQ.h>
+#include <flatbuffers/flexbuffers.h>
+#include <unistd.h>
+
+#include "aitt_internal.h"
+
+/*
+ * P2P Data Packet Definition
+ * TopicLength: 4 bytes
+ * TopicString: $TopicLength
+ */
+
+Module::Module(const std::string &ip, AittDiscovery &discovery) : AittTransport(discovery), ip(ip)
+{
+ aittThread = std::thread(&Module::ThreadMain, this);
+
+ discovery_cb = discovery.AddDiscoveryCB(AITT_TYPE_TCP,
+ std::bind(&Module::DiscoveryMessageCallback, this, std::placeholders::_1,
+ std::placeholders::_2, std::placeholders::_3, std::placeholders::_4));
+ DBG("Discovery Callback : %p, %d", this, discovery_cb);
+}
+
+Module::~Module(void)
+{
+ discovery.RemoveDiscoveryCB(discovery_cb);
+
+ while (main_loop.Quit() == false) {
+ // wait when called before the thread has completely created.
+ usleep(1000);
+ }
+
+ if (aittThread.joinable())
+ aittThread.join();
+}
+
+void Module::ThreadMain(void)
+{
+ pthread_setname_np(pthread_self(), "TCPWorkerLoop");
+ main_loop.Run();
+}
+
+void Module::Publish(const std::string &topic, const void *data, const size_t datalen,
+ const std::string &correlation, AittQoS qos, bool retain)
+{
+ // NOTE:
+ // Iterate discovered service table
+ // PublishMap
+ // map {
+ // "/customTopic/faceRecog": map {
+ // "$clientId": map {
+ // 11234: $handle,
+ //
+ // ...
+ //
+ // 21234: nullptr,
+ // },
+ // },
+ // }
+ std::lock_guard<std::mutex> auto_lock_publish(publishTableLock);
+ for (PublishMap::iterator it = publishTable.begin(); it != publishTable.end(); ++it) {
+ // NOTE:
+ // Find entries that have matched with the given topic
+ if (!aitt::MQ::CompareTopic(it->first, topic))
+ continue;
+
+ // NOTE:
+ // Iterate all hosts
+ for (HostMap::iterator hostIt = it->second.begin(); hostIt != it->second.end(); ++hostIt) {
+ // Iterate all ports,
+ // the current implementation only be able to have the ZERO or a SINGLE entry
+ // hostIt->first // clientId
+ for (PortMap::iterator portIt = hostIt->second.begin(); portIt != hostIt->second.end();
+ ++portIt) {
+ // portIt->first // port
+ // portIt->second // handle
+ if (!portIt->second) {
+ std::string host;
+ {
+ ClientMap::iterator clientIt;
+ std::lock_guard<std::mutex> auto_lock_client(clientTableLock);
+
+ clientIt = clientTable.find(hostIt->first);
+ if (clientIt != clientTable.end())
+ host = clientIt->second;
+
+ // NOTE:
+ // otherwise, it is a critical error
+ // The broken clientTable or subscribeTable
+ }
+
+ std::unique_ptr<TCP> client(std::make_unique<TCP>(host, portIt->first));
+
+ // TODO:
+ // If the client gets disconnected,
+ // This channel entry must be cleared
+ // In order to do that,
+ // There should be an observer to monitor
+ // each connections and manipulate
+ // the discovered service table
+ portIt->second = std::move(client);
+ }
+
+ if (!portIt->second) {
+ ERR("Failed to create a new client instance");
+ continue;
+ }
+
+ SendTopic(topic, portIt);
+ SendPayload(datalen, portIt, data);
+ }
+ } // connectionEntries
+ } // publishTable
+}
+
+void Module::SendTopic(const std::string &topic, Module::PortMap::iterator &portIt)
+{
+ uint32_t topicLen = topic.length();
+ size_t szData = sizeof(topicLen);
+ portIt->second->Send(static_cast<void *>(&topicLen), szData);
+ szData = topicLen;
+ portIt->second->Send(static_cast<const void *>(topic.c_str()), szData);
+}
+
+void Module::SendPayload(const size_t &datalen, Module::PortMap::iterator &portIt, const void *data)
+{
+ uint32_t sendsize = datalen;
+ size_t szsize = sizeof(sendsize);
+
+ try {
+ if (0 == datalen) {
+ // distinguish between connection problems and zero-size messages
+ INFO("Send zero-size Message");
+ sendsize = UINT32_MAX;
+ }
+ portIt->second->Send(static_cast<void *>(&sendsize), szsize);
+
+ int msgSize = datalen;
+ while (0 < msgSize) {
+ size_t sentSize = msgSize;
+ char *dataIdx = (char *)data + (sendsize - msgSize);
+ portIt->second->Send(dataIdx, sentSize);
+ if (sentSize > 0) {
+ msgSize -= sentSize;
+ }
+ }
+ } catch (std::exception &e) {
+ ERR("An exception(%s) occurs during Send().", e.what());
+ }
+}
+
+void Module::Publish(const std::string &topic, const void *data, const size_t datalen, AittQoS qos,
+ bool retain)
+{
+ Publish(topic, data, datalen, std::string(), qos, retain);
+}
+
+void *Module::Subscribe(const std::string &topic, const AittTransport::SubscribeCallback &cb,
+ void *cbdata, AittQoS qos)
+{
+ std::unique_ptr<TCP::Server> tcpServer;
+
+ unsigned short port = 0;
+ tcpServer = std::make_unique<TCP::Server>("0.0.0.0", port);
+ TCPServerData *listen_info = new TCPServerData;
+ listen_info->impl = this;
+ listen_info->cb = cb;
+ listen_info->cbdata = cbdata;
+ listen_info->topic = topic;
+ auto handle = tcpServer->GetHandle();
+
+ main_loop.AddWatch(handle, AcceptConnection, listen_info);
+
+ // 서비스 테이블에 토픽을 키워드로 프로토콜을 등록한다.
+ {
+ std::lock_guard<std::mutex> autoLock(subscribeTableLock);
+ subscribeTable.insert(SubscribeMap::value_type(topic, std::move(tcpServer)));
+ UpdateDiscoveryMsg();
+ }
+
+ return reinterpret_cast<void *>(handle);
+}
+
+void *Module::Subscribe(const std::string &topic, const AittTransport::SubscribeCallback &cb,
+ const void *data, const size_t datalen, void *cbdata, AittQoS qos)
+{
+ return nullptr;
+}
+
+void *Module::Unsubscribe(void *handlePtr)
+{
+ int handle = static_cast<int>(reinterpret_cast<intptr_t>(handlePtr));
+ TCPServerData *listen_info = dynamic_cast<TCPServerData *>(main_loop.RemoveWatch(handle));
+ if (!listen_info)
+ return nullptr;
+
+ {
+ std::lock_guard<std::mutex> autoLock(subscribeTableLock);
+ auto it = subscribeTable.find(listen_info->topic);
+ if (it == subscribeTable.end())
+ throw std::runtime_error("Service is not registered: " + listen_info->topic);
+
+ subscribeTable.erase(it);
+
+ UpdateDiscoveryMsg();
+ }
+
+ void *cbdata = listen_info->cbdata;
+ listen_info->client_lock.lock();
+ for (auto fd : listen_info->client_list) {
+ TCPData *connect_info = dynamic_cast<TCPData *>(main_loop.RemoveWatch(fd));
+ delete connect_info;
+ }
+ listen_info->client_list.clear();
+ listen_info->client_lock.unlock();
+ delete listen_info;
+
+ return cbdata;
+}
+
+void Module::DiscoveryMessageCallback(const std::string &clientId, const std::string &status,
+ const void *msg, const int szmsg)
+{
+ // NOTE:
+ // Iterate discovered service table
+ // PublishMap
+ // map {
+ // "/customTopic/faceRecog": map {
+ // "clientId.uniq.abcd.123": map {
+ // 11234: pair {
+ // "protocol": 1,
+ // "handle": nullptr,
+ // },
+ //
+ // ...
+ //
+ // 21234: pair {
+ // "protocol": 2,
+ // "handle": nullptr,
+ // }
+ // },
+ // },
+ // }
+
+ if (!status.compare(AittDiscovery::WILL_LEAVE_NETWORK)) {
+ {
+ std::lock_guard<std::mutex> autoLock(clientTableLock);
+ // Delete from the { clientId : Host } mapping table
+ clientTable.erase(clientId);
+ }
+
+ {
+ // NOTE:
+ // Iterate all topics in the publishTable holds discovered client information
+ std::lock_guard<std::mutex> autoLock(publishTableLock);
+ for (auto it = publishTable.begin(); it != publishTable.end(); ++it)
+ it->second.erase(clientId);
+ }
+ return;
+ }
+
+ // serviceMessage (flexbuffers)
+ // map {
+ // "host": "192.168.1.11",
+ // "$topic": port,
+ // }
+ auto map = flexbuffers::GetRoot(static_cast<const uint8_t *>(msg), szmsg).AsMap();
+ std::string host = map["host"].AsString().c_str();
+
+ // NOTE:
+ // Update the clientTable
+ {
+ std::lock_guard<std::mutex> autoLock(clientTableLock);
+ auto clientIt = clientTable.find(clientId);
+ if (clientIt == clientTable.end())
+ clientTable.insert(ClientMap::value_type(clientId, host));
+ else if (clientIt->second.compare(host))
+ clientIt->second = host;
+ }
+
+ auto topics = map.Keys();
+ for (size_t idx = 0; idx < topics.size(); ++idx) {
+ std::string topic = topics[idx].AsString().c_str();
+
+ if (!topic.compare("host"))
+ continue;
+
+ auto port = map[topic].AsUInt16();
+
+ {
+ std::lock_guard<std::mutex> autoLock(publishTableLock);
+ UpdatePublishTable(topic, clientId, port);
+ }
+ }
+}
+
+void Module::UpdateDiscoveryMsg()
+{
+ flexbuffers::Builder fbb;
+ // flexbuffers
+ // {
+ // "host": "127.0.0.1",
+ // "/customTopic/aitt/faceRecog": $port,
+ // "/customTopic/aitt/ASR": 102020,
+ //
+ // ...
+ //
+ // "/customTopic/aitt/+": 20123,
+ // }
+ fbb.Map([this, &fbb]() {
+ fbb.String("host", ip);
+
+ // SubscribeTable
+ // map {
+ // "/customTopic/mytopic": $serverHandle,
+ // ...
+ // }
+ for (auto it = subscribeTable.begin(); it != subscribeTable.end(); ++it) {
+ if (it->second)
+ fbb.UInt(it->first.c_str(), it->second->GetPort());
+ else
+ fbb.UInt(it->first.c_str(), 0); // this is an error case
+ }
+ });
+ fbb.Finish();
+
+ auto buf = fbb.GetBuffer();
+ discovery.UpdateDiscoveryMsg(AITT_TYPE_TCP, buf.data(), buf.size());
+}
+
+void Module::ReceiveData(MainLoopHandler::MainLoopResult result, int handle,
+ MainLoopHandler::MainLoopData *user_data)
+{
+ TCPData *connect_info = dynamic_cast<TCPData *>(user_data);
+ RET_IF(connect_info == nullptr);
+ TCPServerData *parent_info = connect_info->parent;
+ RET_IF(parent_info == nullptr);
+ Module *impl = parent_info->impl;
+ RET_IF(impl == nullptr);
+
+ if (result == MainLoopHandler::HANGUP) {
+ ERR("Disconnected");
+ return impl->HandleClientDisconnect(handle);
+ }
+
+ uint32_t szmsg = 0;
+ size_t szdata = sizeof(szmsg);
+ char *msg = nullptr;
+ std::string topic;
+
+ try {
+ topic = impl->GetTopicName(connect_info);
+ if (topic.empty()) {
+ ERR("Unknown Topic");
+ return impl->HandleClientDisconnect(handle);
+ }
+
+ connect_info->client->Recv(static_cast<void *>(&szmsg), szdata);
+ if (szmsg == 0) {
+ ERR("Disconnected");
+ return impl->HandleClientDisconnect(handle);
+ }
+
+ if (UINT32_MAX == szmsg) {
+ // distinguish between connection problems and zero-size messages
+ INFO("Got zero-size Message");
+ szmsg = 0;
+ }
+
+ msg = static_cast<char *>(malloc(szmsg));
+ int msgSize = szmsg;
+ while (0 < msgSize) {
+ size_t receivedSize = msgSize;
+ connect_info->client->Recv(static_cast<void *>(msg + (szmsg - msgSize)), receivedSize);
+ if (receivedSize > 0) {
+ msgSize -= receivedSize;
+ }
+ }
+ } catch (std::exception &e) {
+ ERR("An exception(%s) occurs during Recv()", e.what());
+ }
+
+ std::string correlation;
+ // TODO:
+ // Correlation data (string) should be filled
+
+ parent_info->cb(topic, msg, szmsg, parent_info->cbdata, correlation);
+ free(msg);
+}
+
+void Module::HandleClientDisconnect(int handle)
+{
+ TCPData *connect_info = dynamic_cast<TCPData *>(main_loop.RemoveWatch(handle));
+ if (connect_info == nullptr) {
+ ERR("No watch data");
+ return;
+ }
+ connect_info->parent->client_lock.lock();
+ auto it = std::find(connect_info->parent->client_list.begin(),
+ connect_info->parent->client_list.end(), handle);
+ connect_info->parent->client_list.erase(it);
+ connect_info->parent->client_lock.unlock();
+
+ delete connect_info;
+}
+
+std::string Module::GetTopicName(Module::TCPData *connect_info)
+{
+ uint32_t topic_len = 0;
+ size_t data_size = sizeof(topic_len);
+ connect_info->client->Recv(static_cast<void *>(&topic_len), data_size);
+
+ if (AITT_TOPIC_NAME_MAX < topic_len) {
+ ERR("Invalid topic name length(%d)", topic_len);
+ return std::string();
+ }
+
+ char data[topic_len];
+ data_size = topic_len;
+ connect_info->client->Recv(data, data_size);
+ if (data_size != topic_len)
+ ERR("Recv() Fail");
+
+ return std::string(data, data_size);
+}
+
+void Module::AcceptConnection(MainLoopHandler::MainLoopResult result, int handle,
+ MainLoopHandler::MainLoopData *user_data)
+{
+ // TODO:
+ // Update the discovery map
+ std::unique_ptr<TCP> client;
+
+ TCPServerData *listen_info = dynamic_cast<TCPServerData *>(user_data);
+ Module *impl = listen_info->impl;
+ {
+ std::lock_guard<std::mutex> autoLock(impl->subscribeTableLock);
+
+ auto clientIt = impl->subscribeTable.find(listen_info->topic);
+ if (clientIt == impl->subscribeTable.end())
+ return;
+
+ client = clientIt->second->AcceptPeer();
+ }
+
+ if (client == nullptr) {
+ ERR("Unable to accept a peer"); // NOTE: FATAL ERROR
+ return;
+ }
+
+ int cHandle = client->GetHandle();
+ listen_info->client_list.push_back(cHandle);
+
+ TCPData *ecd = new TCPData;
+ ecd->parent = listen_info;
+ ecd->client = std::move(client);
+
+ impl->main_loop.AddWatch(cHandle, ReceiveData, ecd);
+}
+
+void Module::UpdatePublishTable(const std::string &topic, const std::string &clientId,
+ unsigned short port)
+{
+ auto topicIt = publishTable.find(topic);
+ if (topicIt == publishTable.end()) {
+ PortMap portMap;
+ portMap.insert(PortMap::value_type(port, nullptr));
+ HostMap hostMap;
+ hostMap.insert(HostMap::value_type(clientId, std::move(portMap)));
+ publishTable.insert(PublishMap::value_type(topic, std::move(hostMap)));
+ return;
+ }
+
+ auto hostIt = topicIt->second.find(clientId);
+ if (hostIt == topicIt->second.end()) {
+ PortMap portMap;
+ portMap.insert(PortMap::value_type(port, nullptr));
+ topicIt->second.insert(HostMap::value_type(clientId, std::move(portMap)));
+ return;
+ }
+
+ // NOTE:
+ // The current implementation only has a single port entry
+ // therefore, if the hostIt is not empty, there is the previous connection
+ if (!hostIt->second.empty()) {
+ auto portIt = hostIt->second.begin();
+
+ if (portIt->first == port)
+ return; // nothing changed. keep the current handle
+
+ // otherwise, delete the connection handle
+ // to make a new connection with the new port
+ hostIt->second.clear();
+ }
+
+ hostIt->second.insert(PortMap::value_type(port, nullptr));
+}
diff --git a/modules/tcp/Module.h b/modules/tcp/Module.h
new file mode 100644
index 0000000..4011980
--- /dev/null
+++ b/modules/tcp/Module.h
@@ -0,0 +1,135 @@
+/*
+ * Copyright (c) 2021-2022 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <AittTransport.h>
+#include <MainLoopHandler.h>
+
+#include <map>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <thread>
+
+#include "TCPServer.h"
+
+using AittTransport = aitt::AittTransport;
+using MainLoopHandler = aitt::MainLoopHandler;
+using AittDiscovery = aitt::AittDiscovery;
+
+class Module : public AittTransport {
+ public:
+ explicit Module(const std::string &ip, AittDiscovery &discovery);
+ virtual ~Module(void);
+
+ void Publish(const std::string &topic, const void *data, const size_t datalen,
+ const std::string &correlation, AittQoS qos = AITT_QOS_AT_MOST_ONCE,
+ bool retain = false) override;
+
+ void Publish(const std::string &topic, const void *data, const size_t datalen,
+ AittQoS qos = AITT_QOS_AT_MOST_ONCE, bool retain = false) override;
+
+ void *Subscribe(const std::string &topic, const SubscribeCallback &cb, void *cbdata = nullptr,
+ AittQoS qos = AITT_QOS_AT_MOST_ONCE) override;
+
+ void *Subscribe(const std::string &topic, const SubscribeCallback &cb, const void *data,
+ const size_t datalen, void *cbdata = nullptr,
+ AittQoS qos = AITT_QOS_AT_MOST_ONCE) override;
+ void *Unsubscribe(void *handle) override;
+
+ private:
+ struct TCPServerData : public MainLoopHandler::MainLoopData {
+ Module *impl;
+ SubscribeCallback cb;
+ void *cbdata;
+ std::string topic;
+ std::vector<int> client_list;
+ std::mutex client_lock;
+ };
+
+ struct TCPData : public MainLoopHandler::MainLoopData {
+ TCPServerData *parent;
+ std::unique_ptr<TCP> client;
+ };
+
+ // SubscribeTable
+ // map {
+ // "/customTopic/mytopic": $serverHandle,
+ // ...
+ // }
+ using SubscribeMap = std::map<std::string, std::unique_ptr<TCP::Server>>;
+
+ // ClientTable
+ // map {
+ // $clientId: $host,
+ // "client.uniqId.123": "192.168.1.11"
+ // ...
+ // }
+ using ClientMap = std::map<std::string /* id */, std::string /* host */>;
+
+ // NOTE:
+ // There could be multiple clientIds for the single host
+ // If several applications are run on the same device, each applicaion will get unique client
+ // Ids therefore we have to keep in mind that the clientId is not 1:1 matched for the IPAddress.
+
+ // PublishTable
+ // map {
+ // "/customTopic/faceRecog": map {
+ // $clientId: map {
+ // 11234: $clientHandle,
+ //
+ // ...
+ //
+ // 21234: $clientHandle,
+ // },
+ // },
+ // }
+ //
+ // NOTE:
+ // TCP handle should be the unique_ptr, so if we delete the entry from the map,
+ // the handle must be released automatically
+ // in order to make the handle "unique_ptr", it should be a class object not the "void *"
+ using PortMap = std::map<unsigned short /* port */, std::unique_ptr<TCP>>;
+ using HostMap = std::map<std::string /* clientId */, PortMap>;
+ using PublishMap = std::map<std::string /* topic */, HostMap>;
+
+ static void AcceptConnection(MainLoopHandler::MainLoopResult result, int handle,
+ MainLoopHandler::MainLoopData *watchData);
+ void DiscoveryMessageCallback(const std::string &clientId, const std::string &status,
+ const void *msg, const int szmsg);
+ void UpdateDiscoveryMsg();
+ static void ReceiveData(MainLoopHandler::MainLoopResult result, int handle,
+ MainLoopHandler::MainLoopData *watchData);
+ void HandleClientDisconnect(int handle);
+ std::string GetTopicName(TCPData *connect_info);
+ void ThreadMain(void);
+ void SendPayload(const size_t &datalen, Module::PortMap::iterator &portIt, const void *data);
+ void SendTopic(const std::string &topic, Module::PortMap::iterator &portIt);
+
+ void UpdatePublishTable(const std::string &topic, const std::string &host, unsigned short port);
+
+ MainLoopHandler main_loop;
+ std::thread aittThread;
+ std::string ip;
+ int discovery_cb;
+
+ PublishMap publishTable;
+ std::mutex publishTableLock;
+ SubscribeMap subscribeTable;
+ std::mutex subscribeTableLock;
+ ClientMap clientTable;
+ std::mutex clientTableLock;
+};
diff --git a/modules/tcp/TCP.cc b/modules/tcp/TCP.cc
new file mode 100644
index 0000000..3b6751e
--- /dev/null
+++ b/modules/tcp/TCP.cc
@@ -0,0 +1,157 @@
+/*
+ * Copyright (c) 2021-2022 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "TCP.h"
+
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include <cstdlib>
+#include <cstring>
+#include <stdexcept>
+
+#include "aitt_internal.h"
+
+TCP::TCP(const std::string &host, unsigned short port) : handle(-1), addrlen(0), addr(nullptr)
+{
+ int ret = 0;
+
+ do {
+ if (port == 0) {
+ ret = EINVAL;
+ break;
+ }
+
+ handle = socket(AF_INET, SOCK_STREAM | SOCK_CLOEXEC, 0);
+ if (handle < 0) {
+ ERR("socket() Fail()");
+ break;
+ }
+
+ addrlen = sizeof(sockaddr_in);
+ addr = static_cast<sockaddr *>(calloc(1, addrlen));
+ if (!addr) {
+ ERR("calloc() Fail()");
+ break;
+ }
+
+ sockaddr_in *inet_addr = reinterpret_cast<sockaddr_in *>(addr);
+ if (!inet_pton(AF_INET, host.c_str(), &inet_addr->sin_addr)) {
+ ret = EINVAL;
+ break;
+ }
+
+ inet_addr->sin_port = htons(port);
+ inet_addr->sin_family = AF_INET;
+
+ ret = connect(handle, addr, addrlen);
+ if (ret < 0) {
+ ERR("connect() Fail(%s, %d)", host.c_str(), port);
+ break;
+ }
+
+ SetupOptions();
+ return;
+ } while (0);
+
+ if (ret <= 0)
+ ret = errno;
+
+ free(addr);
+ if (handle >= 0 && close(handle) < 0)
+ ERR_CODE(errno, "close");
+ throw std::runtime_error(strerror(ret));
+}
+
+TCP::TCP(int handle, sockaddr *addr, socklen_t szAddr) : handle(handle), addrlen(szAddr), addr(addr)
+{
+ SetupOptions();
+}
+
+TCP::~TCP(void)
+{
+ if (handle < 0)
+ return;
+
+ free(addr);
+ if (close(handle) < 0)
+ ERR_CODE(errno, "close");
+}
+
+void TCP::SetupOptions(void)
+{
+ int on = 1;
+
+ int ret = setsockopt(handle, IPPROTO_IP, TCP_NODELAY, &on, sizeof(on));
+ if (ret < 0) {
+ ERR_CODE(errno, "delay option setting failed");
+ }
+}
+
+void TCP::Send(const void *data, size_t &szData)
+{
+ int ret = send(handle, data, szData, 0);
+ if (ret < 0) {
+ ERR("Fail to send data, handle = %d, size = %zu", handle, szData);
+ throw std::runtime_error(strerror(errno));
+ }
+
+ szData = ret;
+}
+
+void TCP::Recv(void *data, size_t &szData)
+{
+ int ret = recv(handle, data, szData, 0);
+ if (ret < 0) {
+ ERR("Fail to recv data, handle = %d, size = %zu", handle, szData);
+ throw std::runtime_error(strerror(errno));
+ }
+
+ szData = ret;
+}
+
+int TCP::GetHandle(void)
+{
+ return handle;
+}
+
+void TCP::GetPeerInfo(std::string &host, unsigned short &port)
+{
+ char address[INET_ADDRSTRLEN] = {
+ 0,
+ };
+
+ if (!inet_ntop(AF_INET, &reinterpret_cast<sockaddr_in *>(this->addr)->sin_addr, address,
+ sizeof(address)))
+ throw std::runtime_error(strerror(errno));
+
+ port = ntohs(reinterpret_cast<sockaddr_in *>(this->addr)->sin_port);
+ host = address;
+}
+
+unsigned short TCP::GetPort(void)
+{
+ sockaddr_in addr;
+ socklen_t addrlen = sizeof(addr);
+
+ if (getsockname(handle, reinterpret_cast<sockaddr *>(&addr), &addrlen) < 0)
+ throw std::runtime_error(strerror(errno));
+
+ return ntohs(addr.sin_port);
+}
diff --git a/modules/tcp/TCP.h b/modules/tcp/TCP.h
new file mode 100644
index 0000000..535819c
--- /dev/null
+++ b/modules/tcp/TCP.h
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2021-2022 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <sys/socket.h>
+#include <sys/types.h> /* See NOTES */
+
+#include <string>
+
+class TCP {
+ public:
+ class Server;
+
+ TCP(const std::string &host, unsigned short port);
+ virtual ~TCP(void);
+
+ void Send(const void *data, size_t &szData);
+ void Recv(void *data, size_t &szData);
+ int GetHandle(void);
+ unsigned short GetPort(void);
+ void GetPeerInfo(std::string &host, unsigned short &port);
+
+ private:
+ TCP(int handle, sockaddr *addr, socklen_t addrlen);
+ void SetupOptions(void);
+
+ int handle;
+ socklen_t addrlen;
+ sockaddr *addr;
+};
diff --git a/modules/tcp/TCPServer.cc b/modules/tcp/TCPServer.cc
new file mode 100644
index 0000000..55f8511
--- /dev/null
+++ b/modules/tcp/TCPServer.cc
@@ -0,0 +1,132 @@
+/*
+ * Copyright (c) 2021-2022 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "TCPServer.h"
+
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include <cstdlib>
+#include <stdexcept>
+
+#include "aitt_internal.h"
+
+#define BACKLOG 10 // Accept only 10 simultaneously connections by default
+
+TCP::Server::Server(const std::string &host, unsigned short &port)
+ : handle(-1), addr(nullptr), addrlen(0)
+{
+ int ret = 0;
+
+ do {
+ handle = socket(AF_INET, SOCK_STREAM | SOCK_CLOEXEC, 0);
+ if (handle < 0)
+ break;
+
+ addrlen = sizeof(sockaddr_in);
+ addr = static_cast<sockaddr *>(calloc(1, sizeof(sockaddr_in)));
+ if (!addr)
+ break;
+
+ sockaddr_in *inet_addr = reinterpret_cast<sockaddr_in *>(addr);
+ if (!inet_pton(AF_INET, host.c_str(), &inet_addr->sin_addr)) {
+ ret = EINVAL;
+ break;
+ }
+
+ inet_addr->sin_port = htons(port);
+ inet_addr->sin_family = AF_INET;
+
+ int on = 1;
+ ret = setsockopt(handle, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+ if (ret < 0)
+ break;
+
+ ret = bind(handle, addr, addrlen);
+ if (ret < 0)
+ break;
+
+ if (!port) {
+ if (getsockname(handle, addr, &addrlen) < 0)
+ break;
+ port = ntohs(inet_addr->sin_port);
+ }
+
+ ret = listen(handle, BACKLOG);
+ if (ret < 0)
+ break;
+
+ return;
+ } while (0);
+
+ if (ret <= 0)
+ ret = errno;
+
+ free(addr);
+
+ if (handle >= 0 && close(handle) < 0)
+ ERR_CODE(errno, "close");
+
+ throw std::runtime_error(strerror(ret));
+}
+
+TCP::Server::~Server(void)
+{
+ if (handle < 0)
+ return;
+
+ free(addr);
+ if (close(handle) < 0)
+ ERR_CODE(errno, "close");
+}
+
+std::unique_ptr<TCP> TCP::Server::AcceptPeer(void)
+{
+ sockaddr *peerAddr;
+ socklen_t szAddr = sizeof(sockaddr_in);
+ int peerHandle;
+
+ peerAddr = static_cast<sockaddr *>(calloc(1, szAddr));
+ if (!peerAddr)
+ throw std::runtime_error(strerror(errno));
+
+ peerHandle = accept(handle, peerAddr, &szAddr);
+ if (peerHandle < 0) {
+ free(peerAddr);
+ throw std::runtime_error(strerror(errno));
+ }
+
+ return std::unique_ptr<TCP>(new TCP(peerHandle, peerAddr, szAddr));
+}
+
+int TCP::Server::GetHandle(void)
+{
+ return handle;
+}
+
+unsigned short TCP::Server::GetPort(void)
+{
+ sockaddr_in addr;
+ socklen_t addrlen = sizeof(addr);
+
+ if (getsockname(handle, reinterpret_cast<sockaddr *>(&addr), &addrlen) < 0)
+ throw std::runtime_error(strerror(errno));
+
+ return ntohs(addr.sin_port);
+}
diff --git a/modules/tcp/TCPServer.h b/modules/tcp/TCPServer.h
new file mode 100644
index 0000000..3c82bc6
--- /dev/null
+++ b/modules/tcp/TCPServer.h
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2021-2022 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "TCP.h"
+
+class TCP::Server {
+ public:
+ Server(const std::string &host, unsigned short &port);
+ virtual ~Server(void);
+
+ std::unique_ptr<TCP> AcceptPeer(void);
+
+ int GetHandle(void);
+ unsigned short GetPort(void);
+
+ private:
+ int handle;
+ sockaddr *addr;
+ socklen_t addrlen;
+};
diff --git a/modules/tcp/samples/CMakeLists.txt b/modules/tcp/samples/CMakeLists.txt
new file mode 100644
index 0000000..8fd1b4b
--- /dev/null
+++ b/modules/tcp/samples/CMakeLists.txt
@@ -0,0 +1,3 @@
+ADD_EXECUTABLE("aitt_tcp_test" tcp_test.cc $<TARGET_OBJECTS:TCP_OBJ>)
+TARGET_LINK_LIBRARIES("aitt_tcp_test" ${PROJECT_NAME} Threads::Threads ${AITT_NEEDS_LIBRARIES})
+INSTALL(TARGETS "aitt_tcp_test" DESTINATION ${AITT_TEST_BINDIR})
diff --git a/modules/tcp/samples/tcp_test.cc b/modules/tcp/samples/tcp_test.cc
new file mode 100644
index 0000000..d319e27
--- /dev/null
+++ b/modules/tcp/samples/tcp_test.cc
@@ -0,0 +1,235 @@
+/*
+ * Copyright (c) 2021-2022 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <TCP.h>
+#include <TCPServer.h>
+#include <getopt.h>
+#include <glib.h>
+
+#include <functional>
+#include <iostream>
+#include <memory>
+#include <string>
+
+//#define _LOG_WITH_TIMESTAMP
+#include "aitt_internal.h"
+#ifdef _LOG_WITH_TIMESTAMP
+__thread __aitt__tls__ __aitt;
+#endif
+
+#define HELLO_STRING "hello"
+#define BYE_STRING "bye"
+#define SEND_INTERVAL 1000
+
+class AittTcpSample {
+ public:
+ AittTcpSample(const std::string &host, unsigned short &port)
+ : server(std::make_unique<TCP::Server>(host, port))
+ {
+ }
+ virtual ~AittTcpSample(void) {}
+
+ std::unique_ptr<TCP::Server> server;
+};
+
+int main(int argc, char *argv[])
+{
+ const option opts[] = {
+ {
+ .name = "server",
+ .has_arg = 0,
+ .flag = nullptr,
+ .val = 's',
+ },
+ {
+ .name = "host",
+ .has_arg = 1,
+ .flag = nullptr,
+ .val = 'h',
+ },
+ {
+ .name = "port",
+ .has_arg = 1,
+ .flag = nullptr,
+ .val = 'p',
+ },
+ };
+ int c;
+ int idx;
+ bool isServer = false;
+ std::string host = "127.0.0.1";
+ unsigned short port = 0;
+
+ while ((c = getopt_long(argc, argv, "sh:up:", opts, &idx)) != -1) {
+ switch (c) {
+ case 's':
+ isServer = true;
+ break;
+ case 'h':
+ host = optarg;
+ break;
+ case 'p':
+ port = std::stoi(optarg);
+ break;
+ default:
+ break;
+ }
+ }
+
+ INFO("Host[%s] port[%u]", host.c_str(), port);
+
+ struct EventData {
+ GSource source_;
+ GPollFD fd;
+ AittTcpSample *sample;
+ };
+
+ guint timeoutId = 0;
+ GSource *src = nullptr;
+ EventData *ed = nullptr;
+
+ GMainLoop *mainLoop = g_main_loop_new(nullptr, FALSE);
+ if (!mainLoop) {
+ ERR("Failed to create a main loop");
+ return 1;
+ }
+
+ // Handling the server/client events
+ if (isServer) {
+ GSourceFuncs srcs = {
+ [](GSource *src, gint *timeout) -> gboolean {
+ *timeout = 1;
+ return FALSE;
+ },
+ [](GSource *src) -> gboolean {
+ EventData *ed = reinterpret_cast<EventData *>(src);
+ RETV_IF(ed == nullptr, FALSE);
+
+ if ((ed->fd.revents & G_IO_IN) == G_IO_IN)
+ return TRUE;
+ if ((ed->fd.revents & G_IO_ERR) == G_IO_ERR)
+ return TRUE;
+
+ return FALSE;
+ },
+ [](GSource *src, GSourceFunc callback, gpointer user_data) -> gboolean {
+ EventData *ed = reinterpret_cast<EventData *>(src);
+ RETV_IF(ed == nullptr, FALSE);
+
+ if ((ed->fd.revents & G_IO_ERR) == G_IO_ERR) {
+ ERR("Error!");
+ return FALSE;
+ }
+
+ std::unique_ptr<TCP> peer = ed->sample->server->AcceptPeer();
+
+ INFO("Assigned port: %u, %u", ed->sample->server->GetPort(), peer->GetPort());
+ std::string peerHost;
+ unsigned short peerPort = 0;
+ peer->GetPeerInfo(peerHost, peerPort);
+ INFO("Peer Info: %s %u", peerHost.c_str(), peerPort);
+
+ char buffer[10];
+ void *ptr = static_cast<void *>(buffer);
+ size_t szData = sizeof(HELLO_STRING);
+ peer->Recv(ptr, szData);
+ INFO("Gots[%s]", buffer);
+
+ szData = sizeof(BYE_STRING);
+ peer->Send(BYE_STRING, szData);
+ INFO("Reply to client[%s]", BYE_STRING);
+
+ return TRUE;
+ },
+ nullptr,
+ };
+
+ src = g_source_new(&srcs, sizeof(EventData));
+ if (!src) {
+ g_main_loop_unref(mainLoop);
+ ERR("g_source_new failed");
+ return 1;
+ }
+
+ ed = reinterpret_cast<EventData *>(src);
+
+ try {
+ ed->sample = new AittTcpSample(host, port);
+ } catch (std::exception &e) {
+ ERR("new: %s", e.what());
+ g_source_unref(src);
+ g_main_loop_unref(mainLoop);
+ return 1;
+ }
+
+ INFO("host: %s, port: %u", host.c_str(), port);
+
+ ed->fd.fd = ed->sample->server->GetHandle();
+ ed->fd.events = G_IO_IN | G_IO_ERR;
+ g_source_add_poll(src, &ed->fd);
+ guint id = g_source_attach(src, g_main_loop_get_context(mainLoop));
+ g_source_unref(src);
+ if (id == 0) {
+ delete ed->sample;
+ g_source_destroy(src);
+ g_main_loop_unref(mainLoop);
+ return 1;
+ }
+ } else {
+ static struct Main {
+ const std::string &host;
+ unsigned short port;
+ } main_data = {
+ .host = host,
+ .port = port,
+ };
+ // Now the server is ready.
+ // Let's create a new client and communicate with the server within every
+ // SEND_INTERTVAL
+ timeoutId = g_timeout_add(
+ SEND_INTERVAL,
+ [](gpointer data) -> gboolean {
+ Main *ctx = static_cast<Main *>(data);
+ std::unique_ptr<TCP> client(std::make_unique<TCP>(ctx->host, ctx->port));
+
+ INFO("Assigned client port: %u", client->GetPort());
+
+ INFO("Send[%s]", HELLO_STRING);
+ size_t szBuffer = sizeof(HELLO_STRING);
+ client->Send(HELLO_STRING, szBuffer);
+
+ char buffer[10];
+ void *ptr = static_cast<void *>(buffer);
+ szBuffer = sizeof(BYE_STRING);
+ client->Recv(ptr, szBuffer);
+ INFO("Replied with[%s]", buffer);
+
+ // Send oneshot message, and disconnect from the server
+ return TRUE;
+ },
+ &main_data);
+ }
+
+ g_main_loop_run(mainLoop);
+
+ if (src) {
+ delete ed->sample;
+ g_source_destroy(src);
+ }
+ if (timeoutId)
+ g_source_remove(timeoutId);
+ g_main_loop_unref(mainLoop);
+ return 0;
+}
diff --git a/modules/tcp/tests/CMakeLists.txt b/modules/tcp/tests/CMakeLists.txt
new file mode 100644
index 0000000..bf1adf1
--- /dev/null
+++ b/modules/tcp/tests/CMakeLists.txt
@@ -0,0 +1,19 @@
+PKG_CHECK_MODULES(UT_NEEDS REQUIRED gmock_main)
+INCLUDE_DIRECTORIES(${UT_NEEDS_INCLUDE_DIRS})
+LINK_DIRECTORIES(${UT_NEEDS_LIBRARY_DIRS})
+
+SET(AITT_TCP_UT ${PROJECT_NAME}_tcp_ut)
+
+SET(AITT_TCP_UT_SRC TCP_test.cc TCPServer_test.cc)
+
+ADD_EXECUTABLE(${AITT_TCP_UT} ${AITT_TCP_UT_SRC} $<TARGET_OBJECTS:TCP_OBJ>)
+TARGET_LINK_LIBRARIES(${AITT_TCP_UT} ${UT_NEEDS_LIBRARIES} Threads::Threads ${AITT_NEEDS_LIBRARIES})
+INSTALL(TARGETS ${AITT_TCP_UT} DESTINATION ${AITT_TEST_BINDIR})
+
+ADD_TEST(
+ NAME
+ ${AITT_TCP_UT}
+ COMMAND
+ ${CMAKE_COMMAND} -E env
+ ${CMAKE_CURRENT_BINARY_DIR}/${AITT_TCP_UT} --gtest_filter=*_Anytime
+)
diff --git a/modules/tcp/tests/TCPServer_test.cc b/modules/tcp/tests/TCPServer_test.cc
new file mode 100644
index 0000000..e8b48b1
--- /dev/null
+++ b/modules/tcp/tests/TCPServer_test.cc
@@ -0,0 +1,121 @@
+/*
+ * Copyright (c) 2021-2022 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "../TCPServer.h"
+
+#include <gtest/gtest.h>
+
+#include <condition_variable>
+#include <cstring>
+#include <memory>
+#include <mutex>
+#include <thread>
+
+#define TEST_SERVER_ADDRESS "127.0.0.1"
+#define TEST_SERVER_INVALID_ADDRESS "287.0.0.1"
+#define TEST_SERVER_PORT 8123
+#define TEST_SERVER_AVAILABLE_PORT 0
+
+TEST(TCPServer, Positive_Create_Anytime)
+{
+ unsigned short port = TEST_SERVER_PORT;
+ std::unique_ptr<TCP::Server> tcp(std::make_unique<TCP::Server>(TEST_SERVER_ADDRESS, port));
+ ASSERT_NE(tcp, nullptr);
+}
+
+TEST(TCPServer, Negative_Create_Anytime)
+{
+ try {
+ unsigned short port = TEST_SERVER_PORT;
+
+ std::unique_ptr<TCP::Server> tcp(
+ std::make_unique<TCP::Server>(TEST_SERVER_INVALID_ADDRESS, port));
+ ASSERT_EQ(tcp, nullptr);
+ } catch (std::exception &e) {
+ ASSERT_STREQ(e.what(), strerror(EINVAL));
+ }
+}
+
+TEST(TCPServer, Positive_Create_AutoPort_Anytime)
+{
+ unsigned short port = TEST_SERVER_AVAILABLE_PORT;
+ std::unique_ptr<TCP::Server> tcp(std::make_unique<TCP::Server>(TEST_SERVER_ADDRESS, port));
+ ASSERT_NE(tcp, nullptr);
+ ASSERT_NE(port, 0);
+}
+
+TEST(TCPServer, Positive_GetPort_Anytime)
+{
+ unsigned short port = TEST_SERVER_PORT;
+ std::unique_ptr<TCP::Server> tcp(std::make_unique<TCP::Server>(TEST_SERVER_ADDRESS, port));
+ ASSERT_NE(tcp, nullptr);
+ ASSERT_EQ(tcp->GetPort(), TEST_SERVER_PORT);
+}
+
+TEST(TCPServer, Positive_GetHandle_Anytime)
+{
+ unsigned short port = TEST_SERVER_PORT;
+ std::unique_ptr<TCP::Server> tcp(std::make_unique<TCP::Server>(TEST_SERVER_ADDRESS, port));
+ ASSERT_NE(tcp, nullptr);
+ ASSERT_GE(tcp->GetHandle(), 0);
+}
+
+TEST(TCPServer, Positive_GetPort_AutoPort_Anytime)
+{
+ unsigned short port = TEST_SERVER_AVAILABLE_PORT;
+ std::unique_ptr<TCP::Server> tcp(std::make_unique<TCP::Server>(TEST_SERVER_ADDRESS, port));
+ ASSERT_NE(tcp, nullptr);
+ ASSERT_EQ(tcp->GetPort(), port);
+}
+
+TEST(TCPServer, Positive_AcceptPeer_Anytime)
+{
+ std::mutex m;
+ std::condition_variable ready_cv;
+ std::condition_variable connected_cv;
+ bool ready = false;
+ bool connected = false;
+
+ unsigned short serverPort = TEST_SERVER_PORT;
+ std::thread serverThread(
+ [serverPort, &m, &ready, &connected, &ready_cv, &connected_cv](void) mutable -> void {
+ std::unique_ptr<TCP::Server> tcp(
+ std::make_unique<TCP::Server>(TEST_SERVER_ADDRESS, serverPort));
+ {
+ std::lock_guard<std::mutex> lk(m);
+ ready = true;
+ }
+ ready_cv.notify_one();
+
+ std::unique_ptr<TCP> peer = tcp->AcceptPeer();
+ {
+ std::lock_guard<std::mutex> lk(m);
+ connected = !!peer;
+ }
+ connected_cv.notify_one();
+ });
+
+ {
+ std::unique_lock<std::mutex> lk(m);
+ ready_cv.wait(lk, [&ready] { return ready; });
+ std::unique_ptr<TCP> tcp(std::make_unique<TCP>(TEST_SERVER_ADDRESS, serverPort));
+ connected_cv.wait(lk, [&connected] { return connected; });
+ }
+
+ serverThread.join();
+
+ ASSERT_EQ(ready, true);
+ ASSERT_EQ(connected, true);
+}
diff --git a/modules/tcp/tests/TCP_test.cc b/modules/tcp/tests/TCP_test.cc
new file mode 100644
index 0000000..604bd23
--- /dev/null
+++ b/modules/tcp/tests/TCP_test.cc
@@ -0,0 +1,149 @@
+/*
+ * Copyright (c) 2021-2022 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <gtest/gtest.h>
+
+#include <condition_variable>
+#include <cstring>
+#include <functional>
+#include <memory>
+#include <mutex>
+#include <thread>
+
+#include "TCPServer.h"
+
+#define TEST_SERVER_ADDRESS "127.0.0.1"
+#define TEST_SERVER_INVALID_ADDRESS "287.0.0.1"
+#define TEST_SERVER_PORT 8123
+#define TEST_SERVER_AVAILABLE_PORT 0
+#define TEST_BUFFER_SIZE 256
+#define TEST_BUFFER_HELLO "Hello World"
+#define TEST_BUFFER_BYE "Good Bye"
+
+class TCPTest : public testing::Test {
+ protected:
+ void SetUp() override
+ {
+ ready = false;
+ serverPort = TEST_SERVER_PORT;
+ customTest = [](void) {};
+
+ clientThread = std::thread([this](void) mutable -> void {
+ std::unique_lock<std::mutex> lk(m);
+ ready_cv.wait(lk, [this] { return ready; });
+ client = std::make_unique<TCP>(TEST_SERVER_ADDRESS, serverPort);
+
+ customTest();
+ });
+ }
+
+ void RunServer(void)
+ {
+ tcp = std::make_unique<TCP::Server>(TEST_SERVER_ADDRESS, serverPort);
+ {
+ std::lock_guard<std::mutex> lk(m);
+ ready = true;
+ }
+ ready_cv.notify_one();
+
+ peer = tcp->AcceptPeer();
+ }
+
+ void TearDown() override { clientThread.join(); }
+
+ protected:
+ std::mutex m;
+ std::condition_variable ready_cv;
+ bool ready;
+ unsigned short serverPort;
+ std::thread clientThread;
+ std::unique_ptr<TCP::Server> tcp;
+ std::unique_ptr<TCP> peer;
+ std::unique_ptr<TCP> client;
+ std::function<void(void)> customTest;
+};
+
+TEST(TCP, Negative_Create_InvalidPort_Anytime)
+{
+ try {
+ std::unique_ptr<TCP> tcp(
+ std::make_unique<TCP>(TEST_SERVER_ADDRESS, TEST_SERVER_AVAILABLE_PORT));
+ ASSERT_EQ(tcp, nullptr);
+ } catch (std::exception &e) {
+ ASSERT_STREQ(e.what(), strerror(EINVAL));
+ }
+}
+
+TEST(TCP, Negative_Create_InvalidAddress_Anytime)
+{
+ try {
+ std::unique_ptr<TCP> tcp(
+ std::make_unique<TCP>(TEST_SERVER_INVALID_ADDRESS, TEST_SERVER_PORT));
+ ASSERT_EQ(tcp, nullptr);
+ } catch (std::exception &e) {
+ ASSERT_STREQ(e.what(), strerror(EINVAL));
+ }
+}
+
+TEST_F(TCPTest, Positive_GetPeerInfo_Anytime)
+{
+ std::string peerHost;
+ unsigned short peerPort = 0;
+
+ RunServer();
+
+ peer->GetPeerInfo(peerHost, peerPort);
+ ASSERT_STREQ(peerHost.c_str(), TEST_SERVER_ADDRESS);
+ ASSERT_GT(peerPort, 0);
+}
+
+TEST_F(TCPTest, Positive_GetHandle_Anytime)
+{
+ RunServer();
+ int handle = peer->GetHandle();
+ ASSERT_GE(handle, 0);
+}
+
+TEST_F(TCPTest, Positive_GetPort_Anytime)
+{
+ RunServer();
+ unsigned short port = peer->GetPort();
+ ASSERT_GT(port, 0);
+}
+
+TEST_F(TCPTest, Positive_SendRecv_Anytime)
+{
+ char helloBuffer[TEST_BUFFER_SIZE];
+ char byeBuffer[TEST_BUFFER_SIZE];
+
+ customTest = [this, &helloBuffer](void) mutable -> void {
+ size_t szData = sizeof(helloBuffer);
+ client->Recv(static_cast<void *>(helloBuffer), szData);
+
+ szData = sizeof(TEST_BUFFER_BYE);
+ client->Send(TEST_BUFFER_BYE, szData);
+ };
+
+ RunServer();
+
+ size_t szMsg = sizeof(TEST_BUFFER_HELLO);
+ peer->Send(TEST_BUFFER_HELLO, szMsg);
+
+ szMsg = sizeof(byeBuffer);
+ peer->Recv(static_cast<void *>(byeBuffer), szMsg);
+
+ ASSERT_STREQ(helloBuffer, TEST_BUFFER_HELLO);
+ ASSERT_STREQ(byeBuffer, TEST_BUFFER_BYE);
+}