diff options
Diffstat (limited to 'Source/cmServer.cxx')
-rw-r--r-- | Source/cmServer.cxx | 329 |
1 files changed, 263 insertions, 66 deletions
diff --git a/Source/cmServer.cxx b/Source/cmServer.cxx index 7fc6ed755..3b2e5f322 100644 --- a/Source/cmServer.cxx +++ b/Source/cmServer.cxx @@ -2,19 +2,43 @@ file Copyright.txt or https://cmake.org/licensing for details. */ #include "cmServer.h" -#include "cmServerConnection.h" +#include <algorithm> +#include <cassert> +#include <cstdint> +#include <iostream> +#include <mutex> +#include <utility> + +#include <cm/memory> +#include <cm/shared_mutex> + +#include "cmsys/FStream.hxx" + +#include "cm_jsoncpp_reader.h" +#include "cm_jsoncpp_writer.h" + +#include "cmConnection.h" +#include "cmFileMonitor.h" +#include "cmJsonObjectDictionary.h" #include "cmServerDictionary.h" #include "cmServerProtocol.h" #include "cmSystemTools.h" -#include "cm_jsoncpp_reader.h" -#include "cm_jsoncpp_writer.h" #include "cmake.h" -#include "cmsys/FStream.hxx" -#include <algorithm> -#include <cassert> -#include <cstdint> -#include <utility> +void on_signal(uv_signal_t* signal, int signum) +{ + auto conn = static_cast<cmServerBase*>(signal->data); + conn->OnSignal(signum); +} + +static void on_walk_to_shutdown(uv_handle_t* handle, void* arg) +{ + (void)arg; + assert(uv_is_closing(handle)); + if (!uv_is_closing(handle)) { + uv_close(handle, &cmEventBasedConnection::on_close); + } +} class cmServer::DebugInfo { @@ -30,76 +54,73 @@ public: uint64_t StartTime; }; -cmServer::cmServer(cmServerConnection* conn, bool supportExperimental) - : Connection(conn) +cmServer::cmServer(cmConnection* conn, bool supportExperimental) + : cmServerBase(conn) , SupportExperimental(supportExperimental) { - this->Connection->SetServer(this); // Register supported protocols: - this->RegisterProtocol(new cmServerProtocol1_0); + this->RegisterProtocol(new cmServerProtocol1); } cmServer::~cmServer() { - if (!this->Protocol) { // Server was never fully started! - return; - } + Close(); for (cmServerProtocol* p : this->SupportedProtocols) { delete p; } - - delete this->Connection; } -void cmServer::PopOne() +void cmServer::ProcessRequest(cmConnection* connection, + const std::string& input) { - if (this->Queue.empty()) { - return; - } - Json::Reader reader; Json::Value value; - const std::string input = this->Queue.front(); - this->Queue.erase(this->Queue.begin()); - if (!reader.parse(input, value)) { - this->WriteParseError("Failed to parse JSON input."); + this->WriteParseError(connection, "Failed to parse JSON input."); return; } std::unique_ptr<DebugInfo> debug; Json::Value debugValue = value["debug"]; if (!debugValue.isNull()) { - debug = std::make_unique<DebugInfo>(); + debug = cm::make_unique<DebugInfo>(); debug->OutputFile = debugValue["dumpToFile"].asString(); debug->PrintStatistics = debugValue["showStats"].asBool(); } - const cmServerRequest request(this, value[kTYPE_KEY].asString(), + const cmServerRequest request(this, connection, value[kTYPE_KEY].asString(), value[kCOOKIE_KEY].asString(), value); - if (request.Type == "") { + if (request.Type.empty()) { cmServerResponse response(request); response.SetError("No type given in request."); - this->WriteResponse(response, nullptr); + this->WriteResponse(connection, response, nullptr); return; } - cmSystemTools::SetMessageCallback(reportMessage, - const_cast<cmServerRequest*>(&request)); + cmSystemTools::SetMessageCallback( + [&request](const std::string& msg, const char* title) { + reportMessage(msg, title, request); + }); + if (this->Protocol) { this->Protocol->CMakeInstance()->SetProgressCallback( - reportProgress, const_cast<cmServerRequest*>(&request)); - this->WriteResponse(this->Protocol->Process(request), debug.get()); + [&request](const std::string& msg, float prog) { + reportProgress(msg, prog, request); + }); + this->WriteResponse(connection, this->Protocol->Process(request), + debug.get()); } else { - this->WriteResponse(this->SetProtocolVersion(request), debug.get()); + this->WriteResponse(connection, this->SetProtocolVersion(request), + debug.get()); } } void cmServer::RegisterProtocol(cmServerProtocol* protocol) { if (protocol->IsExperimental() && !this->SupportExperimental) { + delete protocol; return; } auto version = protocol->ProtocolVersion(); @@ -115,7 +136,7 @@ void cmServer::RegisterProtocol(cmServerProtocol* protocol) } } -void cmServer::PrintHello() const +void cmServer::PrintHello(cmConnection* connection) const { Json::Value hello = Json::objectValue; hello[kTYPE_KEY] = "hello"; @@ -134,37 +155,27 @@ void cmServer::PrintHello() const protocolVersions.append(tmp); } - this->WriteJsonObject(hello, nullptr); -} - -void cmServer::QueueRequest(const std::string& request) -{ - this->Queue.push_back(request); - this->PopOne(); + this->WriteJsonObject(connection, hello, nullptr); } -void cmServer::reportProgress(const char* msg, float progress, void* data) +void cmServer::reportProgress(const std::string& msg, float progress, + const cmServerRequest& request) { - const cmServerRequest* request = static_cast<const cmServerRequest*>(data); - assert(request); if (progress < 0.0f || progress > 1.0f) { - request->ReportMessage(msg, ""); + request.ReportMessage(msg, ""); } else { - request->ReportProgress(0, static_cast<int>(progress * 1000), 1000, msg); + request.ReportProgress(0, static_cast<int>(progress * 1000), 1000, msg); } } -void cmServer::reportMessage(const char* msg, const char* title, - bool& /* cancel */, void* data) +void cmServer::reportMessage(const std::string& msg, const char* title, + const cmServerRequest& request) { - const cmServerRequest* request = static_cast<const cmServerRequest*>(data); - assert(request); - assert(msg); std::string titleString; if (title) { titleString = title; } - request->ReportMessage(std::string(msg), titleString); + request.ReportMessage(msg, titleString); } cmServerResponse cmServer::SetProtocolVersion(const cmServerRequest& request) @@ -209,14 +220,14 @@ cmServerResponse cmServer::SetProtocolVersion(const cmServerRequest& request) } this->Protocol = - this->FindMatchingProtocol(this->SupportedProtocols, major, minor); + cmServer::FindMatchingProtocol(this->SupportedProtocols, major, minor); if (!this->Protocol) { return request.ReportError("Protocol version not supported."); } std::string errorMessage; if (!this->Protocol->Activate(this, request, &errorMessage)) { - this->Protocol = CM_NULLPTR; + this->Protocol = nullptr; return request.ReportError("Failed to activate protocol version: " + errorMessage); } @@ -232,17 +243,27 @@ bool cmServer::Serve(std::string* errorMessage) } assert(!this->Protocol); - return Connection->ProcessEvents(errorMessage); + return cmServerBase::Serve(errorMessage); } cmFileMonitor* cmServer::FileMonitor() const { - return Connection->FileMonitor(); + return fileMonitor.get(); } void cmServer::WriteJsonObject(const Json::Value& jsonValue, const DebugInfo* debug) const { + cm::shared_lock<cm::shared_mutex> lock(ConnectionsMutex); + for (auto& connection : this->Connections) { + WriteJsonObject(connection.get(), jsonValue, debug); + } +} + +void cmServer::WriteJsonObject(cmConnection* connection, + const Json::Value& jsonValue, + const DebugInfo* debug) const +{ Json::FastWriter writer; auto beforeJson = uv_hrtime(); @@ -272,8 +293,7 @@ void cmServer::WriteJsonObject(const Json::Value& jsonValue, } } - Connection->WriteData(std::string("\n") + kSTART_MAGIC + std::string("\n") + - result + kEND_MAGIC + std::string("\n")); + connection->WriteData(result); } cmServerProtocol* cmServer::FindMatchingProtocol( @@ -311,7 +331,7 @@ void cmServer::WriteProgress(const cmServerRequest& request, int min, obj[kPROGRESS_MAXIMUM_KEY] = max; obj[kPROGRESS_CURRENT_KEY] = current; - this->WriteJsonObject(obj, nullptr); + this->WriteJsonObject(request.Connection, obj, nullptr); } void cmServer::WriteMessage(const cmServerRequest& request, @@ -331,10 +351,11 @@ void cmServer::WriteMessage(const cmServerRequest& request, obj[kTITLE_KEY] = title; } - WriteJsonObject(obj, nullptr); + WriteJsonObject(request.Connection, obj, nullptr); } -void cmServer::WriteParseError(const std::string& message) const +void cmServer::WriteParseError(cmConnection* connection, + const std::string& message) const { Json::Value obj = Json::objectValue; obj[kTYPE_KEY] = kERROR_TYPE; @@ -342,7 +363,7 @@ void cmServer::WriteParseError(const std::string& message) const obj[kREPLY_TO_KEY] = ""; obj[kCOOKIE_KEY] = ""; - this->WriteJsonObject(obj, nullptr); + this->WriteJsonObject(connection, obj, nullptr); } void cmServer::WriteSignal(const std::string& name, @@ -358,7 +379,8 @@ void cmServer::WriteSignal(const std::string& name, WriteJsonObject(obj, nullptr); } -void cmServer::WriteResponse(const cmServerResponse& response, +void cmServer::WriteResponse(cmConnection* connection, + const cmServerResponse& response, const DebugInfo* debug) const { assert(response.IsComplete()); @@ -371,5 +393,180 @@ void cmServer::WriteResponse(const cmServerResponse& response, obj[kERROR_MESSAGE_KEY] = response.ErrorMessage(); } - this->WriteJsonObject(obj, debug); + this->WriteJsonObject(connection, obj, debug); +} + +void cmServer::OnConnected(cmConnection* connection) +{ + PrintHello(connection); +} + +void cmServer::OnServeStart() +{ + cmServerBase::OnServeStart(); + fileMonitor = std::make_shared<cmFileMonitor>(GetLoop()); +} + +void cmServer::StartShutDown() +{ + if (fileMonitor) { + fileMonitor->StopMonitoring(); + fileMonitor.reset(); + } + cmServerBase::StartShutDown(); +} + +static void __start_thread(void* arg) +{ + auto server = static_cast<cmServerBase*>(arg); + std::string error; + bool success = server->Serve(&error); + if (!success || !error.empty()) { + std::cerr << "Error during serve: " << error << std::endl; + } +} + +bool cmServerBase::StartServeThread() +{ + ServeThreadRunning = true; + uv_thread_create(&ServeThread, __start_thread, this); + return true; +} + +static void __shutdownThread(uv_async_t* arg) +{ + auto server = static_cast<cmServerBase*>(arg->data); + server->StartShutDown(); +} + +bool cmServerBase::Serve(std::string* errorMessage) +{ +#ifndef NDEBUG + uv_thread_t blank_thread_t = {}; + assert(uv_thread_equal(&blank_thread_t, &ServeThreadId)); + ServeThreadId = uv_thread_self(); +#endif + + errorMessage->clear(); + + ShutdownSignal.init(Loop, __shutdownThread, this); + + SIGINTHandler.init(Loop, this); + SIGHUPHandler.init(Loop, this); + + SIGINTHandler.start(&on_signal, SIGINT); + SIGHUPHandler.start(&on_signal, SIGHUP); + + OnServeStart(); + + { + cm::shared_lock<cm::shared_mutex> lock(ConnectionsMutex); + for (auto& connection : Connections) { + if (!connection->OnServeStart(errorMessage)) { + return false; + } + } + } + + if (uv_run(&Loop, UV_RUN_DEFAULT) != 0) { + // It is important we don't ever let the event loop exit with open handles + // at best this is a memory leak, but it can also introduce race conditions + // which can hang the program. + assert(false && "Event loop stopped in unclean state."); + + *errorMessage = "Internal Error: Event loop stopped in unclean state."; + return false; + } + + return true; +} + +void cmServerBase::OnConnected(cmConnection*) +{ +} + +void cmServerBase::OnServeStart() +{ +} + +void cmServerBase::StartShutDown() +{ + ShutdownSignal.reset(); + SIGINTHandler.reset(); + SIGHUPHandler.reset(); + + { + std::unique_lock<cm::shared_mutex> lock(ConnectionsMutex); + for (auto& connection : Connections) { + connection->OnConnectionShuttingDown(); + } + Connections.clear(); + } + + uv_walk(&Loop, on_walk_to_shutdown, nullptr); +} + +bool cmServerBase::OnSignal(int signum) +{ + (void)signum; + StartShutDown(); + return true; +} + +cmServerBase::cmServerBase(cmConnection* connection) +{ + auto err = uv_loop_init(&Loop); + (void)err; + Loop.data = this; + assert(err == 0); + + AddNewConnection(connection); +} + +void cmServerBase::Close() +{ + if (Loop.data) { + if (ServeThreadRunning) { + this->ShutdownSignal.send(); + uv_thread_join(&ServeThread); + } + + uv_loop_close(&Loop); + Loop.data = nullptr; + } +} +cmServerBase::~cmServerBase() +{ + Close(); +} + +void cmServerBase::AddNewConnection(cmConnection* ownedConnection) +{ + { + std::unique_lock<cm::shared_mutex> lock(ConnectionsMutex); + Connections.emplace_back(ownedConnection); + } + ownedConnection->SetServer(this); +} + +uv_loop_t* cmServerBase::GetLoop() +{ + return &Loop; +} + +void cmServerBase::OnDisconnect(cmConnection* pConnection) +{ + auto pred = [pConnection](const std::unique_ptr<cmConnection>& m) { + return m.get() == pConnection; + }; + { + std::unique_lock<cm::shared_mutex> lock(ConnectionsMutex); + Connections.erase( + std::remove_if(Connections.begin(), Connections.end(), pred), + Connections.end()); + } + + if (Connections.empty()) { + this->ShutdownSignal.send(); + } } |