diff options
author | Kyungwook Tak <k.tak@samsung.com> | 2016-07-06 10:00:19 +0900 |
---|---|---|
committer | Kyungwook Tak <k.tak@samsung.com> | 2016-07-06 15:08:02 +0900 |
commit | a955555c81a28ed2a98b14d545f32bfd5467854c (patch) | |
tree | e7861ee90fc370f1e4848b45a83eb0dd03a25eb2 | |
parent | b189d7c62b4d3bcb2bfc16b5b8dbf7e572036735 (diff) | |
download | csr-framework-a955555c81a28ed2a98b14d545f32bfd5467854c.tar.gz csr-framework-a955555c81a28ed2a98b14d545f32bfd5467854c.tar.bz2 csr-framework-a955555c81a28ed2a98b14d545f32bfd5467854c.zip |
Refactor cancel async scan architecture
Before: Check cancelled flag on both of client/server side in loop of
file visitor. It's hard to control response latency consistently.
After : Change client side connection to non-blocking to cancel it
directly by sending signal to fd. Response latency is consistent
because client just close connection.
Change-Id: If181eb9984357939b2845b7d03a17dac57a0b9d0
Signed-off-by: Kyungwook Tak <k.tak@samsung.com>
-rwxr-xr-x | src/CMakeLists.txt | 1 | ||||
-rw-r--r-- | src/framework/client/async-logic.cpp | 60 | ||||
-rw-r--r-- | src/framework/client/async-logic.h | 11 | ||||
-rw-r--r-- | src/framework/client/content-screening.cpp | 16 | ||||
-rw-r--r-- | src/framework/client/eventfd.cpp | 61 | ||||
-rw-r--r-- | src/framework/client/eventfd.h | 50 | ||||
-rw-r--r-- | src/framework/client/handle-ext.cpp | 12 | ||||
-rw-r--r-- | src/framework/client/handle-ext.h | 3 | ||||
-rw-r--r-- | src/framework/common/dispatcher.cpp | 8 | ||||
-rw-r--r-- | src/framework/common/dispatcher.h | 2 | ||||
-rw-r--r-- | src/framework/common/mainloop.h | 4 | ||||
-rw-r--r-- | src/framework/common/socket.cpp | 4 | ||||
-rw-r--r-- | src/framework/service/cs-logic.cpp | 28 | ||||
-rw-r--r-- | src/framework/service/exception.cpp | 5 | ||||
-rw-r--r-- | src/framework/service/server-service.cpp | 6 |
15 files changed, 214 insertions, 57 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 8470339..a17ad25 100755 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -103,6 +103,7 @@ SET(${TARGET_CSR_CLIENT}_SRCS framework/client/canonicalize.cpp framework/client/content-screening.cpp framework/client/engine-manager.cpp + framework/client/eventfd.cpp framework/client/handle.cpp framework/client/handle-ext.cpp framework/client/utils.cpp diff --git a/src/framework/client/async-logic.cpp b/src/framework/client/async-logic.cpp index 5ed5192..43ca501 100644 --- a/src/framework/client/async-logic.cpp +++ b/src/framework/client/async-logic.cpp @@ -23,9 +23,11 @@ #include <cstdint> #include <utility> +#include <sys/epoll.h> #include "common/exception.h" #include "common/cs-detected.h" +#include "common/connection.h" #include "common/async-protocol.h" #include "common/audit/logger.h" @@ -33,12 +35,15 @@ namespace Csr { namespace Client { AsyncLogic::AsyncLogic(HandleExt *handle, void *userdata) : - m_handle(handle), m_userdata(userdata) + m_handle(handle), m_userdata(userdata), m_dispatcherAsync(new Dispatcher(SockId::CS)) { } -AsyncLogic::~AsyncLogic() +void AsyncLogic::stop(void) { + INFO("async logic stop called! Let's send cancel signal to loop"); + this->m_dispatcherAsync->methodPing(CommandId::CANCEL_OPERATION); + this->m_cancelSignal.send(); } void AsyncLogic::scanDirs(const StrSet &dirs) @@ -53,22 +58,31 @@ void AsyncLogic::scanFiles(const StrSet &files) void AsyncLogic::scanHelper(const CommandId &id, const StrSet &s) { - auto ret = this->m_handle->dispatch<int>(id, *(this->m_handle->getContext()), s); + this->m_dispatcherAsync->methodPing(id, *(this->m_handle->getContext()), s); - if (ret != ASYNC_EVENT_START) - ThrowExc(ret, "Error on async scan. ret: " << ret); + auto fd = this->m_dispatcherAsync->getFd(); + auto cancelEventFd = this->m_cancelSignal.getFd(); - DEBUG("loop for waiting server event start!!"); + this->m_loop.addEventSource(cancelEventFd, EPOLLIN, + [&](uint32_t) { + this->m_cancelSignal.receive(); + ThrowExcInfo(ASYNC_EVENT_CANCEL, "Async event cancelled on fd: " << fd); + }); - while (true) { - auto event = this->m_handle->revent<int>(); + this->m_loop.addEventSource(fd, EPOLLIN | EPOLLHUP | EPOLLRDHUP, + [&](uint32_t e) { + if (e & (EPOLLHUP | EPOLLRDHUP)) + ThrowExc(CSR_ERROR_SOCKET, "csr-server might be crashed. Finish async client loop"); + + // read event + auto event = this->m_dispatcherAsync->receiveEvent<int>(); DEBUG("event received: " << event); switch (event) { case ASYNC_EVENT_MALWARE_NONE: { DEBUG("ASYNC_EVENT_MALWARE_NONE comes in!"); - auto targetName = this->m_handle->revent<std::string>(); + auto targetName = this->m_dispatcherAsync->receiveEvent<std::string>(); if (targetName.empty()) { ERROR("scanned event received but target name is empty"); @@ -83,7 +97,7 @@ void AsyncLogic::scanHelper(const CommandId &id, const StrSet &s) case ASYNC_EVENT_MALWARE_DETECTED: { DEBUG("ASYNC_EVENT_MALWARE_DETECTED comes in!"); - auto malware = this->m_handle->revent<CsDetected *>(); + auto malware = this->m_dispatcherAsync->receiveEvent<CsDetected *>(); if (malware == nullptr) { ERROR("malware detected event received but handle is null"); @@ -101,21 +115,23 @@ void AsyncLogic::scanHelper(const CommandId &id, const StrSet &s) break; } - case ASYNC_EVENT_COMPLETE: { - DEBUG("Async operation completed"); - return; - } - - case ASYNC_EVENT_CANCEL: { - ThrowExcInfo(ASYNC_EVENT_CANCEL, "Async operation cancelled!"); - } - default: - ThrowExc(event, "Error on async scan! ec: " << event); + ThrowExcInfo(event, "Async event loop terminated by event: " << event); } + }); + + try { + while (true) + this->m_loop.dispatch(-1); + } catch (const Exception &e) { + switch (e.error()) { + case ASYNC_EVENT_COMPLETE: + INFO("Async operation completed"); + break; - if (this->m_handle->isStopped()) - ThrowExcInfo(ASYNC_EVENT_CANCEL, "Async op cancelled!"); + default: + throw; + } } } diff --git a/src/framework/client/async-logic.h b/src/framework/client/async-logic.h index 0136074..7e82857 100644 --- a/src/framework/client/async-logic.h +++ b/src/framework/client/async-logic.h @@ -21,9 +21,14 @@ */ #pragma once +#include <memory> + #include "common/types.h" #include "common/command-id.h" +#include "common/dispatcher.h" +#include "common/mainloop.h" #include "client/handle-ext.h" +#include "client/eventfd.h" namespace Csr { namespace Client { @@ -31,7 +36,7 @@ namespace Client { class AsyncLogic { public: AsyncLogic(HandleExt *handle, void *userdata); - virtual ~AsyncLogic(); + virtual ~AsyncLogic() = default; void scanFiles(const StrSet &files); void scanDirs(const StrSet &dirs); @@ -42,8 +47,10 @@ private: void scanHelper(const CommandId &id, const StrSet &s); HandleExt *m_handle; // for registering results for auto-release - void *m_userdata; + Mainloop m_loop; + EventFd m_cancelSignal; + std::unique_ptr<Dispatcher> m_dispatcherAsync; }; } diff --git a/src/framework/client/content-screening.cpp b/src/framework/client/content-screening.cpp index 61684c0..17d518a 100644 --- a/src/framework/client/content-screening.cpp +++ b/src/framework/client/content-screening.cpp @@ -400,11 +400,13 @@ int csr_cs_scan_files_async(csr_cs_context_h handle, const char *file_paths[], auto task = std::make_shared<Task>([hExt, user_data, fileSet] { EXCEPTION_ASYNC_SAFE_START(hExt->m_cb, user_data) + Client::AsyncLogic l(hExt, user_data); + + hExt->setStopFunc([&l]() { l.stop(); }); + if (hExt->isStopped()) ThrowExcInfo(ASYNC_EVENT_CANCEL, "Async operation cancelled!"); - Client::AsyncLogic l(hExt, user_data); - l.scanFiles(*fileSet); EXCEPTION_SAFE_END @@ -458,11 +460,13 @@ int csr_cs_scan_dirs_async(csr_cs_context_h handle, const char *dir_paths[], auto task = std::make_shared<Task>([hExt, user_data, dirSet] { EXCEPTION_ASYNC_SAFE_START(hExt->m_cb, user_data) + Client::AsyncLogic l(hExt, user_data); + + hExt->setStopFunc([&l]() { l.stop(); }); + if (hExt->isStopped()) ThrowExcInfo(ASYNC_EVENT_CANCEL, "Async operation cancelled!"); - Client::AsyncLogic l(hExt, user_data); - l.scanDirs(*dirSet); EXCEPTION_SAFE_END @@ -490,10 +494,6 @@ int csr_cs_cancel_scanning(csr_cs_context_h handle) if (!hExt->isRunning() || hExt->isStopped()) return CSR_ERROR_NO_TASK; - hExt->turnOnStopFlagOnly(); - - hExt->ping(CommandId::CANCEL_OPERATION); - hExt->stop(); return CSR_ERROR_NONE; diff --git a/src/framework/client/eventfd.cpp b/src/framework/client/eventfd.cpp new file mode 100644 index 0000000..ec3d9ed --- /dev/null +++ b/src/framework/client/eventfd.cpp @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2016 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ +/* + * @file eventfd.cpp + * @author Jaemin Ryu (jm77.ryu@samsung.com) + * @version 1.0 + * @brief + */ +#include "client/eventfd.h" + +#include <sys/types.h> +#include <unistd.h> +#include <cstdint> + +#include "common/exception.h" + +namespace Csr { +namespace Client { + +EventFd::EventFd(unsigned int initval, int flags) : + m_fd(::eventfd(initval, flags)) +{ + if (this->m_fd == -1) + ThrowExc(CSR_ERROR_SERVER, "Eventfd from constructor is failed!"); +} + +EventFd::~EventFd() +{ + if (this->m_fd != -1) + ::close(this->m_fd); +} + +void EventFd::send() +{ + const std::uint64_t val = 1; + if (::write(this->m_fd, &val, sizeof(val)) == -1) + ThrowExc(CSR_ERROR_SOCKET, "EventFd send to fd[" << this->m_fd << "] is failed!"); +} + +void EventFd::receive() +{ + std::uint64_t val; + if (::read(this->m_fd, &val, sizeof(val)) == -1) + ThrowExc(CSR_ERROR_SOCKET, "EventFd receive from fd[" << this->m_fd << "] is failed!"); +} + +} +} diff --git a/src/framework/client/eventfd.h b/src/framework/client/eventfd.h new file mode 100644 index 0000000..b6719d1 --- /dev/null +++ b/src/framework/client/eventfd.h @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2016 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ +/* + * @file eventfd.h + * @author Jaemin Ryu (jm77.ryu@samsung.com) + * @version 1.0 + * @brief + */ +#pragma once + +#include <sys/eventfd.h> + +namespace Csr { +namespace Client { + +class EventFd { +public: + EventFd(unsigned int initval = 0, int flags = EFD_SEMAPHORE | EFD_CLOEXEC); + ~EventFd(); + + EventFd(const EventFd &) = delete; + EventFd &operator=(const EventFd &) = delete; + + void send(); + void receive(); + + inline int getFd() const noexcept + { + return this->m_fd; + } + +private: + int m_fd; +}; + +} +} diff --git a/src/framework/client/handle-ext.cpp b/src/framework/client/handle-ext.cpp index d0e6fda..66a9273 100644 --- a/src/framework/client/handle-ext.cpp +++ b/src/framework/client/handle-ext.cpp @@ -41,17 +41,23 @@ HandleExt::~HandleExt() this->m_worker.join(); } -void HandleExt::turnOnStopFlagOnly() +void HandleExt::setStopFunc(std::function<void()> &&func) { std::lock_guard<std::mutex> l(this->m_flagMutex); - this->m_stop = true; + this->m_stopFunc = std::move(func); } void HandleExt::stop() { DEBUG("Stop & join worker..."); - this->turnOnStopFlagOnly(); + { + std::lock_guard<std::mutex> l(this->m_flagMutex); + this->m_stop = true; + + if (this->m_stopFunc != nullptr) + this->m_stopFunc(); + } if (this->m_worker.joinable()) this->m_worker.join(); diff --git a/src/framework/client/handle-ext.h b/src/framework/client/handle-ext.h index d9789a0..31efdf9 100644 --- a/src/framework/client/handle-ext.h +++ b/src/framework/client/handle-ext.h @@ -37,7 +37,7 @@ public: virtual ~HandleExt(); void dispatchAsync(const std::shared_ptr<Task> &task); - void turnOnStopFlagOnly(void); + void setStopFunc(std::function<void()> &&func); void stop(void); bool isStopped(void) const; bool isRunning(void) const; @@ -52,6 +52,7 @@ private: bool m_stop; bool m_isRunning; std::thread m_worker; + std::function<void()> m_stopFunc; mutable std::mutex m_resultsMutex; mutable std::mutex m_flagMutex; }; diff --git a/src/framework/common/dispatcher.cpp b/src/framework/common/dispatcher.cpp index 9f8e023..2982a23 100644 --- a/src/framework/common/dispatcher.cpp +++ b/src/framework/common/dispatcher.cpp @@ -39,4 +39,12 @@ void Dispatcher::connect() Socket::create(this->m_sockId, Socket::Type::CLIENT)); } +int Dispatcher::getFd() const noexcept +{ + if (this->m_connection == nullptr) + return -1; + else + return this->m_connection->getFd(); +} + } // namespace Csr diff --git a/src/framework/common/dispatcher.h b/src/framework/common/dispatcher.h index 3b534db..08a91ee 100644 --- a/src/framework/common/dispatcher.h +++ b/src/framework/common/dispatcher.h @@ -40,6 +40,8 @@ public: Dispatcher(Dispatcher &&) = delete; Dispatcher &operator=(Dispatcher &&) = delete; + int getFd(void) const noexcept; + template<typename Type, typename ...Args> Type methodCall(Args &&...args); diff --git a/src/framework/common/mainloop.h b/src/framework/common/mainloop.h index 7a16bab..18b04d2 100644 --- a/src/framework/common/mainloop.h +++ b/src/framework/common/mainloop.h @@ -45,6 +45,9 @@ public: // if timeout is negative value, no timeout on idle. void run(int timeout); + // Moved to public to customize stop condition + void dispatch(int timeout); + void addEventSource(int fd, uint32_t event, Callback &&callback); void removeEventSource(int fd); size_t countEventSource(void) const; @@ -52,7 +55,6 @@ public: void setIdleChecker(std::function<bool()> &&idleChecker); private: - void dispatch(int timeout); bool m_isTimedOut; int m_pollfd; diff --git a/src/framework/common/socket.cpp b/src/framework/common/socket.cpp index 0b5c499..f698772 100644 --- a/src/framework/common/socket.cpp +++ b/src/framework/common/socket.cpp @@ -200,7 +200,7 @@ void Socket::write(const RawBuffer &data) const auto bytes = ::write(this->m_fd, &size, sizeof(size)); if (bytes < 0) - ThrowExc(CSR_ERROR_SOCKET, "Socket data size write failed on fd[" << this->m_fd << + ThrowExcWarn(CSR_ERROR_SOCKET, "Socket data size write failed on fd[" << this->m_fd << "] with errno: " << errno); while (total < size) { @@ -210,7 +210,7 @@ void Socket::write(const RawBuffer &data) const if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) continue; else - ThrowExc(CSR_ERROR_SOCKET, "Socket write failed on fd[" << this->m_fd << + ThrowExcWarn(CSR_ERROR_SOCKET, "Socket write failed on fd[" << this->m_fd << "] with errno: " << errno); } diff --git a/src/framework/service/cs-logic.cpp b/src/framework/service/cs-logic.cpp index f650644..ed0542e 100644 --- a/src/framework/service/cs-logic.cpp +++ b/src/framework/service/cs-logic.cpp @@ -305,7 +305,7 @@ Db::Cache CsLogic::scanAppDelta(const FilePtr &pkgPtr, const CancelChecker &isCa } int CsLogic::scanApp(const CsContext &context, const FilePtr &pkgPtr, - CsDetectedPtr &malware, const std::function<void()> &isCancelled) + CsDetectedPtr &malware, const CancelChecker &isCancelled) { const auto &pkgPath = pkgPtr->getName(); const auto &pkgId = pkgPtr->getAppPkgId(); @@ -463,7 +463,7 @@ CsLogic::ScanStage CsLogic::judgeScanStage(const CsDetectedPtr &riskiest, } int CsLogic::scanFileInternal(const CsContext &context, const FilePtr &target, - CsDetectedPtr &malware, const std::function<void()> &isCancelled) + CsDetectedPtr &malware, const CancelChecker &isCancelled) { if (target->isInApp()) return this->scanApp(context, target, malware, isCancelled); @@ -525,13 +525,11 @@ RawBuffer CsLogic::scanFile(const CsContext &context, const std::string &filepat } RawBuffer CsLogic::scanFilesAsync(const ConnShPtr &conn, const CsContext &context, - StrSet &paths, const std::function<void()> &isCancelled) + StrSet &paths, const CancelChecker &isCancelled) { if (this->m_db->getEngineState(CSR_ENGINE_CS) != CSR_STATE_ENABLE) ThrowExc(CSR_ERROR_ENGINE_DISABLED, "engine is disabled"); - conn->send(BinaryQueue::Serialize(ASYNC_EVENT_START).pop()); - StrSet canonicalized; for (const auto &path : paths) { @@ -588,7 +586,7 @@ RawBuffer CsLogic::scanFilesAsync(const ConnShPtr &conn, const CsContext &contex } RawBuffer CsLogic::scanDirsAsync(const ConnShPtr &conn, const CsContext &context, - StrSet &paths, const std::function<void()> &isCancelled) + StrSet &paths, const CancelChecker &isCancelled) { if (this->m_db->getEngineState(CSR_ENGINE_CS) != CSR_STATE_ENABLE) ThrowExc(CSR_ERROR_ENGINE_DISABLED, "engine is disabled"); @@ -596,6 +594,8 @@ RawBuffer CsLogic::scanDirsAsync(const ConnShPtr &conn, const CsContext &context StrSet dirs; for (const auto &path : paths) { + isCancelled(); + try { auto target = canonicalizePath(path, true); @@ -621,20 +621,14 @@ RawBuffer CsLogic::scanDirsAsync(const ConnShPtr &conn, const CsContext &context eraseSubdirectories(dirs); - DEBUG("send error none to client before starting scanning"); - - conn->send(BinaryQueue::Serialize(ASYNC_EVENT_START).pop()); - CsEngineContext engineContext(this->m_loader); auto t = this->m_loader->getEngineLatestUpdateTime(engineContext.get()); - DEBUG("Start async scanning!!!!!"); + INFO("Start async scanning!!!!!"); StrSet malwareList; for (const auto &dir : dirs) { - isCancelled(); - - DEBUG("Start async scanning for dir: " << dir); + INFO("Start async scanning for dir: " << dir); for (auto &row : this->m_db->getDetectedAllByNameOnDir(dir, t)) { isCancelled(); @@ -643,7 +637,7 @@ RawBuffer CsLogic::scanDirsAsync(const ConnShPtr &conn, const CsContext &context auto fileptr = File::create(row->targetName, nullptr); CsDetectedPtr malware; - auto retcode = this->scanFileInternal(context, fileptr, malware); + auto retcode = this->scanFileInternal(context, fileptr, malware, isCancelled); switch (retcode) { case CSR_ERROR_NONE: @@ -677,13 +671,15 @@ RawBuffer CsLogic::scanDirsAsync(const ConnShPtr &conn, const CsContext &context } } + INFO("detected malwares rescanning done from db."); + auto startTime = ::time(nullptr); auto lastScanTime = this->m_db->getLastScanTime(dir, t); auto visitor = FsVisitor::create([&](const FilePtr &file) { isCancelled(); CsDetectedPtr malware; - auto retcode = this->scanFileInternal(context, file, malware); + auto retcode = this->scanFileInternal(context, file, malware, isCancelled); DEBUG("scanFileInternal done. file: " << file->getName() << " retcode: " << retcode); diff --git a/src/framework/service/exception.cpp b/src/framework/service/exception.cpp index f395e10..5019fde 100644 --- a/src/framework/service/exception.cpp +++ b/src/framework/service/exception.cpp @@ -37,11 +37,12 @@ RawBuffer exceptionGuard(const std::function<RawBuffer()> &func) try { return func(); } catch (const Exception &e) { - if (e.error() == ASYNC_EVENT_CANCEL) + if (e.error() == CSR_ERROR_SOCKET) + WARN("Socket error. Client might cancel async scan or crashed: " << e.what()); + else if (e.error() == ASYNC_EVENT_CANCEL) INFO("Async operation cancel exception: " << e.what()); else ERROR("Exception caught. code: " << e.error() << " message: " << e.what()); - return BinaryQueue::Serialize(e.error()).pop(); } catch (const std::invalid_argument &e) { ERROR("Invalid argument: " << e.what()); diff --git a/src/framework/service/server-service.cpp b/src/framework/service/server-service.cpp index 38bd875..24bd1bf 100644 --- a/src/framework/service/server-service.cpp +++ b/src/framework/service/server-service.cpp @@ -428,6 +428,12 @@ void ServerService::onMessageProcess(const ConnShPtr &connection) if (!outbuf.empty()) connection->send(outbuf); + } catch (const Exception &e) { + if (e.error() == CSR_ERROR_SOCKET) + WARN("The connection is closed by the peer. Client might cancel async " + "scanning or crashed: " << e.what()); + else + throw; } catch (const std::exception &e) { ERROR("exception on workqueue task: " << e.what()); try { |