Fix issue where Wormhole server's script messages could not be decoded

Fix issue where dashboard item for parallel peer would not report line number correctly
Closes #1011
This commit is contained in:
Alexander Bock
2020-10-19 17:48:16 +02:00
parent 09e0a55efb
commit 811a84df0f
5 changed files with 102 additions and 86 deletions

View File

@@ -44,42 +44,33 @@ int main(int argc, char** argv) {
CommandlineParser::AllowUnknownCommands::Yes
);
std::stringstream defaultPassword;
defaultPassword << std::hex << std::setfill('0') << std::setw(6) <<
(std::hash<size_t>{}(
std::chrono::system_clock::now().time_since_epoch().count()
) % 0xffffff);
struct {
std::string port;
std::string password;
std::string changeHostPassword;
} settings;
std::stringstream defaultChangeHostPassword;
defaultChangeHostPassword << std::hex << std::setfill('0') << std::setw(6) <<
(std::hash<size_t>{}(
std::chrono::system_clock::now().time_since_epoch().count() + 1
) % 0xffffff);
std::string portString;
commandlineParser.addCommand(
std::make_unique<ghoul::cmdparser::SingleCommand<std::string>>(
portString,
settings.port,
"--port",
"-p",
"Sets the port to listen on"
)
);
std::string password;
commandlineParser.addCommand(
std::make_unique<ghoul::cmdparser::SingleCommand<std::string>>(
password,
settings.password,
"--password",
"-l",
"Sets the password to use"
)
);
std::string changeHostPassword;
commandlineParser.addCommand(
std::make_unique<ghoul::cmdparser::SingleCommand<std::string>>(
changeHostPassword,
settings.changeHostPassword,
"--hostpassword",
"-h",
"Sets the host password to use"
@@ -89,29 +80,41 @@ int main(int argc, char** argv) {
commandlineParser.setCommandLine(arguments);
commandlineParser.execute();
if (password.empty()) {
password = defaultPassword.str();
if (settings.password.empty()) {
std::stringstream defaultPassword;
defaultPassword << std::hex << std::setfill('0') << std::setw(6) <<
(std::hash<size_t>{}(
std::chrono::system_clock::now().time_since_epoch().count()
) % 0xffffff);
settings.password = defaultPassword.str();
}
if (changeHostPassword.empty()) {
changeHostPassword = defaultChangeHostPassword.str();
if (settings.changeHostPassword.empty()) {
std::stringstream defaultChangeHostPassword;
defaultChangeHostPassword << std::hex << std::setfill('0') << std::setw(6) <<
(std::hash<size_t>{}(
std::chrono::system_clock::now().time_since_epoch().count() + 1
) % 0xffffff);
settings.changeHostPassword = defaultChangeHostPassword.str();
}
LINFO(fmt::format("Connection password: {}", password));
LINFO(fmt::format("Host password: {}", changeHostPassword));
LINFO(fmt::format("Connection password: {}", settings.password));
LINFO(fmt::format("Host password: {}", settings.changeHostPassword));
int port = 25001;
if (!portString.empty()) {
if (!settings.port.empty()) {
try {
port = std::stoi(portString);
port = std::stoi(settings.port);
}
catch (const std::invalid_argument&) {
LERROR(fmt::format("Invalid port: {}", portString));
LERROR(fmt::format("Invalid port: {}", settings.port));
}
}
ParallelServer server;
server.start(port, password, changeHostPassword);
server.start(port, settings.password, settings.changeHostPassword);
server.setDefaultHostAddress("127.0.0.1");
LINFO(fmt::format("Server listening to port {}", port));

View File

@@ -26,6 +26,8 @@
#define __OPENSPACE_CORE___MESSAGESTRUCTURES___H__
#include <ghoul/glm.h>
#include <ghoul/logging/logmanager.h>
#include <fmt/format.h>
#include <algorithm>
#include <cstring>
#include <fstream>
@@ -373,17 +375,31 @@ struct ScriptMessage {
double _timestamp = 0.0;
void serialize(std::vector<char>& buffer) const {
size_t strLen = _script.size();
size_t writeSize_bytes = sizeof(size_t);
uint32_t strLen = static_cast<uint32_t>(_script.size());
unsigned char const *p = reinterpret_cast<unsigned char const*>(&strLen);
buffer.insert(buffer.end(), p, p + writeSize_bytes);
const char* p = reinterpret_cast<const char*>(&strLen);
buffer.insert(buffer.end(), p, p + sizeof(uint32_t));
buffer.insert(buffer.end(), _script.begin(), _script.end());
};
void deserialize(const std::vector<char>& buffer) {
_script.assign(buffer.begin(), buffer.end());
const char* p = buffer.data();
const uint32_t len = *reinterpret_cast<const uint32_t*>(p);
if (buffer.size() != (sizeof(uint32_t) + len)) {
LERRORC(
"ParallelPeer",
fmt::format(
"Received buffer with wrong size. Expected {} got {}",
len, buffer.size()
)
);
return;
}
// We can skip over the first uint32_t that encoded the length
_script.assign(buffer.begin() + sizeof(uint32_t), buffer.end());
};
void write(std::ostream* out) const {

View File

@@ -50,9 +50,6 @@ public:
private:
struct Peer {
//Peer(size_t id_, std::string name_, ParallelConnection parallelConnection_,
//ParallelConnection::Status status_, std::thread )
size_t id;
std::string name;
ParallelConnection parallelConnection;
@@ -87,7 +84,6 @@ private:
void handleData(const Peer& peer, std::vector<char> data);
void handleHostshipRequest(std::shared_ptr<Peer> peer, std::vector<char> message);
void handleHostshipResignation(Peer& peer);
void handleDisconnection(std::shared_ptr<Peer> peer);
void handleNewPeers();
void eventLoop();

View File

@@ -121,6 +121,8 @@ void DashboardItemParallelConnection::render(glm::vec2& penPosition) {
const size_t nConnections = global::parallelPeer.nConnections();
const std::string& hostName = global::parallelPeer.hostName();
int nLines = 1;
std::string connectionInfo;
int nClients = static_cast<int>(nConnections);
if (status == ParallelConnection::Status::Host) {
@@ -158,11 +160,13 @@ void DashboardItemParallelConnection::render(glm::vec2& penPosition) {
else if (nClients == 1) {
connectionInfo += "You are the only client";
}
nLines = 2;
}
if (!connectionInfo.empty()) {
RenderFont(*_font, penPosition, connectionInfo);
penPosition.y -= _font->height();
penPosition.y -= _font->height() * nLines;
}
}

View File

@@ -50,12 +50,12 @@ void ParallelServer::start(int port, const std::string& password,
}
void ParallelServer::setDefaultHostAddress(std::string defaultHostAddress) {
std::lock_guard<std::mutex> lock(_hostInfoMutex);
std::lock_guard lock(_hostInfoMutex);
_defaultHostAddress = std::move(defaultHostAddress);
}
std::string ParallelServer::defaultHostAddress() const {
std::lock_guard<std::mutex> lock(_hostInfoMutex);
std::lock_guard lock(_hostInfoMutex);
return _defaultHostAddress;
}
@@ -66,29 +66,26 @@ void ParallelServer::stop() {
void ParallelServer::handleNewPeers() {
while (!_shouldStop) {
std::unique_ptr<ghoul::io::TcpSocket> socket =
_socketServer.awaitPendingTcpSocket();
std::unique_ptr<ghoul::io::TcpSocket> s = _socketServer.awaitPendingTcpSocket();
socket->startStreams();
s->startStreams();
const size_t id = _nextConnectionId++;
std::shared_ptr<Peer> p = std::make_shared<Peer>(Peer{
id,
"",
ParallelConnection(std::move(socket)),
ParallelConnection(std::move(s)),
ParallelConnection::Status::Connecting,
std::thread()
});
auto it = _peers.emplace(p->id, p);
it.first->second->thread = std::thread([this, id]() {
handlePeer(id);
});
it.first->second->thread = std::thread([this, id]() { handlePeer(id); });
}
}
std::shared_ptr<ParallelServer::Peer> ParallelServer::peer(size_t id) {
std::lock_guard<std::mutex> lock(_peerListMutex);
auto it = _peers.find(id);
std::lock_guard lock(_peerListMutex);
const auto it = _peers.find(id);
if (it == _peers.end()) {
return nullptr;
}
@@ -107,15 +104,19 @@ void ParallelServer::handlePeer(size_t id) {
}
try {
ParallelConnection::Message m = p->parallelConnection.receiveMessage();
_incomingMessages.push({id, m});
} catch (const ParallelConnection::ConnectionLostError&) {
PeerMessage msg;
msg.peerId = id;
msg.message = m;
_incomingMessages.push(msg);
}
catch (const ParallelConnection::ConnectionLostError&) {
LERROR(fmt::format("Connection lost to {}", p->id));
_incomingMessages.push({
id,
ParallelConnection::Message(
ParallelConnection::MessageType::Disconnection, std::vector<char>()
)
});
PeerMessage msg;
msg.peerId = id;
msg.message = ParallelConnection::Message(
ParallelConnection::MessageType::Disconnection, std::vector<char>()
);
_incomingMessages.push(msg);
return;
}
}
@@ -137,9 +138,9 @@ void ParallelServer::handlePeerMessage(PeerMessage peerMessage) {
std::shared_ptr<Peer>& peer = it->second;
const ParallelConnection::MessageType messageType = peerMessage.message.type;
const ParallelConnection::MessageType type = peerMessage.message.type;
std::vector<char>& data = peerMessage.message.content;
switch (messageType) {
switch (type) {
case ParallelConnection::MessageType::Authentication:
handleAuthentication(peer, std::move(data));
break;
@@ -156,9 +157,7 @@ void ParallelServer::handlePeerMessage(PeerMessage peerMessage) {
disconnect(*peer);
break;
default:
LERROR(fmt::format(
"Unsupported message type: {}", static_cast<int>(messageType)
));
LERROR(fmt::format("Unsupported message type: {}", static_cast<int>(type)));
break;
}
}
@@ -192,19 +191,19 @@ void ParallelServer::handleAuthentication(std::shared_ptr<Peer> peer,
setName(*peer, name);
LINFO(fmt::format("Connection established with {} \"{}\"", peer->id, name));
LINFO(fmt::format("Connection established with {} ('{}')", peer->id, name));
std::string defaultHostAddress;
{
std::lock_guard<std::mutex> _hostMutex(_hostInfoMutex);
std::lock_guard _hostMutex(_hostInfoMutex);
defaultHostAddress = _defaultHostAddress;
}
if (_hostPeerId == 0 &&
peer->parallelConnection.socket()->address() == defaultHostAddress)
{
// Directly promote the conenction to host (initialize)
// if there is no host, and ip matches default host ip.
LINFO(fmt::format("Connection {} directly promoted to host.", peer->id));
// Directly promote the conenction to host (initialize) if there is no host, and
// ip matches default host ip.
LINFO(fmt::format("Connection {} directly promoted to host", peer->id));
assignHost(peer);
for (std::pair<const size_t, std::shared_ptr<Peer>>& it : _peers) {
// sendConnectionStatus(it->second) ?
@@ -221,11 +220,10 @@ void ParallelServer::handleAuthentication(std::shared_ptr<Peer> peer,
void ParallelServer::handleData(const Peer& peer, std::vector<char> data) {
if (peer.id != _hostPeerId) {
LINFO(fmt::format(
"Connection {} tried to send data without being the host. Ignoring", peer.id
"Ignoring connection {} trying to send data without being host", peer.id
));
}
sendMessageToClients(ParallelConnection::MessageType::Data, data);
}
void ParallelServer::handleHostshipRequest(std::shared_ptr<Peer> peer,
@@ -233,43 +231,43 @@ void ParallelServer::handleHostshipRequest(std::shared_ptr<Peer> peer,
{
std::stringstream input(std::string(message.begin(), message.end()));
LINFO(fmt::format("Connection {} requested hostship.", peer->id));
LINFO(fmt::format("Connection {} requested hostship", peer->id));
uint64_t passwordHash = 0;
input.read(reinterpret_cast<char*>(&passwordHash), sizeof(uint64_t));
if (passwordHash != _changeHostPasswordHash) {
LERROR(fmt::format("Connection {} provided incorrect host password.", peer->id));
LERROR(fmt::format("Connection {} provided incorrect host password", peer->id));
return;
}
size_t oldHostPeerId = 0;
{
std::lock_guard<std::mutex> lock(_hostInfoMutex);
std::lock_guard lock(_hostInfoMutex);
oldHostPeerId = _hostPeerId;
}
if (oldHostPeerId == peer->id) {
LINFO(fmt::format("Connection {} is already the host.", peer->id));
LINFO(fmt::format("Connection {} is already the host", peer->id));
return;
}
assignHost(peer);
LINFO(fmt::format("Switched host from {} to {}.", oldHostPeerId, peer->id));
LINFO(fmt::format("Switched host from {} to {}", oldHostPeerId, peer->id));
}
void ParallelServer::handleHostshipResignation(Peer& peer) {
LINFO(fmt::format("Connection {} wants to resign its hostship.", peer.id));
LINFO(fmt::format("Connection {} wants to resign its hostship", peer.id));
size_t oldHostPeerId = 0;
{
std::lock_guard<std::mutex> lock(_hostInfoMutex);
std::lock_guard lock(_hostInfoMutex);
oldHostPeerId = _hostPeerId;
}
setToClient(peer);
LINFO(fmt::format("Connection {} resigned as host.", peer.id));
LINFO(fmt::format("Connection {} resigned as host", peer.id));
}
bool ParallelServer::isConnected(const Peer& peer) const {
@@ -277,8 +275,7 @@ bool ParallelServer::isConnected(const Peer& peer) const {
peer.status != ParallelConnection::Status::Disconnected;
}
void ParallelServer::sendMessage(Peer& peer,
ParallelConnection::MessageType messageType,
void ParallelServer::sendMessage(Peer& peer, ParallelConnection::MessageType messageType,
const std::vector<char>& message)
{
peer.parallelConnection.sendMessage({ messageType, message });
@@ -311,12 +308,12 @@ void ParallelServer::disconnect(Peer& peer) {
size_t hostPeerId = 0;
{
std::lock_guard<std::mutex> lock(_hostInfoMutex);
std::lock_guard lock(_hostInfoMutex);
hostPeerId = _hostPeerId;
}
// Make sure any disconnecting host is first degraded to client,
// in order to notify other clients about host disconnection.
// Make sure any disconnecting host is first degraded to client, in order to notify
// other clients about host disconnection.
if (peer.id == hostPeerId) {
setToClient(peer);
}
@@ -327,7 +324,7 @@ void ParallelServer::disconnect(Peer& peer) {
}
void ParallelServer::setName(Peer& peer, std::string name) {
peer.name = name;
peer.name = std::move(name);
size_t hostPeerId = 0;
{
std::lock_guard lock(_hostInfoMutex);
@@ -338,7 +335,7 @@ void ParallelServer::setName(Peer& peer, std::string name) {
if (peer.id == hostPeerId) {
{
std::lock_guard lock(_hostInfoMutex);
_hostName = name;
_hostName = peer.name;
}
for (std::pair<const size_t, std::shared_ptr<Peer>>& it : _peers) {
@@ -374,7 +371,7 @@ void ParallelServer::setToClient(Peer& peer) {
{
std::lock_guard lock(_hostInfoMutex);
_hostPeerId = 0;
_hostName = "";
_hostName.clear();
}
// If host becomes client, make all clients hostless.
@@ -421,8 +418,8 @@ void ParallelServer::sendConnectionStatus(Peer& peer) {
data.insert(
data.end(),
reinterpret_cast<const char*>(_hostName.data()),
reinterpret_cast<const char*>(_hostName.data() + outHostNameSize)
_hostName.data(),
_hostName.data() + outHostNameSize
);
sendMessage(peer, ParallelConnection::MessageType::ConnectionStatus, data);