diff options
Diffstat (limited to 'modules/tcp')
-rw-r--r-- | modules/tcp/CMakeLists.txt | 14 | ||||
-rw-r--r-- | modules/tcp/Module.cc | 513 | ||||
-rw-r--r-- | modules/tcp/Module.h | 135 | ||||
-rw-r--r-- | modules/tcp/TCP.cc | 157 | ||||
-rw-r--r-- | modules/tcp/TCP.h | 43 | ||||
-rw-r--r-- | modules/tcp/TCPServer.cc | 132 | ||||
-rw-r--r-- | modules/tcp/TCPServer.h | 37 | ||||
-rw-r--r-- | modules/tcp/samples/CMakeLists.txt | 3 | ||||
-rw-r--r-- | modules/tcp/samples/tcp_test.cc | 235 | ||||
-rw-r--r-- | modules/tcp/tests/CMakeLists.txt | 19 | ||||
-rw-r--r-- | modules/tcp/tests/TCPServer_test.cc | 121 | ||||
-rw-r--r-- | modules/tcp/tests/TCP_test.cc | 149 |
12 files changed, 1558 insertions, 0 deletions
diff --git a/modules/tcp/CMakeLists.txt b/modules/tcp/CMakeLists.txt new file mode 100644 index 0000000..edac2fd --- /dev/null +++ b/modules/tcp/CMakeLists.txt @@ -0,0 +1,14 @@ +SET(AITT_TCP aitt-transport-tcp) + +INCLUDE_DIRECTORIES(${CMAKE_CURRENT_SOURCE_DIR}) + +ADD_LIBRARY(TCP_OBJ OBJECT TCP.cc TCPServer.cc) +ADD_LIBRARY(${AITT_TCP} SHARED ../main.cc Module.cc $<TARGET_OBJECTS:TCP_OBJ>) +TARGET_LINK_LIBRARIES(${AITT_TCP} ${AITT_TCP_NEEDS_LIBRARIES} Threads::Threads ${AITT_COMMON}) + +INSTALL(TARGETS ${AITT_TCP} DESTINATION ${CMAKE_INSTALL_LIBDIR}) + +IF(BUILD_TESTING) + ADD_SUBDIRECTORY(samples) + ADD_SUBDIRECTORY(tests) +ENDIF(BUILD_TESTING) diff --git a/modules/tcp/Module.cc b/modules/tcp/Module.cc new file mode 100644 index 0000000..bc50d7d --- /dev/null +++ b/modules/tcp/Module.cc @@ -0,0 +1,513 @@ +/* + * Copyright (c) 2021-2022 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "Module.h" + +#include <MQ.h> +#include <flatbuffers/flexbuffers.h> +#include <unistd.h> + +#include "aitt_internal.h" + +/* + * P2P Data Packet Definition + * TopicLength: 4 bytes + * TopicString: $TopicLength + */ + +Module::Module(const std::string &ip, AittDiscovery &discovery) : AittTransport(discovery), ip(ip) +{ + aittThread = std::thread(&Module::ThreadMain, this); + + discovery_cb = discovery.AddDiscoveryCB(AITT_TYPE_TCP, + std::bind(&Module::DiscoveryMessageCallback, this, std::placeholders::_1, + std::placeholders::_2, std::placeholders::_3, std::placeholders::_4)); + DBG("Discovery Callback : %p, %d", this, discovery_cb); +} + +Module::~Module(void) +{ + discovery.RemoveDiscoveryCB(discovery_cb); + + while (main_loop.Quit() == false) { + // wait when called before the thread has completely created. + usleep(1000); + } + + if (aittThread.joinable()) + aittThread.join(); +} + +void Module::ThreadMain(void) +{ + pthread_setname_np(pthread_self(), "TCPWorkerLoop"); + main_loop.Run(); +} + +void Module::Publish(const std::string &topic, const void *data, const size_t datalen, + const std::string &correlation, AittQoS qos, bool retain) +{ + // NOTE: + // Iterate discovered service table + // PublishMap + // map { + // "/customTopic/faceRecog": map { + // "$clientId": map { + // 11234: $handle, + // + // ... + // + // 21234: nullptr, + // }, + // }, + // } + std::lock_guard<std::mutex> auto_lock_publish(publishTableLock); + for (PublishMap::iterator it = publishTable.begin(); it != publishTable.end(); ++it) { + // NOTE: + // Find entries that have matched with the given topic + if (!aitt::MQ::CompareTopic(it->first, topic)) + continue; + + // NOTE: + // Iterate all hosts + for (HostMap::iterator hostIt = it->second.begin(); hostIt != it->second.end(); ++hostIt) { + // Iterate all ports, + // the current implementation only be able to have the ZERO or a SINGLE entry + // hostIt->first // clientId + for (PortMap::iterator portIt = hostIt->second.begin(); portIt != hostIt->second.end(); + ++portIt) { + // portIt->first // port + // portIt->second // handle + if (!portIt->second) { + std::string host; + { + ClientMap::iterator clientIt; + std::lock_guard<std::mutex> auto_lock_client(clientTableLock); + + clientIt = clientTable.find(hostIt->first); + if (clientIt != clientTable.end()) + host = clientIt->second; + + // NOTE: + // otherwise, it is a critical error + // The broken clientTable or subscribeTable + } + + std::unique_ptr<TCP> client(std::make_unique<TCP>(host, portIt->first)); + + // TODO: + // If the client gets disconnected, + // This channel entry must be cleared + // In order to do that, + // There should be an observer to monitor + // each connections and manipulate + // the discovered service table + portIt->second = std::move(client); + } + + if (!portIt->second) { + ERR("Failed to create a new client instance"); + continue; + } + + SendTopic(topic, portIt); + SendPayload(datalen, portIt, data); + } + } // connectionEntries + } // publishTable +} + +void Module::SendTopic(const std::string &topic, Module::PortMap::iterator &portIt) +{ + uint32_t topicLen = topic.length(); + size_t szData = sizeof(topicLen); + portIt->second->Send(static_cast<void *>(&topicLen), szData); + szData = topicLen; + portIt->second->Send(static_cast<const void *>(topic.c_str()), szData); +} + +void Module::SendPayload(const size_t &datalen, Module::PortMap::iterator &portIt, const void *data) +{ + uint32_t sendsize = datalen; + size_t szsize = sizeof(sendsize); + + try { + if (0 == datalen) { + // distinguish between connection problems and zero-size messages + INFO("Send zero-size Message"); + sendsize = UINT32_MAX; + } + portIt->second->Send(static_cast<void *>(&sendsize), szsize); + + int msgSize = datalen; + while (0 < msgSize) { + size_t sentSize = msgSize; + char *dataIdx = (char *)data + (sendsize - msgSize); + portIt->second->Send(dataIdx, sentSize); + if (sentSize > 0) { + msgSize -= sentSize; + } + } + } catch (std::exception &e) { + ERR("An exception(%s) occurs during Send().", e.what()); + } +} + +void Module::Publish(const std::string &topic, const void *data, const size_t datalen, AittQoS qos, + bool retain) +{ + Publish(topic, data, datalen, std::string(), qos, retain); +} + +void *Module::Subscribe(const std::string &topic, const AittTransport::SubscribeCallback &cb, + void *cbdata, AittQoS qos) +{ + std::unique_ptr<TCP::Server> tcpServer; + + unsigned short port = 0; + tcpServer = std::make_unique<TCP::Server>("0.0.0.0", port); + TCPServerData *listen_info = new TCPServerData; + listen_info->impl = this; + listen_info->cb = cb; + listen_info->cbdata = cbdata; + listen_info->topic = topic; + auto handle = tcpServer->GetHandle(); + + main_loop.AddWatch(handle, AcceptConnection, listen_info); + + // 서비스 테이블에 토픽을 키워드로 프로토콜을 등록한다. + { + std::lock_guard<std::mutex> autoLock(subscribeTableLock); + subscribeTable.insert(SubscribeMap::value_type(topic, std::move(tcpServer))); + UpdateDiscoveryMsg(); + } + + return reinterpret_cast<void *>(handle); +} + +void *Module::Subscribe(const std::string &topic, const AittTransport::SubscribeCallback &cb, + const void *data, const size_t datalen, void *cbdata, AittQoS qos) +{ + return nullptr; +} + +void *Module::Unsubscribe(void *handlePtr) +{ + int handle = static_cast<int>(reinterpret_cast<intptr_t>(handlePtr)); + TCPServerData *listen_info = dynamic_cast<TCPServerData *>(main_loop.RemoveWatch(handle)); + if (!listen_info) + return nullptr; + + { + std::lock_guard<std::mutex> autoLock(subscribeTableLock); + auto it = subscribeTable.find(listen_info->topic); + if (it == subscribeTable.end()) + throw std::runtime_error("Service is not registered: " + listen_info->topic); + + subscribeTable.erase(it); + + UpdateDiscoveryMsg(); + } + + void *cbdata = listen_info->cbdata; + listen_info->client_lock.lock(); + for (auto fd : listen_info->client_list) { + TCPData *connect_info = dynamic_cast<TCPData *>(main_loop.RemoveWatch(fd)); + delete connect_info; + } + listen_info->client_list.clear(); + listen_info->client_lock.unlock(); + delete listen_info; + + return cbdata; +} + +void Module::DiscoveryMessageCallback(const std::string &clientId, const std::string &status, + const void *msg, const int szmsg) +{ + // NOTE: + // Iterate discovered service table + // PublishMap + // map { + // "/customTopic/faceRecog": map { + // "clientId.uniq.abcd.123": map { + // 11234: pair { + // "protocol": 1, + // "handle": nullptr, + // }, + // + // ... + // + // 21234: pair { + // "protocol": 2, + // "handle": nullptr, + // } + // }, + // }, + // } + + if (!status.compare(AittDiscovery::WILL_LEAVE_NETWORK)) { + { + std::lock_guard<std::mutex> autoLock(clientTableLock); + // Delete from the { clientId : Host } mapping table + clientTable.erase(clientId); + } + + { + // NOTE: + // Iterate all topics in the publishTable holds discovered client information + std::lock_guard<std::mutex> autoLock(publishTableLock); + for (auto it = publishTable.begin(); it != publishTable.end(); ++it) + it->second.erase(clientId); + } + return; + } + + // serviceMessage (flexbuffers) + // map { + // "host": "192.168.1.11", + // "$topic": port, + // } + auto map = flexbuffers::GetRoot(static_cast<const uint8_t *>(msg), szmsg).AsMap(); + std::string host = map["host"].AsString().c_str(); + + // NOTE: + // Update the clientTable + { + std::lock_guard<std::mutex> autoLock(clientTableLock); + auto clientIt = clientTable.find(clientId); + if (clientIt == clientTable.end()) + clientTable.insert(ClientMap::value_type(clientId, host)); + else if (clientIt->second.compare(host)) + clientIt->second = host; + } + + auto topics = map.Keys(); + for (size_t idx = 0; idx < topics.size(); ++idx) { + std::string topic = topics[idx].AsString().c_str(); + + if (!topic.compare("host")) + continue; + + auto port = map[topic].AsUInt16(); + + { + std::lock_guard<std::mutex> autoLock(publishTableLock); + UpdatePublishTable(topic, clientId, port); + } + } +} + +void Module::UpdateDiscoveryMsg() +{ + flexbuffers::Builder fbb; + // flexbuffers + // { + // "host": "127.0.0.1", + // "/customTopic/aitt/faceRecog": $port, + // "/customTopic/aitt/ASR": 102020, + // + // ... + // + // "/customTopic/aitt/+": 20123, + // } + fbb.Map([this, &fbb]() { + fbb.String("host", ip); + + // SubscribeTable + // map { + // "/customTopic/mytopic": $serverHandle, + // ... + // } + for (auto it = subscribeTable.begin(); it != subscribeTable.end(); ++it) { + if (it->second) + fbb.UInt(it->first.c_str(), it->second->GetPort()); + else + fbb.UInt(it->first.c_str(), 0); // this is an error case + } + }); + fbb.Finish(); + + auto buf = fbb.GetBuffer(); + discovery.UpdateDiscoveryMsg(AITT_TYPE_TCP, buf.data(), buf.size()); +} + +void Module::ReceiveData(MainLoopHandler::MainLoopResult result, int handle, + MainLoopHandler::MainLoopData *user_data) +{ + TCPData *connect_info = dynamic_cast<TCPData *>(user_data); + RET_IF(connect_info == nullptr); + TCPServerData *parent_info = connect_info->parent; + RET_IF(parent_info == nullptr); + Module *impl = parent_info->impl; + RET_IF(impl == nullptr); + + if (result == MainLoopHandler::HANGUP) { + ERR("Disconnected"); + return impl->HandleClientDisconnect(handle); + } + + uint32_t szmsg = 0; + size_t szdata = sizeof(szmsg); + char *msg = nullptr; + std::string topic; + + try { + topic = impl->GetTopicName(connect_info); + if (topic.empty()) { + ERR("Unknown Topic"); + return impl->HandleClientDisconnect(handle); + } + + connect_info->client->Recv(static_cast<void *>(&szmsg), szdata); + if (szmsg == 0) { + ERR("Disconnected"); + return impl->HandleClientDisconnect(handle); + } + + if (UINT32_MAX == szmsg) { + // distinguish between connection problems and zero-size messages + INFO("Got zero-size Message"); + szmsg = 0; + } + + msg = static_cast<char *>(malloc(szmsg)); + int msgSize = szmsg; + while (0 < msgSize) { + size_t receivedSize = msgSize; + connect_info->client->Recv(static_cast<void *>(msg + (szmsg - msgSize)), receivedSize); + if (receivedSize > 0) { + msgSize -= receivedSize; + } + } + } catch (std::exception &e) { + ERR("An exception(%s) occurs during Recv()", e.what()); + } + + std::string correlation; + // TODO: + // Correlation data (string) should be filled + + parent_info->cb(topic, msg, szmsg, parent_info->cbdata, correlation); + free(msg); +} + +void Module::HandleClientDisconnect(int handle) +{ + TCPData *connect_info = dynamic_cast<TCPData *>(main_loop.RemoveWatch(handle)); + if (connect_info == nullptr) { + ERR("No watch data"); + return; + } + connect_info->parent->client_lock.lock(); + auto it = std::find(connect_info->parent->client_list.begin(), + connect_info->parent->client_list.end(), handle); + connect_info->parent->client_list.erase(it); + connect_info->parent->client_lock.unlock(); + + delete connect_info; +} + +std::string Module::GetTopicName(Module::TCPData *connect_info) +{ + uint32_t topic_len = 0; + size_t data_size = sizeof(topic_len); + connect_info->client->Recv(static_cast<void *>(&topic_len), data_size); + + if (AITT_TOPIC_NAME_MAX < topic_len) { + ERR("Invalid topic name length(%d)", topic_len); + return std::string(); + } + + char data[topic_len]; + data_size = topic_len; + connect_info->client->Recv(data, data_size); + if (data_size != topic_len) + ERR("Recv() Fail"); + + return std::string(data, data_size); +} + +void Module::AcceptConnection(MainLoopHandler::MainLoopResult result, int handle, + MainLoopHandler::MainLoopData *user_data) +{ + // TODO: + // Update the discovery map + std::unique_ptr<TCP> client; + + TCPServerData *listen_info = dynamic_cast<TCPServerData *>(user_data); + Module *impl = listen_info->impl; + { + std::lock_guard<std::mutex> autoLock(impl->subscribeTableLock); + + auto clientIt = impl->subscribeTable.find(listen_info->topic); + if (clientIt == impl->subscribeTable.end()) + return; + + client = clientIt->second->AcceptPeer(); + } + + if (client == nullptr) { + ERR("Unable to accept a peer"); // NOTE: FATAL ERROR + return; + } + + int cHandle = client->GetHandle(); + listen_info->client_list.push_back(cHandle); + + TCPData *ecd = new TCPData; + ecd->parent = listen_info; + ecd->client = std::move(client); + + impl->main_loop.AddWatch(cHandle, ReceiveData, ecd); +} + +void Module::UpdatePublishTable(const std::string &topic, const std::string &clientId, + unsigned short port) +{ + auto topicIt = publishTable.find(topic); + if (topicIt == publishTable.end()) { + PortMap portMap; + portMap.insert(PortMap::value_type(port, nullptr)); + HostMap hostMap; + hostMap.insert(HostMap::value_type(clientId, std::move(portMap))); + publishTable.insert(PublishMap::value_type(topic, std::move(hostMap))); + return; + } + + auto hostIt = topicIt->second.find(clientId); + if (hostIt == topicIt->second.end()) { + PortMap portMap; + portMap.insert(PortMap::value_type(port, nullptr)); + topicIt->second.insert(HostMap::value_type(clientId, std::move(portMap))); + return; + } + + // NOTE: + // The current implementation only has a single port entry + // therefore, if the hostIt is not empty, there is the previous connection + if (!hostIt->second.empty()) { + auto portIt = hostIt->second.begin(); + + if (portIt->first == port) + return; // nothing changed. keep the current handle + + // otherwise, delete the connection handle + // to make a new connection with the new port + hostIt->second.clear(); + } + + hostIt->second.insert(PortMap::value_type(port, nullptr)); +} diff --git a/modules/tcp/Module.h b/modules/tcp/Module.h new file mode 100644 index 0000000..4011980 --- /dev/null +++ b/modules/tcp/Module.h @@ -0,0 +1,135 @@ +/* + * Copyright (c) 2021-2022 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include <AittTransport.h> +#include <MainLoopHandler.h> + +#include <map> +#include <memory> +#include <mutex> +#include <string> +#include <thread> + +#include "TCPServer.h" + +using AittTransport = aitt::AittTransport; +using MainLoopHandler = aitt::MainLoopHandler; +using AittDiscovery = aitt::AittDiscovery; + +class Module : public AittTransport { + public: + explicit Module(const std::string &ip, AittDiscovery &discovery); + virtual ~Module(void); + + void Publish(const std::string &topic, const void *data, const size_t datalen, + const std::string &correlation, AittQoS qos = AITT_QOS_AT_MOST_ONCE, + bool retain = false) override; + + void Publish(const std::string &topic, const void *data, const size_t datalen, + AittQoS qos = AITT_QOS_AT_MOST_ONCE, bool retain = false) override; + + void *Subscribe(const std::string &topic, const SubscribeCallback &cb, void *cbdata = nullptr, + AittQoS qos = AITT_QOS_AT_MOST_ONCE) override; + + void *Subscribe(const std::string &topic, const SubscribeCallback &cb, const void *data, + const size_t datalen, void *cbdata = nullptr, + AittQoS qos = AITT_QOS_AT_MOST_ONCE) override; + void *Unsubscribe(void *handle) override; + + private: + struct TCPServerData : public MainLoopHandler::MainLoopData { + Module *impl; + SubscribeCallback cb; + void *cbdata; + std::string topic; + std::vector<int> client_list; + std::mutex client_lock; + }; + + struct TCPData : public MainLoopHandler::MainLoopData { + TCPServerData *parent; + std::unique_ptr<TCP> client; + }; + + // SubscribeTable + // map { + // "/customTopic/mytopic": $serverHandle, + // ... + // } + using SubscribeMap = std::map<std::string, std::unique_ptr<TCP::Server>>; + + // ClientTable + // map { + // $clientId: $host, + // "client.uniqId.123": "192.168.1.11" + // ... + // } + using ClientMap = std::map<std::string /* id */, std::string /* host */>; + + // NOTE: + // There could be multiple clientIds for the single host + // If several applications are run on the same device, each applicaion will get unique client + // Ids therefore we have to keep in mind that the clientId is not 1:1 matched for the IPAddress. + + // PublishTable + // map { + // "/customTopic/faceRecog": map { + // $clientId: map { + // 11234: $clientHandle, + // + // ... + // + // 21234: $clientHandle, + // }, + // }, + // } + // + // NOTE: + // TCP handle should be the unique_ptr, so if we delete the entry from the map, + // the handle must be released automatically + // in order to make the handle "unique_ptr", it should be a class object not the "void *" + using PortMap = std::map<unsigned short /* port */, std::unique_ptr<TCP>>; + using HostMap = std::map<std::string /* clientId */, PortMap>; + using PublishMap = std::map<std::string /* topic */, HostMap>; + + static void AcceptConnection(MainLoopHandler::MainLoopResult result, int handle, + MainLoopHandler::MainLoopData *watchData); + void DiscoveryMessageCallback(const std::string &clientId, const std::string &status, + const void *msg, const int szmsg); + void UpdateDiscoveryMsg(); + static void ReceiveData(MainLoopHandler::MainLoopResult result, int handle, + MainLoopHandler::MainLoopData *watchData); + void HandleClientDisconnect(int handle); + std::string GetTopicName(TCPData *connect_info); + void ThreadMain(void); + void SendPayload(const size_t &datalen, Module::PortMap::iterator &portIt, const void *data); + void SendTopic(const std::string &topic, Module::PortMap::iterator &portIt); + + void UpdatePublishTable(const std::string &topic, const std::string &host, unsigned short port); + + MainLoopHandler main_loop; + std::thread aittThread; + std::string ip; + int discovery_cb; + + PublishMap publishTable; + std::mutex publishTableLock; + SubscribeMap subscribeTable; + std::mutex subscribeTableLock; + ClientMap clientTable; + std::mutex clientTableLock; +}; diff --git a/modules/tcp/TCP.cc b/modules/tcp/TCP.cc new file mode 100644 index 0000000..3b6751e --- /dev/null +++ b/modules/tcp/TCP.cc @@ -0,0 +1,157 @@ +/* + * Copyright (c) 2021-2022 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "TCP.h" + +#include <arpa/inet.h> +#include <netinet/in.h> +#include <netinet/tcp.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <unistd.h> + +#include <cstdlib> +#include <cstring> +#include <stdexcept> + +#include "aitt_internal.h" + +TCP::TCP(const std::string &host, unsigned short port) : handle(-1), addrlen(0), addr(nullptr) +{ + int ret = 0; + + do { + if (port == 0) { + ret = EINVAL; + break; + } + + handle = socket(AF_INET, SOCK_STREAM | SOCK_CLOEXEC, 0); + if (handle < 0) { + ERR("socket() Fail()"); + break; + } + + addrlen = sizeof(sockaddr_in); + addr = static_cast<sockaddr *>(calloc(1, addrlen)); + if (!addr) { + ERR("calloc() Fail()"); + break; + } + + sockaddr_in *inet_addr = reinterpret_cast<sockaddr_in *>(addr); + if (!inet_pton(AF_INET, host.c_str(), &inet_addr->sin_addr)) { + ret = EINVAL; + break; + } + + inet_addr->sin_port = htons(port); + inet_addr->sin_family = AF_INET; + + ret = connect(handle, addr, addrlen); + if (ret < 0) { + ERR("connect() Fail(%s, %d)", host.c_str(), port); + break; + } + + SetupOptions(); + return; + } while (0); + + if (ret <= 0) + ret = errno; + + free(addr); + if (handle >= 0 && close(handle) < 0) + ERR_CODE(errno, "close"); + throw std::runtime_error(strerror(ret)); +} + +TCP::TCP(int handle, sockaddr *addr, socklen_t szAddr) : handle(handle), addrlen(szAddr), addr(addr) +{ + SetupOptions(); +} + +TCP::~TCP(void) +{ + if (handle < 0) + return; + + free(addr); + if (close(handle) < 0) + ERR_CODE(errno, "close"); +} + +void TCP::SetupOptions(void) +{ + int on = 1; + + int ret = setsockopt(handle, IPPROTO_IP, TCP_NODELAY, &on, sizeof(on)); + if (ret < 0) { + ERR_CODE(errno, "delay option setting failed"); + } +} + +void TCP::Send(const void *data, size_t &szData) +{ + int ret = send(handle, data, szData, 0); + if (ret < 0) { + ERR("Fail to send data, handle = %d, size = %zu", handle, szData); + throw std::runtime_error(strerror(errno)); + } + + szData = ret; +} + +void TCP::Recv(void *data, size_t &szData) +{ + int ret = recv(handle, data, szData, 0); + if (ret < 0) { + ERR("Fail to recv data, handle = %d, size = %zu", handle, szData); + throw std::runtime_error(strerror(errno)); + } + + szData = ret; +} + +int TCP::GetHandle(void) +{ + return handle; +} + +void TCP::GetPeerInfo(std::string &host, unsigned short &port) +{ + char address[INET_ADDRSTRLEN] = { + 0, + }; + + if (!inet_ntop(AF_INET, &reinterpret_cast<sockaddr_in *>(this->addr)->sin_addr, address, + sizeof(address))) + throw std::runtime_error(strerror(errno)); + + port = ntohs(reinterpret_cast<sockaddr_in *>(this->addr)->sin_port); + host = address; +} + +unsigned short TCP::GetPort(void) +{ + sockaddr_in addr; + socklen_t addrlen = sizeof(addr); + + if (getsockname(handle, reinterpret_cast<sockaddr *>(&addr), &addrlen) < 0) + throw std::runtime_error(strerror(errno)); + + return ntohs(addr.sin_port); +} diff --git a/modules/tcp/TCP.h b/modules/tcp/TCP.h new file mode 100644 index 0000000..535819c --- /dev/null +++ b/modules/tcp/TCP.h @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2021-2022 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include <sys/socket.h> +#include <sys/types.h> /* See NOTES */ + +#include <string> + +class TCP { + public: + class Server; + + TCP(const std::string &host, unsigned short port); + virtual ~TCP(void); + + void Send(const void *data, size_t &szData); + void Recv(void *data, size_t &szData); + int GetHandle(void); + unsigned short GetPort(void); + void GetPeerInfo(std::string &host, unsigned short &port); + + private: + TCP(int handle, sockaddr *addr, socklen_t addrlen); + void SetupOptions(void); + + int handle; + socklen_t addrlen; + sockaddr *addr; +}; diff --git a/modules/tcp/TCPServer.cc b/modules/tcp/TCPServer.cc new file mode 100644 index 0000000..55f8511 --- /dev/null +++ b/modules/tcp/TCPServer.cc @@ -0,0 +1,132 @@ +/* + * Copyright (c) 2021-2022 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "TCPServer.h" + +#include <arpa/inet.h> +#include <netinet/in.h> +#include <netinet/tcp.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <unistd.h> + +#include <cstdlib> +#include <stdexcept> + +#include "aitt_internal.h" + +#define BACKLOG 10 // Accept only 10 simultaneously connections by default + +TCP::Server::Server(const std::string &host, unsigned short &port) + : handle(-1), addr(nullptr), addrlen(0) +{ + int ret = 0; + + do { + handle = socket(AF_INET, SOCK_STREAM | SOCK_CLOEXEC, 0); + if (handle < 0) + break; + + addrlen = sizeof(sockaddr_in); + addr = static_cast<sockaddr *>(calloc(1, sizeof(sockaddr_in))); + if (!addr) + break; + + sockaddr_in *inet_addr = reinterpret_cast<sockaddr_in *>(addr); + if (!inet_pton(AF_INET, host.c_str(), &inet_addr->sin_addr)) { + ret = EINVAL; + break; + } + + inet_addr->sin_port = htons(port); + inet_addr->sin_family = AF_INET; + + int on = 1; + ret = setsockopt(handle, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); + if (ret < 0) + break; + + ret = bind(handle, addr, addrlen); + if (ret < 0) + break; + + if (!port) { + if (getsockname(handle, addr, &addrlen) < 0) + break; + port = ntohs(inet_addr->sin_port); + } + + ret = listen(handle, BACKLOG); + if (ret < 0) + break; + + return; + } while (0); + + if (ret <= 0) + ret = errno; + + free(addr); + + if (handle >= 0 && close(handle) < 0) + ERR_CODE(errno, "close"); + + throw std::runtime_error(strerror(ret)); +} + +TCP::Server::~Server(void) +{ + if (handle < 0) + return; + + free(addr); + if (close(handle) < 0) + ERR_CODE(errno, "close"); +} + +std::unique_ptr<TCP> TCP::Server::AcceptPeer(void) +{ + sockaddr *peerAddr; + socklen_t szAddr = sizeof(sockaddr_in); + int peerHandle; + + peerAddr = static_cast<sockaddr *>(calloc(1, szAddr)); + if (!peerAddr) + throw std::runtime_error(strerror(errno)); + + peerHandle = accept(handle, peerAddr, &szAddr); + if (peerHandle < 0) { + free(peerAddr); + throw std::runtime_error(strerror(errno)); + } + + return std::unique_ptr<TCP>(new TCP(peerHandle, peerAddr, szAddr)); +} + +int TCP::Server::GetHandle(void) +{ + return handle; +} + +unsigned short TCP::Server::GetPort(void) +{ + sockaddr_in addr; + socklen_t addrlen = sizeof(addr); + + if (getsockname(handle, reinterpret_cast<sockaddr *>(&addr), &addrlen) < 0) + throw std::runtime_error(strerror(errno)); + + return ntohs(addr.sin_port); +} diff --git a/modules/tcp/TCPServer.h b/modules/tcp/TCPServer.h new file mode 100644 index 0000000..3c82bc6 --- /dev/null +++ b/modules/tcp/TCPServer.h @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2021-2022 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include <memory> +#include <string> + +#include "TCP.h" + +class TCP::Server { + public: + Server(const std::string &host, unsigned short &port); + virtual ~Server(void); + + std::unique_ptr<TCP> AcceptPeer(void); + + int GetHandle(void); + unsigned short GetPort(void); + + private: + int handle; + sockaddr *addr; + socklen_t addrlen; +}; diff --git a/modules/tcp/samples/CMakeLists.txt b/modules/tcp/samples/CMakeLists.txt new file mode 100644 index 0000000..8fd1b4b --- /dev/null +++ b/modules/tcp/samples/CMakeLists.txt @@ -0,0 +1,3 @@ +ADD_EXECUTABLE("aitt_tcp_test" tcp_test.cc $<TARGET_OBJECTS:TCP_OBJ>) +TARGET_LINK_LIBRARIES("aitt_tcp_test" ${PROJECT_NAME} Threads::Threads ${AITT_NEEDS_LIBRARIES}) +INSTALL(TARGETS "aitt_tcp_test" DESTINATION ${AITT_TEST_BINDIR}) diff --git a/modules/tcp/samples/tcp_test.cc b/modules/tcp/samples/tcp_test.cc new file mode 100644 index 0000000..d319e27 --- /dev/null +++ b/modules/tcp/samples/tcp_test.cc @@ -0,0 +1,235 @@ +/* + * Copyright (c) 2021-2022 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <TCP.h> +#include <TCPServer.h> +#include <getopt.h> +#include <glib.h> + +#include <functional> +#include <iostream> +#include <memory> +#include <string> + +//#define _LOG_WITH_TIMESTAMP +#include "aitt_internal.h" +#ifdef _LOG_WITH_TIMESTAMP +__thread __aitt__tls__ __aitt; +#endif + +#define HELLO_STRING "hello" +#define BYE_STRING "bye" +#define SEND_INTERVAL 1000 + +class AittTcpSample { + public: + AittTcpSample(const std::string &host, unsigned short &port) + : server(std::make_unique<TCP::Server>(host, port)) + { + } + virtual ~AittTcpSample(void) {} + + std::unique_ptr<TCP::Server> server; +}; + +int main(int argc, char *argv[]) +{ + const option opts[] = { + { + .name = "server", + .has_arg = 0, + .flag = nullptr, + .val = 's', + }, + { + .name = "host", + .has_arg = 1, + .flag = nullptr, + .val = 'h', + }, + { + .name = "port", + .has_arg = 1, + .flag = nullptr, + .val = 'p', + }, + }; + int c; + int idx; + bool isServer = false; + std::string host = "127.0.0.1"; + unsigned short port = 0; + + while ((c = getopt_long(argc, argv, "sh:up:", opts, &idx)) != -1) { + switch (c) { + case 's': + isServer = true; + break; + case 'h': + host = optarg; + break; + case 'p': + port = std::stoi(optarg); + break; + default: + break; + } + } + + INFO("Host[%s] port[%u]", host.c_str(), port); + + struct EventData { + GSource source_; + GPollFD fd; + AittTcpSample *sample; + }; + + guint timeoutId = 0; + GSource *src = nullptr; + EventData *ed = nullptr; + + GMainLoop *mainLoop = g_main_loop_new(nullptr, FALSE); + if (!mainLoop) { + ERR("Failed to create a main loop"); + return 1; + } + + // Handling the server/client events + if (isServer) { + GSourceFuncs srcs = { + [](GSource *src, gint *timeout) -> gboolean { + *timeout = 1; + return FALSE; + }, + [](GSource *src) -> gboolean { + EventData *ed = reinterpret_cast<EventData *>(src); + RETV_IF(ed == nullptr, FALSE); + + if ((ed->fd.revents & G_IO_IN) == G_IO_IN) + return TRUE; + if ((ed->fd.revents & G_IO_ERR) == G_IO_ERR) + return TRUE; + + return FALSE; + }, + [](GSource *src, GSourceFunc callback, gpointer user_data) -> gboolean { + EventData *ed = reinterpret_cast<EventData *>(src); + RETV_IF(ed == nullptr, FALSE); + + if ((ed->fd.revents & G_IO_ERR) == G_IO_ERR) { + ERR("Error!"); + return FALSE; + } + + std::unique_ptr<TCP> peer = ed->sample->server->AcceptPeer(); + + INFO("Assigned port: %u, %u", ed->sample->server->GetPort(), peer->GetPort()); + std::string peerHost; + unsigned short peerPort = 0; + peer->GetPeerInfo(peerHost, peerPort); + INFO("Peer Info: %s %u", peerHost.c_str(), peerPort); + + char buffer[10]; + void *ptr = static_cast<void *>(buffer); + size_t szData = sizeof(HELLO_STRING); + peer->Recv(ptr, szData); + INFO("Gots[%s]", buffer); + + szData = sizeof(BYE_STRING); + peer->Send(BYE_STRING, szData); + INFO("Reply to client[%s]", BYE_STRING); + + return TRUE; + }, + nullptr, + }; + + src = g_source_new(&srcs, sizeof(EventData)); + if (!src) { + g_main_loop_unref(mainLoop); + ERR("g_source_new failed"); + return 1; + } + + ed = reinterpret_cast<EventData *>(src); + + try { + ed->sample = new AittTcpSample(host, port); + } catch (std::exception &e) { + ERR("new: %s", e.what()); + g_source_unref(src); + g_main_loop_unref(mainLoop); + return 1; + } + + INFO("host: %s, port: %u", host.c_str(), port); + + ed->fd.fd = ed->sample->server->GetHandle(); + ed->fd.events = G_IO_IN | G_IO_ERR; + g_source_add_poll(src, &ed->fd); + guint id = g_source_attach(src, g_main_loop_get_context(mainLoop)); + g_source_unref(src); + if (id == 0) { + delete ed->sample; + g_source_destroy(src); + g_main_loop_unref(mainLoop); + return 1; + } + } else { + static struct Main { + const std::string &host; + unsigned short port; + } main_data = { + .host = host, + .port = port, + }; + // Now the server is ready. + // Let's create a new client and communicate with the server within every + // SEND_INTERTVAL + timeoutId = g_timeout_add( + SEND_INTERVAL, + [](gpointer data) -> gboolean { + Main *ctx = static_cast<Main *>(data); + std::unique_ptr<TCP> client(std::make_unique<TCP>(ctx->host, ctx->port)); + + INFO("Assigned client port: %u", client->GetPort()); + + INFO("Send[%s]", HELLO_STRING); + size_t szBuffer = sizeof(HELLO_STRING); + client->Send(HELLO_STRING, szBuffer); + + char buffer[10]; + void *ptr = static_cast<void *>(buffer); + szBuffer = sizeof(BYE_STRING); + client->Recv(ptr, szBuffer); + INFO("Replied with[%s]", buffer); + + // Send oneshot message, and disconnect from the server + return TRUE; + }, + &main_data); + } + + g_main_loop_run(mainLoop); + + if (src) { + delete ed->sample; + g_source_destroy(src); + } + if (timeoutId) + g_source_remove(timeoutId); + g_main_loop_unref(mainLoop); + return 0; +} diff --git a/modules/tcp/tests/CMakeLists.txt b/modules/tcp/tests/CMakeLists.txt new file mode 100644 index 0000000..bf1adf1 --- /dev/null +++ b/modules/tcp/tests/CMakeLists.txt @@ -0,0 +1,19 @@ +PKG_CHECK_MODULES(UT_NEEDS REQUIRED gmock_main) +INCLUDE_DIRECTORIES(${UT_NEEDS_INCLUDE_DIRS}) +LINK_DIRECTORIES(${UT_NEEDS_LIBRARY_DIRS}) + +SET(AITT_TCP_UT ${PROJECT_NAME}_tcp_ut) + +SET(AITT_TCP_UT_SRC TCP_test.cc TCPServer_test.cc) + +ADD_EXECUTABLE(${AITT_TCP_UT} ${AITT_TCP_UT_SRC} $<TARGET_OBJECTS:TCP_OBJ>) +TARGET_LINK_LIBRARIES(${AITT_TCP_UT} ${UT_NEEDS_LIBRARIES} Threads::Threads ${AITT_NEEDS_LIBRARIES}) +INSTALL(TARGETS ${AITT_TCP_UT} DESTINATION ${AITT_TEST_BINDIR}) + +ADD_TEST( + NAME + ${AITT_TCP_UT} + COMMAND + ${CMAKE_COMMAND} -E env + ${CMAKE_CURRENT_BINARY_DIR}/${AITT_TCP_UT} --gtest_filter=*_Anytime +) diff --git a/modules/tcp/tests/TCPServer_test.cc b/modules/tcp/tests/TCPServer_test.cc new file mode 100644 index 0000000..e8b48b1 --- /dev/null +++ b/modules/tcp/tests/TCPServer_test.cc @@ -0,0 +1,121 @@ +/* + * Copyright (c) 2021-2022 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "../TCPServer.h" + +#include <gtest/gtest.h> + +#include <condition_variable> +#include <cstring> +#include <memory> +#include <mutex> +#include <thread> + +#define TEST_SERVER_ADDRESS "127.0.0.1" +#define TEST_SERVER_INVALID_ADDRESS "287.0.0.1" +#define TEST_SERVER_PORT 8123 +#define TEST_SERVER_AVAILABLE_PORT 0 + +TEST(TCPServer, Positive_Create_Anytime) +{ + unsigned short port = TEST_SERVER_PORT; + std::unique_ptr<TCP::Server> tcp(std::make_unique<TCP::Server>(TEST_SERVER_ADDRESS, port)); + ASSERT_NE(tcp, nullptr); +} + +TEST(TCPServer, Negative_Create_Anytime) +{ + try { + unsigned short port = TEST_SERVER_PORT; + + std::unique_ptr<TCP::Server> tcp( + std::make_unique<TCP::Server>(TEST_SERVER_INVALID_ADDRESS, port)); + ASSERT_EQ(tcp, nullptr); + } catch (std::exception &e) { + ASSERT_STREQ(e.what(), strerror(EINVAL)); + } +} + +TEST(TCPServer, Positive_Create_AutoPort_Anytime) +{ + unsigned short port = TEST_SERVER_AVAILABLE_PORT; + std::unique_ptr<TCP::Server> tcp(std::make_unique<TCP::Server>(TEST_SERVER_ADDRESS, port)); + ASSERT_NE(tcp, nullptr); + ASSERT_NE(port, 0); +} + +TEST(TCPServer, Positive_GetPort_Anytime) +{ + unsigned short port = TEST_SERVER_PORT; + std::unique_ptr<TCP::Server> tcp(std::make_unique<TCP::Server>(TEST_SERVER_ADDRESS, port)); + ASSERT_NE(tcp, nullptr); + ASSERT_EQ(tcp->GetPort(), TEST_SERVER_PORT); +} + +TEST(TCPServer, Positive_GetHandle_Anytime) +{ + unsigned short port = TEST_SERVER_PORT; + std::unique_ptr<TCP::Server> tcp(std::make_unique<TCP::Server>(TEST_SERVER_ADDRESS, port)); + ASSERT_NE(tcp, nullptr); + ASSERT_GE(tcp->GetHandle(), 0); +} + +TEST(TCPServer, Positive_GetPort_AutoPort_Anytime) +{ + unsigned short port = TEST_SERVER_AVAILABLE_PORT; + std::unique_ptr<TCP::Server> tcp(std::make_unique<TCP::Server>(TEST_SERVER_ADDRESS, port)); + ASSERT_NE(tcp, nullptr); + ASSERT_EQ(tcp->GetPort(), port); +} + +TEST(TCPServer, Positive_AcceptPeer_Anytime) +{ + std::mutex m; + std::condition_variable ready_cv; + std::condition_variable connected_cv; + bool ready = false; + bool connected = false; + + unsigned short serverPort = TEST_SERVER_PORT; + std::thread serverThread( + [serverPort, &m, &ready, &connected, &ready_cv, &connected_cv](void) mutable -> void { + std::unique_ptr<TCP::Server> tcp( + std::make_unique<TCP::Server>(TEST_SERVER_ADDRESS, serverPort)); + { + std::lock_guard<std::mutex> lk(m); + ready = true; + } + ready_cv.notify_one(); + + std::unique_ptr<TCP> peer = tcp->AcceptPeer(); + { + std::lock_guard<std::mutex> lk(m); + connected = !!peer; + } + connected_cv.notify_one(); + }); + + { + std::unique_lock<std::mutex> lk(m); + ready_cv.wait(lk, [&ready] { return ready; }); + std::unique_ptr<TCP> tcp(std::make_unique<TCP>(TEST_SERVER_ADDRESS, serverPort)); + connected_cv.wait(lk, [&connected] { return connected; }); + } + + serverThread.join(); + + ASSERT_EQ(ready, true); + ASSERT_EQ(connected, true); +} diff --git a/modules/tcp/tests/TCP_test.cc b/modules/tcp/tests/TCP_test.cc new file mode 100644 index 0000000..604bd23 --- /dev/null +++ b/modules/tcp/tests/TCP_test.cc @@ -0,0 +1,149 @@ +/* + * Copyright (c) 2021-2022 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <gtest/gtest.h> + +#include <condition_variable> +#include <cstring> +#include <functional> +#include <memory> +#include <mutex> +#include <thread> + +#include "TCPServer.h" + +#define TEST_SERVER_ADDRESS "127.0.0.1" +#define TEST_SERVER_INVALID_ADDRESS "287.0.0.1" +#define TEST_SERVER_PORT 8123 +#define TEST_SERVER_AVAILABLE_PORT 0 +#define TEST_BUFFER_SIZE 256 +#define TEST_BUFFER_HELLO "Hello World" +#define TEST_BUFFER_BYE "Good Bye" + +class TCPTest : public testing::Test { + protected: + void SetUp() override + { + ready = false; + serverPort = TEST_SERVER_PORT; + customTest = [](void) {}; + + clientThread = std::thread([this](void) mutable -> void { + std::unique_lock<std::mutex> lk(m); + ready_cv.wait(lk, [this] { return ready; }); + client = std::make_unique<TCP>(TEST_SERVER_ADDRESS, serverPort); + + customTest(); + }); + } + + void RunServer(void) + { + tcp = std::make_unique<TCP::Server>(TEST_SERVER_ADDRESS, serverPort); + { + std::lock_guard<std::mutex> lk(m); + ready = true; + } + ready_cv.notify_one(); + + peer = tcp->AcceptPeer(); + } + + void TearDown() override { clientThread.join(); } + + protected: + std::mutex m; + std::condition_variable ready_cv; + bool ready; + unsigned short serverPort; + std::thread clientThread; + std::unique_ptr<TCP::Server> tcp; + std::unique_ptr<TCP> peer; + std::unique_ptr<TCP> client; + std::function<void(void)> customTest; +}; + +TEST(TCP, Negative_Create_InvalidPort_Anytime) +{ + try { + std::unique_ptr<TCP> tcp( + std::make_unique<TCP>(TEST_SERVER_ADDRESS, TEST_SERVER_AVAILABLE_PORT)); + ASSERT_EQ(tcp, nullptr); + } catch (std::exception &e) { + ASSERT_STREQ(e.what(), strerror(EINVAL)); + } +} + +TEST(TCP, Negative_Create_InvalidAddress_Anytime) +{ + try { + std::unique_ptr<TCP> tcp( + std::make_unique<TCP>(TEST_SERVER_INVALID_ADDRESS, TEST_SERVER_PORT)); + ASSERT_EQ(tcp, nullptr); + } catch (std::exception &e) { + ASSERT_STREQ(e.what(), strerror(EINVAL)); + } +} + +TEST_F(TCPTest, Positive_GetPeerInfo_Anytime) +{ + std::string peerHost; + unsigned short peerPort = 0; + + RunServer(); + + peer->GetPeerInfo(peerHost, peerPort); + ASSERT_STREQ(peerHost.c_str(), TEST_SERVER_ADDRESS); + ASSERT_GT(peerPort, 0); +} + +TEST_F(TCPTest, Positive_GetHandle_Anytime) +{ + RunServer(); + int handle = peer->GetHandle(); + ASSERT_GE(handle, 0); +} + +TEST_F(TCPTest, Positive_GetPort_Anytime) +{ + RunServer(); + unsigned short port = peer->GetPort(); + ASSERT_GT(port, 0); +} + +TEST_F(TCPTest, Positive_SendRecv_Anytime) +{ + char helloBuffer[TEST_BUFFER_SIZE]; + char byeBuffer[TEST_BUFFER_SIZE]; + + customTest = [this, &helloBuffer](void) mutable -> void { + size_t szData = sizeof(helloBuffer); + client->Recv(static_cast<void *>(helloBuffer), szData); + + szData = sizeof(TEST_BUFFER_BYE); + client->Send(TEST_BUFFER_BYE, szData); + }; + + RunServer(); + + size_t szMsg = sizeof(TEST_BUFFER_HELLO); + peer->Send(TEST_BUFFER_HELLO, szMsg); + + szMsg = sizeof(byeBuffer); + peer->Recv(static_cast<void *>(byeBuffer), szMsg); + + ASSERT_STREQ(helloBuffer, TEST_BUFFER_HELLO); + ASSERT_STREQ(byeBuffer, TEST_BUFFER_BYE); +} |