server: Refactor to make the event loop owned by server object

This commit is contained in:
Justin Berger
2017-03-24 21:38:52 -06:00
parent 5acbf08bff
commit d4f5d35ca4
12 changed files with 776 additions and 443 deletions

View File

@@ -976,7 +976,9 @@ target_link_libraries(cmake CMakeLib)
if(CMake_ENABLE_SERVER_MODE)
add_library(CMakeServerLib
cmConnection.h cmConnection.cxx
cmFileMonitor.cxx cmFileMonitor.h
cmPipeConnection.cxx cmPipeConnection.h
cmServer.cxx cmServer.h
cmServerConnection.cxx cmServerConnection.h
cmServerProtocol.cxx cmServerProtocol.h

150
Source/cmConnection.cxx Normal file
View File

@@ -0,0 +1,150 @@
/* Distributed under the OSI-approved BSD 3-Clause License. See accompanying
file Copyright.txt or https://cmake.org/licensing for details. */
#include "cmConnection.h"
#include "cmServer.h"
#include "cm_uv.h"
#include <cassert>
#include <cstring>
struct write_req_t
{
uv_write_t req;
uv_buf_t buf;
};
void cmConnection::on_alloc_buffer(uv_handle_t* handle, size_t suggested_size,
uv_buf_t* buf)
{
(void)(handle);
char* rawBuffer = new char[suggested_size];
*buf = uv_buf_init(rawBuffer, static_cast<unsigned int>(suggested_size));
}
void cmConnection::on_read(uv_stream_t* stream, ssize_t nread,
const uv_buf_t* buf)
{
auto conn = reinterpret_cast<cmConnection*>(stream->data);
if (conn) {
if (nread >= 0) {
conn->ReadData(std::string(buf->base, buf->base + nread));
} else {
conn->OnDisconnect((int)nread);
}
}
delete[](buf->base);
}
void cmConnection::on_close_delete(uv_handle_t* handle)
{
delete handle;
}
void cmConnection::on_close(uv_handle_t*)
{
}
void cmConnection::on_write(uv_write_t* req, int status)
{
(void)(status);
// Free req and buffer
write_req_t* wr = reinterpret_cast<write_req_t*>(req);
delete[](wr->buf.base);
delete wr;
}
void cmConnection::on_new_connection(uv_stream_t* stream, int status)
{
(void)(status);
auto conn = reinterpret_cast<cmConnection*>(stream->data);
if (conn) {
conn->Connect(stream);
}
}
bool cmConnection::IsOpen() const
{
return this->WriteStream != CM_NULLPTR;
}
void cmConnection::WriteData(const std::string& data)
{
assert(this->WriteStream);
auto ds = data.size();
write_req_t* req = new write_req_t;
req->req.data = this;
req->buf = uv_buf_init(new char[ds], static_cast<unsigned int>(ds));
memcpy(req->buf.base, data.c_str(), ds);
uv_write(reinterpret_cast<uv_write_t*>(req),
static_cast<uv_stream_t*>(this->WriteStream), &req->buf, 1,
on_write);
}
cmConnection::~cmConnection()
{
OnServerShuttingDown();
}
void cmConnection::ReadData(const std::string& data)
{
this->RawReadBuffer += data;
if (BufferStrategy) {
std::string packet = BufferStrategy->BufferMessage(this->RawReadBuffer);
do {
ProcessRequest(packet);
packet = BufferStrategy->BufferMessage(this->RawReadBuffer);
} while (!packet.empty());
} else {
ProcessRequest(this->RawReadBuffer);
this->RawReadBuffer.clear();
}
}
void cmConnection::SetServer(cmServerBase* s)
{
Server = s;
}
cmConnection::cmConnection(cmConnectionBufferStrategy* bufferStrategy)
: BufferStrategy(bufferStrategy)
{
}
void cmConnection::Connect(uv_stream_t*)
{
Server->OnConnected(nullptr);
}
void cmConnection::ProcessRequest(const std::string& request)
{
Server->ProcessRequest(this, request);
}
bool cmConnection::OnServeStart(std::string* errString)
{
(void)errString;
return true;
}
void cmConnection::OnDisconnect(int errorCode)
{
(void)errorCode;
this->Server->OnDisconnect(this);
}
bool cmConnection::OnServerShuttingDown()
{
this->WriteStream->data = nullptr;
this->ReadStream->data = nullptr;
this->ReadStream = nullptr;
this->WriteStream = nullptr;
return true;
}

104
Source/cmConnection.h Normal file
View File

@@ -0,0 +1,104 @@
/* Distributed under the OSI-approved BSD 3-Clause License. See accompanying
file Copyright.txt or https://cmake.org/licensing for details. */
#pragma once
#include "cmConfigure.h"
#include "cm_uv.h"
#include <cstddef>
#include <memory>
#include <string>
class cmServerBase;
/***
* Given a sequence of bytes with any kind of buffering, instances of this
* class arrange logical chunks according to whatever the use case is for
* the connection.
*/
class cmConnectionBufferStrategy
{
public:
virtual ~cmConnectionBufferStrategy();
/***
* Called whenever with an active raw buffer. If a logical chunk
* becomes available, that chunk is returned and that portion is
* removed from the rawBuffer
*
* @param rawBuffer in/out parameter. Receive buffer; the buffer strategy is
* free to manipulate this buffer anyway it needs to.
*
* @return Next chunk from the stream. Returns the empty string if a chunk
* isn't ready yet. Users of this interface should repeatedly call this
* function until an empty string is returned since its entirely possible
* multiple chunks come in a single raw buffer.
*/
virtual std::string BufferMessage(std::string& rawBuffer) = 0;
/***
* Resets the internal state of the buffering
*/
virtual void clear();
// TODO: There should be a callback / flag set for errors
};
/***
* Abstraction of a connection; ties in event callbacks from libuv and notifies
* the server when appropriate
*/
class cmConnection
{
CM_DISABLE_COPY(cmConnection)
public:
virtual ~cmConnection();
/***
* @param bufferStrategy If no strategy is given, it will process the raw
* chunks as they come in. The connection
* owns the pointer given.
*/
cmConnection(cmConnectionBufferStrategy* bufferStrategy = nullptr);
virtual void Connect(uv_stream_t* server);
virtual void ReadData(const std::string& data);
virtual bool OnServeStart(std::string* errString);
virtual bool OnServerShuttingDown();
virtual bool IsOpen() const;
virtual void WriteData(const std::string& data);
virtual void ProcessRequest(const std::string& request);
virtual void SetServer(cmServerBase* s);
virtual void OnDisconnect(int errorCode);
uv_stream_t* ReadStream = nullptr;
cmServerBase* Server = nullptr;
uv_stream_t* WriteStream = nullptr;
static void on_close(uv_handle_t* handle);
static void on_close_delete(uv_handle_t* handle);
protected:
std::string RawReadBuffer;
std::unique_ptr<cmConnectionBufferStrategy> BufferStrategy;
static void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf);
static void on_write(uv_write_t* req, int status);
static void on_new_connection(uv_stream_t* stream, int status);
static void on_alloc_buffer(uv_handle_t* handle, size_t suggested_size,
uv_buf_t* buf);
};

View File

@@ -171,7 +171,9 @@ public:
{
if (this->Handle) {
uv_fs_event_stop(this->Handle);
uv_close(reinterpret_cast<uv_handle_t*>(this->Handle), &on_fs_close);
if (!uv_is_closing(reinterpret_cast<uv_handle_t*>(this->Handle))) {
uv_close(reinterpret_cast<uv_handle_t*>(this->Handle), &on_fs_close);
}
this->Handle = nullptr;
}
cmVirtualDirectoryWatcher::StopWatching();

View File

@@ -0,0 +1,81 @@
/* Distributed under the OSI-approved BSD 3-Clause License. See accompanying
file Copyright.txt or https://cmake.org/licensing for details. */
#include "cmPipeConnection.h"
#include "cmConfigure.h"
#include "cmServer.h"
cmPipeConnection::cmPipeConnection(const std::string& name,
cmConnectionBufferStrategy* bufferStrategy)
: cmConnection(bufferStrategy)
, PipeName(name)
{
}
void cmPipeConnection::Connect(uv_stream_t* server)
{
if (this->ClientPipe) {
// Accept and close all pipes but the first:
uv_pipe_t* rejectPipe = new uv_pipe_t();
uv_pipe_init(this->Server->GetLoop(), rejectPipe, 0);
uv_accept(server, reinterpret_cast<uv_stream_t*>(rejectPipe));
uv_close(reinterpret_cast<uv_handle_t*>(rejectPipe), &on_close_delete);
return;
}
this->ClientPipe = new uv_pipe_t();
uv_pipe_init(this->Server->GetLoop(), this->ClientPipe, 0);
this->ClientPipe->data = static_cast<cmConnection*>(this);
auto client = reinterpret_cast<uv_stream_t*>(this->ClientPipe);
if (uv_accept(server, client) != 0) {
uv_close(reinterpret_cast<uv_handle_t*>(client), &on_close_delete);
this->ClientPipe = CM_NULLPTR;
return;
}
this->ReadStream = client;
this->WriteStream = client;
uv_read_start(this->ReadStream, on_alloc_buffer, on_read);
Server->OnConnected(this);
}
bool cmPipeConnection::OnServeStart(std::string* errorMessage)
{
this->ServerPipe = new uv_pipe_t();
uv_pipe_init(this->Server->GetLoop(), this->ServerPipe, 0);
this->ServerPipe->data = static_cast<cmConnection*>(this);
int r;
if ((r = uv_pipe_bind(this->ServerPipe, this->PipeName.c_str())) != 0) {
*errorMessage = std::string("Internal Error with ") + this->PipeName +
": " + uv_err_name(r);
return false;
}
auto serverStream = reinterpret_cast<uv_stream_t*>(this->ServerPipe);
if ((r = uv_listen(serverStream, 1, on_new_connection)) != 0) {
*errorMessage = std::string("Internal Error listening on ") +
this->PipeName + ": " + uv_err_name(r);
return false;
}
return cmConnection::OnServeStart(errorMessage);
}
bool cmPipeConnection::OnServerShuttingDown()
{
if (this->ClientPipe) {
uv_close(reinterpret_cast<uv_handle_t*>(this->ClientPipe),
&on_close_delete);
this->WriteStream->data = nullptr;
}
uv_close(reinterpret_cast<uv_handle_t*>(this->ServerPipe), &on_close_delete);
this->ClientPipe = nullptr;
this->ServerPipe = nullptr;
this->WriteStream = nullptr;
this->ReadStream = nullptr;
return cmConnection::OnServerShuttingDown();
}

28
Source/cmPipeConnection.h Normal file
View File

@@ -0,0 +1,28 @@
/* Distributed under the OSI-approved BSD 3-Clause License. See accompanying
file Copyright.txt or https://cmake.org/licensing for details. */
#pragma once
#include "cmConnection.h"
#include "cm_uv.h"
#include <string>
class cmPipeConnection : public cmConnection
{
public:
cmPipeConnection(const std::string& name,
cmConnectionBufferStrategy* bufferStrategy = nullptr);
bool OnServeStart(std::string* pString) override;
bool OnServerShuttingDown() override;
void Connect(uv_stream_t* server) override;
private:
const std::string PipeName;
uv_pipe_t* ServerPipe = nullptr;
uv_pipe_t* ClientPipe = nullptr;
};

View File

@@ -2,7 +2,8 @@
file Copyright.txt or https://cmake.org/licensing for details. */
#include "cmServer.h"
#include "cmServerConnection.h"
#include "cmConnection.h"
#include "cmFileMonitor.h"
#include "cmServerDictionary.h"
#include "cmServerProtocol.h"
#include "cmSystemTools.h"
@@ -14,8 +15,23 @@
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <memory>
#include <utility>
void on_signal(uv_signal_t* signal, int signum)
{
auto conn = reinterpret_cast<cmServerBase*>(signal->data);
conn->OnSignal(signum);
}
static void on_walk_to_shutdown(uv_handle_t* handle, void* arg)
{
(void)arg;
if (!uv_is_closing(handle)) {
uv_close(handle, &cmConnection::on_close);
}
}
class cmServer::DebugInfo
{
public:
@@ -30,11 +46,10 @@ public:
uint64_t StartTime;
};
cmServer::cmServer(cmServerConnection* conn, bool supportExperimental)
: Connection(conn)
cmServer::cmServer(cmConnection* conn, bool supportExperimental)
: cmServerBase(conn)
, SupportExperimental(supportExperimental)
{
this->Connection->SetServer(this);
// Register supported protocols:
this->RegisterProtocol(new cmServerProtocol1_0);
}
@@ -48,23 +63,15 @@ cmServer::~cmServer()
for (cmServerProtocol* p : this->SupportedProtocols) {
delete p;
}
delete this->Connection;
}
void cmServer::PopOne()
void cmServer::ProcessRequest(cmConnection* connection,
const std::string& input)
{
if (this->Queue.empty()) {
return;
}
Json::Reader reader;
Json::Value value;
const std::string input = this->Queue.front();
this->Queue.erase(this->Queue.begin());
if (!reader.parse(input, value)) {
this->WriteParseError("Failed to parse JSON input.");
this->WriteParseError(connection, "Failed to parse JSON input.");
return;
}
@@ -82,7 +89,7 @@ void cmServer::PopOne()
if (request.Type == "") {
cmServerResponse response(request);
response.SetError("No type given in request.");
this->WriteResponse(response, nullptr);
this->WriteResponse(connection, response, nullptr);
return;
}
@@ -91,9 +98,11 @@ void cmServer::PopOne()
if (this->Protocol) {
this->Protocol->CMakeInstance()->SetProgressCallback(
reportProgress, const_cast<cmServerRequest*>(&request));
this->WriteResponse(this->Protocol->Process(request), debug.get());
this->WriteResponse(connection, this->Protocol->Process(request),
debug.get());
} else {
this->WriteResponse(this->SetProtocolVersion(request), debug.get());
this->WriteResponse(connection, this->SetProtocolVersion(request),
debug.get());
}
}
@@ -115,7 +124,7 @@ void cmServer::RegisterProtocol(cmServerProtocol* protocol)
}
}
void cmServer::PrintHello() const
void cmServer::PrintHello(cmConnection* connection) const
{
Json::Value hello = Json::objectValue;
hello[kTYPE_KEY] = "hello";
@@ -134,13 +143,7 @@ void cmServer::PrintHello() const
protocolVersions.append(tmp);
}
this->WriteJsonObject(hello, nullptr);
}
void cmServer::QueueRequest(const std::string& request)
{
this->Queue.push_back(request);
this->PopOne();
this->WriteJsonObject(connection, hello, nullptr);
}
void cmServer::reportProgress(const char* msg, float progress, void* data)
@@ -232,16 +235,25 @@ bool cmServer::Serve(std::string* errorMessage)
}
assert(!this->Protocol);
return Connection->ProcessEvents(errorMessage);
return cmServerBase::Serve(errorMessage);
}
cmFileMonitor* cmServer::FileMonitor() const
{
return Connection->FileMonitor();
return fileMonitor.get();
}
void cmServer::WriteJsonObject(const Json::Value& jsonValue,
const DebugInfo* debug) const
{
for (auto& connection : this->Connections) {
WriteJsonObject(connection.get(), jsonValue, debug);
}
}
void cmServer::WriteJsonObject(cmConnection* connection,
const Json::Value& jsonValue,
const DebugInfo* debug) const
{
Json::FastWriter writer;
@@ -272,7 +284,7 @@ void cmServer::WriteJsonObject(const Json::Value& jsonValue,
}
}
Connection->WriteData(std::string("\n") + kSTART_MAGIC + std::string("\n") +
connection->WriteData(std::string("\n") + kSTART_MAGIC + std::string("\n") +
result + kEND_MAGIC + std::string("\n"));
}
@@ -334,7 +346,8 @@ void cmServer::WriteMessage(const cmServerRequest& request,
WriteJsonObject(obj, nullptr);
}
void cmServer::WriteParseError(const std::string& message) const
void cmServer::WriteParseError(cmConnection* connection,
const std::string& message) const
{
Json::Value obj = Json::objectValue;
obj[kTYPE_KEY] = kERROR_TYPE;
@@ -342,7 +355,7 @@ void cmServer::WriteParseError(const std::string& message) const
obj[kREPLY_TO_KEY] = "";
obj[kCOOKIE_KEY] = "";
this->WriteJsonObject(obj, nullptr);
this->WriteJsonObject(connection, obj, nullptr);
}
void cmServer::WriteSignal(const std::string& name,
@@ -358,7 +371,8 @@ void cmServer::WriteSignal(const std::string& name,
WriteJsonObject(obj, nullptr);
}
void cmServer::WriteResponse(const cmServerResponse& response,
void cmServer::WriteResponse(cmConnection* connection,
const cmServerResponse& response,
const DebugInfo* debug) const
{
assert(response.IsComplete());
@@ -371,5 +385,161 @@ void cmServer::WriteResponse(const cmServerResponse& response,
obj[kERROR_MESSAGE_KEY] = response.ErrorMessage();
}
this->WriteJsonObject(obj, debug);
this->WriteJsonObject(connection, obj, debug);
}
void cmServer::OnConnected(cmConnection* connection)
{
PrintHello(connection);
}
void cmServer::OnServeStart()
{
cmServerBase::OnServeStart();
fileMonitor = std::make_shared<cmFileMonitor>(GetLoop());
}
void cmServer::StartShutDown()
{
if (fileMonitor) {
fileMonitor->StopMonitoring();
fileMonitor.reset();
}
cmServerBase::StartShutDown();
}
static void __start_thread(void* arg)
{
auto server = reinterpret_cast<cmServerBase*>(arg);
std::string error;
server->Serve(&error);
}
bool cmServerBase::StartServeThread()
{
ServeThreadRunning = true;
uv_thread_create(&ServeThread, __start_thread, this);
return true;
}
bool cmServerBase::Serve(std::string* errorMessage)
{
errorMessage->clear();
uv_signal_init(&Loop, &this->SIGINTHandler);
uv_signal_init(&Loop, &this->SIGHUPHandler);
this->SIGINTHandler.data = this;
this->SIGHUPHandler.data = this;
uv_signal_start(&this->SIGINTHandler, &on_signal, SIGINT);
uv_signal_start(&this->SIGHUPHandler, &on_signal, SIGHUP);
OnServeStart();
for (auto& connection : Connections) {
if (!connection->OnServeStart(errorMessage)) {
return false;
}
}
if (uv_run(&Loop, UV_RUN_DEFAULT) != 0) {
*errorMessage = "Internal Error: Event loop stopped in unclean state.";
StartShutDown();
return false;
}
ServeThreadRunning = false;
return true;
}
void cmServerBase::OnConnected(cmConnection*)
{
}
void cmServerBase::OnDisconnect()
{
}
void cmServerBase::OnServeStart()
{
uv_signal_start(&this->SIGINTHandler, &on_signal, SIGINT);
uv_signal_start(&this->SIGHUPHandler, &on_signal, SIGHUP);
}
void cmServerBase::StartShutDown()
{
if (!uv_is_closing((const uv_handle_t*)&this->SIGINTHandler)) {
uv_signal_stop(&this->SIGINTHandler);
}
if (!uv_is_closing((const uv_handle_t*)&this->SIGHUPHandler)) {
uv_signal_stop(&this->SIGHUPHandler);
}
for (auto& connection : Connections) {
connection->OnServerShuttingDown();
}
Connections.clear();
uv_stop(&Loop);
uv_walk(&Loop, on_walk_to_shutdown, CM_NULLPTR);
uv_run(&Loop, UV_RUN_DEFAULT);
}
bool cmServerBase::OnSignal(int signum)
{
(void)signum;
StartShutDown();
return true;
}
cmServerBase::cmServerBase(cmConnection* connection)
{
uv_loop_init(&Loop);
uv_signal_init(&Loop, &this->SIGINTHandler);
uv_signal_init(&Loop, &this->SIGHUPHandler);
this->SIGINTHandler.data = this;
this->SIGHUPHandler.data = this;
AddNewConnection(connection);
}
cmServerBase::~cmServerBase()
{
if (ServeThreadRunning) {
StartShutDown();
uv_thread_join(&ServeThread);
}
uv_loop_close(&Loop);
}
void cmServerBase::AddNewConnection(cmConnection* ownedConnection)
{
Connections.emplace_back(ownedConnection);
ownedConnection->SetServer(this);
}
uv_loop_t* cmServerBase::GetLoop()
{
return &Loop;
}
void cmServerBase::OnDisconnect(cmConnection* pConnection)
{
auto pred = [pConnection](const std::unique_ptr<cmConnection>& m) {
return m.get() == pConnection;
};
Connections.erase(
std::remove_if(Connections.begin(), Connections.end(), pred),
Connections.end());
if (Connections.empty()) {
StartShutDown();
}
}

View File

@@ -7,26 +7,83 @@
#include "cm_jsoncpp_value.h"
#include "cm_uv.h"
#include <memory> // IWYU pragma: keep
#include <string>
#include <vector>
class cmConnection;
class cmFileMonitor;
class cmServerConnection;
class cmServerProtocol;
class cmServerRequest;
class cmServerResponse;
class cmServer
/***
* This essentially hold and manages a libuv event queue and responds to
* messages
* on any of its connections.
*/
class cmServerBase
{
public:
cmServerBase(cmConnection* connection);
virtual ~cmServerBase();
virtual void AddNewConnection(cmConnection* ownedConnection);
/***
* The main override responsible for tailoring behavior towards
* whatever the given server is supposed to do
*
* This should almost always be called by the given connections
* directly.
*
* @param connection The connectiont the request was received on
* @param request The actual request
*/
virtual void ProcessRequest(cmConnection* connection,
const std::string& request) = 0;
virtual void OnConnected(cmConnection* connection);
virtual void OnDisconnect();
/***
* Start a dedicated thread. If this is used to start the server, it will
* join on the
* servers dtor.
*/
virtual bool StartServeThread();
virtual bool Serve(std::string* errorMessage);
virtual void OnServeStart();
virtual void StartShutDown();
virtual bool OnSignal(int signum);
uv_loop_t* GetLoop();
void OnDisconnect(cmConnection* pConnection);
protected:
std::vector<std::unique_ptr<cmConnection> > Connections;
bool ServeThreadRunning = false;
uv_thread_t ServeThread;
uv_loop_t Loop;
uv_signal_t SIGINTHandler;
uv_signal_t SIGHUPHandler;
};
class cmServer : public cmServerBase
{
CM_DISABLE_COPY(cmServer)
public:
class DebugInfo;
cmServer(cmServerConnection* conn, bool supportExperimental);
~cmServer();
cmServer(cmConnection* conn, bool supportExperimental);
~cmServer() override;
bool Serve(std::string* errorMessage);
bool Serve(std::string* errorMessage) override;
cmFileMonitor* FileMonitor() const;
@@ -34,9 +91,20 @@ private:
void RegisterProtocol(cmServerProtocol* protocol);
// Callbacks from cmServerConnection:
void PopOne();
void QueueRequest(const std::string& request);
void ProcessRequest(cmConnection* connection,
const std::string& request) override;
std::shared_ptr<cmFileMonitor> fileMonitor;
public:
void OnServeStart() override;
void StartShutDown() override;
public:
void OnConnected(cmConnection* connection) override;
private:
static void reportProgress(const char* msg, float progress, void* data);
static void reportMessage(const char* msg, const char* title, bool& cancel,
void* data);
@@ -44,36 +112,37 @@ private:
// Handle requests:
cmServerResponse SetProtocolVersion(const cmServerRequest& request);
void PrintHello() const;
void PrintHello(cmConnection* connection) const;
// Write responses:
void WriteProgress(const cmServerRequest& request, int min, int current,
int max, const std::string& message) const;
void WriteMessage(const cmServerRequest& request, const std::string& message,
const std::string& title) const;
void WriteResponse(const cmServerResponse& response,
void WriteResponse(cmConnection* connection,
const cmServerResponse& response,
const DebugInfo* debug) const;
void WriteParseError(const std::string& message) const;
void WriteParseError(cmConnection* connection,
const std::string& message) const;
void WriteSignal(const std::string& name, const Json::Value& obj) const;
void WriteJsonObject(Json::Value const& jsonValue,
const DebugInfo* debug) const;
void WriteJsonObject(cmConnection* connection, Json::Value const& jsonValue,
const DebugInfo* debug) const;
static cmServerProtocol* FindMatchingProtocol(
const std::vector<cmServerProtocol*>& protocols, int major, int minor);
cmServerConnection* Connection = nullptr;
const bool SupportExperimental;
cmServerProtocol* Protocol = nullptr;
std::vector<cmServerProtocol*> SupportedProtocols;
std::vector<std::string> Queue;
std::string DataBuffer;
std::string JsonData;
uv_loop_t* Loop = nullptr;
typedef union
{
uv_tty_t tty;
@@ -87,7 +156,6 @@ private:
mutable bool Writing = false;
friend class cmServerConnection;
friend class cmServerProtocol;
friend class cmServerRequest;
};

View File

@@ -2,376 +2,123 @@
file Copyright.txt or https://cmake.org/licensing for details. */
#include "cmServerConnection.h"
#include "cmFileMonitor.h"
#include "cmServer.h"
#include "cmServerDictionary.h"
#include <assert.h>
#include <string.h>
namespace {
struct write_req_t
cmStdIoConnection::cmStdIoConnection(
cmConnectionBufferStrategy* bufferStrategy)
: cmConnection(bufferStrategy)
, Input()
, Output()
{
uv_write_t req;
uv_buf_t buf;
};
void on_alloc_buffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf)
{
(void)(handle);
char* rawBuffer = new char[suggested_size];
*buf = uv_buf_init(rawBuffer, static_cast<unsigned int>(suggested_size));
}
void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf)
void cmStdIoConnection::SetServer(cmServerBase* s)
{
auto conn = reinterpret_cast<cmServerConnection*>(stream->data);
if (nread >= 0) {
conn->ReadData(std::string(buf->base, buf->base + nread));
cmConnection::SetServer(s);
if (uv_guess_handle(1) == UV_TTY) {
usesTty = true;
this->Input.tty = new uv_tty_t();
uv_tty_init(this->Server->GetLoop(), this->Input.tty, 0, 1);
uv_tty_set_mode(this->Input.tty, UV_TTY_MODE_NORMAL);
this->Input.tty->data = static_cast<cmConnection*>(this);
this->ReadStream = reinterpret_cast<uv_stream_t*>(this->Input.tty);
this->Output.tty = new uv_tty_t();
uv_tty_init(this->Server->GetLoop(), this->Output.tty, 1, 0);
uv_tty_set_mode(this->Output.tty, UV_TTY_MODE_NORMAL);
this->Output.tty->data = static_cast<cmConnection*>(this);
this->WriteStream = reinterpret_cast<uv_stream_t*>(this->Output.tty);
} else {
conn->TriggerShutdown();
usesTty = false;
this->Input.pipe = new uv_pipe_t();
uv_pipe_init(this->Server->GetLoop(), this->Input.pipe, 0);
uv_pipe_open(this->Input.pipe, 0);
this->Input.pipe->data = static_cast<cmConnection*>(this);
this->ReadStream = reinterpret_cast<uv_stream_t*>(this->Input.pipe);
this->Output.pipe = new uv_pipe_t();
uv_pipe_init(this->Server->GetLoop(), this->Output.pipe, 0);
uv_pipe_open(this->Output.pipe, 1);
this->Output.pipe->data = static_cast<cmConnection*>(this);
this->WriteStream = reinterpret_cast<uv_stream_t*>(this->Output.pipe);
}
delete[](buf->base);
}
void on_write(uv_write_t* req, int status)
bool cmStdIoConnection::OnServeStart(std::string* pString)
{
(void)(status);
auto conn = reinterpret_cast<cmServerConnection*>(req->data);
// Free req and buffer
write_req_t* wr = reinterpret_cast<write_req_t*>(req);
delete[](wr->buf.base);
delete wr;
conn->ProcessNextRequest();
uv_read_start(this->ReadStream, on_alloc_buffer, on_read);
Server->OnConnected(this);
return cmConnection::OnServeStart(pString);
}
void on_new_connection(uv_stream_t* stream, int status)
bool cmStdIoConnection::OnServerShuttingDown()
{
(void)(status);
auto conn = reinterpret_cast<cmServerConnection*>(stream->data);
conn->Connect(stream);
}
cmConnection::OnServerShuttingDown();
void on_signal(uv_signal_t* signal, int signum)
{
auto conn = reinterpret_cast<cmServerConnection*>(signal->data);
(void)(signum);
conn->TriggerShutdown();
}
void on_signal_close(uv_handle_t* handle)
{
delete reinterpret_cast<uv_signal_t*>(handle);
}
void on_pipe_close(uv_handle_t* handle)
{
delete reinterpret_cast<uv_pipe_t*>(handle);
}
void on_tty_close(uv_handle_t* handle)
{
delete reinterpret_cast<uv_tty_t*>(handle);
}
} // namespace
class LoopGuard
{
public:
LoopGuard(cmServerConnection* connection)
: Connection(connection)
{
this->Connection->mLoop = uv_default_loop();
if (!this->Connection->mLoop) {
return;
}
this->Connection->mFileMonitor =
new cmFileMonitor(this->Connection->mLoop);
if (usesTty) {
uv_read_stop(reinterpret_cast<uv_stream_t*>(this->Input.tty));
uv_close(reinterpret_cast<uv_handle_t*>(this->Input.tty),
&on_close_delete);
uv_close(reinterpret_cast<uv_handle_t*>(this->Output.tty),
&on_close_delete);
} else {
uv_close(reinterpret_cast<uv_handle_t*>(this->Input.pipe),
&on_close_delete);
uv_close(reinterpret_cast<uv_handle_t*>(this->Output.pipe),
&on_close_delete);
}
~LoopGuard()
{
if (!this->Connection->mLoop) {
return;
}
if (this->Connection->mFileMonitor) {
delete this->Connection->mFileMonitor;
}
uv_loop_close(this->Connection->mLoop);
this->Connection->mLoop = nullptr;
}
private:
cmServerConnection* Connection;
};
cmServerConnection::cmServerConnection()
{
}
cmServerConnection::~cmServerConnection()
{
}
void cmServerConnection::SetServer(cmServer* s)
{
this->Server = s;
}
bool cmServerConnection::ProcessEvents(std::string* errorMessage)
{
assert(this->Server);
errorMessage->clear();
this->RawReadBuffer.clear();
this->RequestBuffer.clear();
LoopGuard guard(this);
(void)(guard);
if (!this->mLoop) {
*errorMessage = "Internal Error: Failed to create event loop.";
return false;
}
this->SIGINTHandler = new uv_signal_t;
uv_signal_init(this->mLoop, this->SIGINTHandler);
this->SIGINTHandler->data = static_cast<void*>(this);
uv_signal_start(this->SIGINTHandler, &on_signal, SIGINT);
this->SIGHUPHandler = new uv_signal_t;
uv_signal_init(this->mLoop, this->SIGHUPHandler);
this->SIGHUPHandler->data = static_cast<void*>(this);
uv_signal_start(this->SIGHUPHandler, &on_signal, SIGHUP);
if (!DoSetup(errorMessage)) {
return false;
}
if (uv_run(this->mLoop, UV_RUN_DEFAULT) != 0) {
*errorMessage = "Internal Error: Event loop stopped in unclean state.";
return false;
}
// These need to be cleaned up by now:
assert(!this->ReadStream);
assert(!this->WriteStream);
this->RawReadBuffer.clear();
this->RequestBuffer.clear();
return true;
}
void cmServerConnection::ReadData(const std::string& data)
cmServerPipeConnection::cmServerPipeConnection(const std::string& name)
: cmPipeConnection(name, new cmServerBufferStrategy)
{
this->RawReadBuffer += data;
}
cmServerStdIoConnection::cmServerStdIoConnection()
: cmStdIoConnection(new cmServerBufferStrategy)
{
}
cmConnectionBufferStrategy::~cmConnectionBufferStrategy()
{
}
void cmConnectionBufferStrategy::clear()
{
}
std::string cmServerBufferStrategy::BufferMessage(std::string& RawReadBuffer)
{
for (;;) {
auto needle = this->RawReadBuffer.find('\n');
auto needle = RawReadBuffer.find('\n');
if (needle == std::string::npos) {
return;
return "";
}
std::string line = this->RawReadBuffer.substr(0, needle);
std::string line = RawReadBuffer.substr(0, needle);
const auto ls = line.size();
if (ls > 1 && line.at(ls - 1) == '\r') {
line.erase(ls - 1, 1);
}
this->RawReadBuffer.erase(this->RawReadBuffer.begin(),
this->RawReadBuffer.begin() +
static_cast<long>(needle) + 1);
RawReadBuffer.erase(RawReadBuffer.begin(),
RawReadBuffer.begin() + static_cast<long>(needle) + 1);
if (line == kSTART_MAGIC) {
this->RequestBuffer.clear();
RequestBuffer.clear();
continue;
}
if (line == kEND_MAGIC) {
this->Server->QueueRequest(this->RequestBuffer);
this->RequestBuffer.clear();
} else {
this->RequestBuffer += line;
this->RequestBuffer += "\n";
std::string rtn;
rtn.swap(this->RequestBuffer);
return rtn;
}
this->RequestBuffer += line;
this->RequestBuffer += "\n";
}
}
void cmServerConnection::TriggerShutdown()
{
this->FileMonitor()->StopMonitoring();
uv_signal_stop(this->SIGINTHandler);
uv_signal_stop(this->SIGHUPHandler);
uv_close(reinterpret_cast<uv_handle_t*>(this->SIGINTHandler),
&on_signal_close); // delete handle
uv_close(reinterpret_cast<uv_handle_t*>(this->SIGHUPHandler),
&on_signal_close); // delete handle
this->SIGINTHandler = nullptr;
this->SIGHUPHandler = nullptr;
this->TearDown();
}
void cmServerConnection::WriteData(const std::string& data)
{
assert(this->WriteStream);
auto ds = data.size();
write_req_t* req = new write_req_t;
req->req.data = this;
req->buf = uv_buf_init(new char[ds], static_cast<unsigned int>(ds));
memcpy(req->buf.base, data.c_str(), ds);
uv_write(reinterpret_cast<uv_write_t*>(req),
static_cast<uv_stream_t*>(this->WriteStream), &req->buf, 1,
on_write);
}
void cmServerConnection::ProcessNextRequest()
{
Server->PopOne();
}
void cmServerConnection::SendGreetings()
{
Server->PrintHello();
}
cmServerStdIoConnection::cmServerStdIoConnection()
{
this->Input.tty = nullptr;
this->Output.tty = nullptr;
}
bool cmServerStdIoConnection::DoSetup(std::string* errorMessage)
{
(void)(errorMessage);
if (uv_guess_handle(1) == UV_TTY) {
usesTty = true;
this->Input.tty = new uv_tty_t;
uv_tty_init(this->Loop(), this->Input.tty, 0, 1);
uv_tty_set_mode(this->Input.tty, UV_TTY_MODE_NORMAL);
Input.tty->data = this;
this->ReadStream = reinterpret_cast<uv_stream_t*>(this->Input.tty);
this->Output.tty = new uv_tty_t;
uv_tty_init(this->Loop(), this->Output.tty, 1, 0);
uv_tty_set_mode(this->Output.tty, UV_TTY_MODE_NORMAL);
Output.tty->data = this;
this->WriteStream = reinterpret_cast<uv_stream_t*>(this->Output.tty);
} else {
usesTty = false;
this->Input.pipe = new uv_pipe_t;
uv_pipe_init(this->Loop(), this->Input.pipe, 0);
uv_pipe_open(this->Input.pipe, 0);
Input.pipe->data = this;
this->ReadStream = reinterpret_cast<uv_stream_t*>(this->Input.pipe);
this->Output.pipe = new uv_pipe_t;
uv_pipe_init(this->Loop(), this->Output.pipe, 0);
uv_pipe_open(this->Output.pipe, 1);
Output.pipe->data = this;
this->WriteStream = reinterpret_cast<uv_stream_t*>(this->Output.pipe);
}
SendGreetings();
uv_read_start(this->ReadStream, on_alloc_buffer, on_read);
return true;
}
void cmServerStdIoConnection::TearDown()
{
if (usesTty) {
uv_close(reinterpret_cast<uv_handle_t*>(this->Input.tty), &on_tty_close);
uv_close(reinterpret_cast<uv_handle_t*>(this->Output.tty), &on_tty_close);
this->Input.tty = nullptr;
this->Output.tty = nullptr;
} else {
uv_close(reinterpret_cast<uv_handle_t*>(this->Input.pipe), &on_pipe_close);
uv_close(reinterpret_cast<uv_handle_t*>(this->Output.pipe),
&on_pipe_close);
this->Input.pipe = nullptr;
this->Input.pipe = nullptr;
}
this->ReadStream = nullptr;
this->WriteStream = nullptr;
}
cmServerPipeConnection::cmServerPipeConnection(const std::string& name)
: PipeName(name)
{
}
bool cmServerPipeConnection::DoSetup(std::string* errorMessage)
{
this->ServerPipe = new uv_pipe_t;
uv_pipe_init(this->Loop(), this->ServerPipe, 0);
this->ServerPipe->data = this;
int r;
if ((r = uv_pipe_bind(this->ServerPipe, this->PipeName.c_str())) != 0) {
*errorMessage = std::string("Internal Error with ") + this->PipeName +
": " + uv_err_name(r);
return false;
}
auto serverStream = reinterpret_cast<uv_stream_t*>(this->ServerPipe);
if ((r = uv_listen(serverStream, 1, on_new_connection)) != 0) {
*errorMessage = std::string("Internal Error listening on ") +
this->PipeName + ": " + uv_err_name(r);
return false;
}
return true;
}
void cmServerPipeConnection::TearDown()
{
if (this->ClientPipe) {
uv_close(reinterpret_cast<uv_handle_t*>(this->ClientPipe), &on_pipe_close);
this->WriteStream->data = nullptr;
}
uv_close(reinterpret_cast<uv_handle_t*>(this->ServerPipe), &on_pipe_close);
this->ClientPipe = nullptr;
this->ServerPipe = nullptr;
this->WriteStream = nullptr;
this->ReadStream = nullptr;
}
void cmServerPipeConnection::Connect(uv_stream_t* server)
{
if (this->ClientPipe) {
// Accept and close all pipes but the first:
uv_pipe_t* rejectPipe = new uv_pipe_t;
uv_pipe_init(this->Loop(), rejectPipe, 0);
auto rejecter = reinterpret_cast<uv_stream_t*>(rejectPipe);
uv_accept(server, rejecter);
uv_close(reinterpret_cast<uv_handle_t*>(rejecter), &on_pipe_close);
return;
}
this->ClientPipe = new uv_pipe_t;
uv_pipe_init(this->Loop(), this->ClientPipe, 0);
this->ClientPipe->data = this;
auto client = reinterpret_cast<uv_stream_t*>(this->ClientPipe);
if (uv_accept(server, client) != 0) {
uv_close(reinterpret_cast<uv_handle_t*>(client), nullptr);
return;
}
this->ReadStream = client;
this->WriteStream = client;
uv_read_start(this->ReadStream, on_alloc_buffer, on_read);
this->SendGreetings();
}

View File

@@ -2,68 +2,46 @@
file Copyright.txt or https://cmake.org/licensing for details. */
#pragma once
#include "cmConfigure.h"
#include "cmConnection.h"
#include "cmPipeConnection.h"
#include "cm_uv.h"
#include <string>
class cmFileMonitor;
class cmServer;
class cmServerBase;
class cmServerConnection
/***
* This connection buffer strategy accepts messages in the form of
* [== "CMake Server" ==[
{
... some JSON message ...
}
]== "CMake Server" ==]
* and only passes on the core json; it discards the envelope.
*/
class cmServerBufferStrategy : public cmConnectionBufferStrategy
{
CM_DISABLE_COPY(cmServerConnection)
public:
cmServerConnection();
virtual ~cmServerConnection();
void SetServer(cmServer* s);
bool ProcessEvents(std::string* errorMessage);
void ReadData(const std::string& data);
void TriggerShutdown();
void WriteData(const std::string& data);
void ProcessNextRequest();
virtual void Connect(uv_stream_t* server) { (void)(server); }
cmFileMonitor* FileMonitor() const { return this->mFileMonitor; }
protected:
virtual bool DoSetup(std::string* errorMessage) = 0;
virtual void TearDown() = 0;
void SendGreetings();
uv_loop_t* Loop() const { return mLoop; }
protected:
std::string RawReadBuffer;
std::string RequestBuffer;
uv_stream_t* ReadStream = nullptr;
uv_stream_t* WriteStream = nullptr;
std::string BufferMessage(std::string& rawBuffer) override;
private:
uv_loop_t* mLoop = nullptr;
cmFileMonitor* mFileMonitor = nullptr;
cmServer* Server = nullptr;
uv_signal_t* SIGINTHandler = nullptr;
uv_signal_t* SIGHUPHandler = nullptr;
friend class LoopGuard;
std::string RequestBuffer;
};
class cmServerStdIoConnection : public cmServerConnection
/***
* Generic connection over std io interfaces -- tty
*/
class cmStdIoConnection : public cmConnection
{
public:
cmServerStdIoConnection();
bool DoSetup(std::string* errorMessage) override;
cmStdIoConnection(cmConnectionBufferStrategy* bufferStrategy);
void TearDown() override;
void SetServer(cmServerBase* s) override;
bool OnServerShuttingDown() override;
bool OnServeStart(std::string* pString) override;
private:
typedef union
@@ -78,18 +56,18 @@ private:
InOutUnion Output;
};
class cmServerPipeConnection : public cmServerConnection
/***
* These specific connections use the cmake server
* buffering strategy.
*/
class cmServerStdIoConnection : public cmStdIoConnection
{
public:
cmServerStdIoConnection();
};
class cmServerPipeConnection : public cmPipeConnection
{
public:
cmServerPipeConnection(const std::string& name);
bool DoSetup(std::string* errorMessage) override;
void TearDown() override;
void Connect(uv_stream_t* server) override;
private:
const std::string PipeName;
uv_pipe_t* ServerPipe = nullptr;
uv_pipe_t* ClientPipe = nullptr;
};

View File

@@ -44,6 +44,8 @@
#include <stdlib.h>
#include <time.h>
class cmConnection;
int cmcmd_cmake_ninja_depends(std::vector<std::string>::const_iterator argBeg,
std::vector<std::string>::const_iterator argEnd);
int cmcmd_cmake_ninja_dyndep(std::vector<std::string>::const_iterator argBeg,
@@ -1013,7 +1015,7 @@ int cmcmd::ExecuteCMakeCommand(std::vector<std::string>& args)
}
}
#if defined(HAVE_SERVER_MODE) && HAVE_SERVER_MODE
cmServerConnection* conn;
cmConnection* conn;
if (isDebug) {
conn = new cmServerStdIoConnection;
} else {

View File

@@ -21,6 +21,7 @@
{ include: [ "<wctype.h>", public, "<cwctype>", public ] },
# HACK: check whether this can be removed with next iwyu release.
{ include: [ "<bits/shared_ptr.h>", private, "<memory>", public ] },
{ include: [ "<bits/std_function.h>", private, "<functional>", public ] },
{ include: [ "<bits/time.h>", private, "<time.h>", public ] },
{ include: [ "<bits/types/clock_t.h>", private, "<time.h>", public ] },