From 811a84df0f16d2201f68f9ff19e28e4131b34291 Mon Sep 17 00:00:00 2001 From: Alexander Bock Date: Mon, 19 Oct 2020 17:48:16 +0200 Subject: [PATCH] Fix issue where Wormhole server's script messages could not be decoded Fix issue where dashboard item for parallel peer would not report line number correctly Closes #1011 --- apps/Wormhole/main.cpp | 57 +++++------ include/openspace/network/messagestructures.h | 26 ++++- include/openspace/network/parallelserver.h | 4 - .../dashboarditemparallelconnection.cpp | 6 +- src/network/parallelserver.cpp | 95 +++++++++---------- 5 files changed, 102 insertions(+), 86 deletions(-) diff --git a/apps/Wormhole/main.cpp b/apps/Wormhole/main.cpp index 12b3764abb..487244aef8 100644 --- a/apps/Wormhole/main.cpp +++ b/apps/Wormhole/main.cpp @@ -44,42 +44,33 @@ int main(int argc, char** argv) { CommandlineParser::AllowUnknownCommands::Yes ); - std::stringstream defaultPassword; - defaultPassword << std::hex << std::setfill('0') << std::setw(6) << - (std::hash{}( - std::chrono::system_clock::now().time_since_epoch().count() - ) % 0xffffff); + struct { + std::string port; + std::string password; + std::string changeHostPassword; + } settings; - std::stringstream defaultChangeHostPassword; - defaultChangeHostPassword << std::hex << std::setfill('0') << std::setw(6) << - (std::hash{}( - std::chrono::system_clock::now().time_since_epoch().count() + 1 - ) % 0xffffff); - - std::string portString; commandlineParser.addCommand( std::make_unique>( - portString, + settings.port, "--port", "-p", "Sets the port to listen on" ) ); - std::string password; commandlineParser.addCommand( std::make_unique>( - password, + settings.password, "--password", "-l", "Sets the password to use" ) ); - std::string changeHostPassword; commandlineParser.addCommand( std::make_unique>( - changeHostPassword, + settings.changeHostPassword, "--hostpassword", "-h", "Sets the host password to use" @@ -89,29 +80,41 @@ int main(int argc, char** argv) { commandlineParser.setCommandLine(arguments); commandlineParser.execute(); - if (password.empty()) { - password = defaultPassword.str(); + if (settings.password.empty()) { + std::stringstream defaultPassword; + defaultPassword << std::hex << std::setfill('0') << std::setw(6) << + (std::hash{}( + std::chrono::system_clock::now().time_since_epoch().count() + ) % 0xffffff); + + settings.password = defaultPassword.str(); } - if (changeHostPassword.empty()) { - changeHostPassword = defaultChangeHostPassword.str(); + if (settings.changeHostPassword.empty()) { + std::stringstream defaultChangeHostPassword; + defaultChangeHostPassword << std::hex << std::setfill('0') << std::setw(6) << + (std::hash{}( + std::chrono::system_clock::now().time_since_epoch().count() + 1 + ) % 0xffffff); + + settings.changeHostPassword = defaultChangeHostPassword.str(); } - LINFO(fmt::format("Connection password: {}", password)); - LINFO(fmt::format("Host password: {}", changeHostPassword)); + LINFO(fmt::format("Connection password: {}", settings.password)); + LINFO(fmt::format("Host password: {}", settings.changeHostPassword)); int port = 25001; - if (!portString.empty()) { + if (!settings.port.empty()) { try { - port = std::stoi(portString); + port = std::stoi(settings.port); } catch (const std::invalid_argument&) { - LERROR(fmt::format("Invalid port: {}", portString)); + LERROR(fmt::format("Invalid port: {}", settings.port)); } } ParallelServer server; - server.start(port, password, changeHostPassword); + server.start(port, settings.password, settings.changeHostPassword); server.setDefaultHostAddress("127.0.0.1"); LINFO(fmt::format("Server listening to port {}", port)); diff --git a/include/openspace/network/messagestructures.h b/include/openspace/network/messagestructures.h index ec27c27dd1..bea328662d 100644 --- a/include/openspace/network/messagestructures.h +++ b/include/openspace/network/messagestructures.h @@ -26,6 +26,8 @@ #define __OPENSPACE_CORE___MESSAGESTRUCTURES___H__ #include +#include +#include #include #include #include @@ -373,17 +375,31 @@ struct ScriptMessage { double _timestamp = 0.0; void serialize(std::vector& buffer) const { - size_t strLen = _script.size(); - size_t writeSize_bytes = sizeof(size_t); + uint32_t strLen = static_cast(_script.size()); - unsigned char const *p = reinterpret_cast(&strLen); - buffer.insert(buffer.end(), p, p + writeSize_bytes); + const char* p = reinterpret_cast(&strLen); + buffer.insert(buffer.end(), p, p + sizeof(uint32_t)); buffer.insert(buffer.end(), _script.begin(), _script.end()); }; void deserialize(const std::vector& buffer) { - _script.assign(buffer.begin(), buffer.end()); + const char* p = buffer.data(); + const uint32_t len = *reinterpret_cast(p); + + if (buffer.size() != (sizeof(uint32_t) + len)) { + LERRORC( + "ParallelPeer", + fmt::format( + "Received buffer with wrong size. Expected {} got {}", + len, buffer.size() + ) + ); + return; + } + + // We can skip over the first uint32_t that encoded the length + _script.assign(buffer.begin() + sizeof(uint32_t), buffer.end()); }; void write(std::ostream* out) const { diff --git a/include/openspace/network/parallelserver.h b/include/openspace/network/parallelserver.h index 18ce60e573..684ac62f2a 100644 --- a/include/openspace/network/parallelserver.h +++ b/include/openspace/network/parallelserver.h @@ -50,9 +50,6 @@ public: private: struct Peer { - //Peer(size_t id_, std::string name_, ParallelConnection parallelConnection_, - //ParallelConnection::Status status_, std::thread ) - size_t id; std::string name; ParallelConnection parallelConnection; @@ -87,7 +84,6 @@ private: void handleData(const Peer& peer, std::vector data); void handleHostshipRequest(std::shared_ptr peer, std::vector message); void handleHostshipResignation(Peer& peer); - void handleDisconnection(std::shared_ptr peer); void handleNewPeers(); void eventLoop(); diff --git a/modules/base/dashboard/dashboarditemparallelconnection.cpp b/modules/base/dashboard/dashboarditemparallelconnection.cpp index a7858f74b7..be39c9bac4 100644 --- a/modules/base/dashboard/dashboarditemparallelconnection.cpp +++ b/modules/base/dashboard/dashboarditemparallelconnection.cpp @@ -121,6 +121,8 @@ void DashboardItemParallelConnection::render(glm::vec2& penPosition) { const size_t nConnections = global::parallelPeer.nConnections(); const std::string& hostName = global::parallelPeer.hostName(); + int nLines = 1; + std::string connectionInfo; int nClients = static_cast(nConnections); if (status == ParallelConnection::Status::Host) { @@ -158,11 +160,13 @@ void DashboardItemParallelConnection::render(glm::vec2& penPosition) { else if (nClients == 1) { connectionInfo += "You are the only client"; } + + nLines = 2; } if (!connectionInfo.empty()) { RenderFont(*_font, penPosition, connectionInfo); - penPosition.y -= _font->height(); + penPosition.y -= _font->height() * nLines; } } diff --git a/src/network/parallelserver.cpp b/src/network/parallelserver.cpp index 499aa642f8..a52caeb410 100644 --- a/src/network/parallelserver.cpp +++ b/src/network/parallelserver.cpp @@ -50,12 +50,12 @@ void ParallelServer::start(int port, const std::string& password, } void ParallelServer::setDefaultHostAddress(std::string defaultHostAddress) { - std::lock_guard lock(_hostInfoMutex); + std::lock_guard lock(_hostInfoMutex); _defaultHostAddress = std::move(defaultHostAddress); } std::string ParallelServer::defaultHostAddress() const { - std::lock_guard lock(_hostInfoMutex); + std::lock_guard lock(_hostInfoMutex); return _defaultHostAddress; } @@ -66,29 +66,26 @@ void ParallelServer::stop() { void ParallelServer::handleNewPeers() { while (!_shouldStop) { - std::unique_ptr socket = - _socketServer.awaitPendingTcpSocket(); + std::unique_ptr s = _socketServer.awaitPendingTcpSocket(); - socket->startStreams(); + s->startStreams(); const size_t id = _nextConnectionId++; std::shared_ptr p = std::make_shared(Peer{ id, "", - ParallelConnection(std::move(socket)), + ParallelConnection(std::move(s)), ParallelConnection::Status::Connecting, std::thread() }); auto it = _peers.emplace(p->id, p); - it.first->second->thread = std::thread([this, id]() { - handlePeer(id); - }); + it.first->second->thread = std::thread([this, id]() { handlePeer(id); }); } } std::shared_ptr ParallelServer::peer(size_t id) { - std::lock_guard lock(_peerListMutex); - auto it = _peers.find(id); + std::lock_guard lock(_peerListMutex); + const auto it = _peers.find(id); if (it == _peers.end()) { return nullptr; } @@ -107,15 +104,19 @@ void ParallelServer::handlePeer(size_t id) { } try { ParallelConnection::Message m = p->parallelConnection.receiveMessage(); - _incomingMessages.push({id, m}); - } catch (const ParallelConnection::ConnectionLostError&) { + PeerMessage msg; + msg.peerId = id; + msg.message = m; + _incomingMessages.push(msg); + } + catch (const ParallelConnection::ConnectionLostError&) { LERROR(fmt::format("Connection lost to {}", p->id)); - _incomingMessages.push({ - id, - ParallelConnection::Message( - ParallelConnection::MessageType::Disconnection, std::vector() - ) - }); + PeerMessage msg; + msg.peerId = id; + msg.message = ParallelConnection::Message( + ParallelConnection::MessageType::Disconnection, std::vector() + ); + _incomingMessages.push(msg); return; } } @@ -137,9 +138,9 @@ void ParallelServer::handlePeerMessage(PeerMessage peerMessage) { std::shared_ptr& peer = it->second; - const ParallelConnection::MessageType messageType = peerMessage.message.type; + const ParallelConnection::MessageType type = peerMessage.message.type; std::vector& data = peerMessage.message.content; - switch (messageType) { + switch (type) { case ParallelConnection::MessageType::Authentication: handleAuthentication(peer, std::move(data)); break; @@ -156,9 +157,7 @@ void ParallelServer::handlePeerMessage(PeerMessage peerMessage) { disconnect(*peer); break; default: - LERROR(fmt::format( - "Unsupported message type: {}", static_cast(messageType) - )); + LERROR(fmt::format("Unsupported message type: {}", static_cast(type))); break; } } @@ -192,19 +191,19 @@ void ParallelServer::handleAuthentication(std::shared_ptr peer, setName(*peer, name); - LINFO(fmt::format("Connection established with {} \"{}\"", peer->id, name)); + LINFO(fmt::format("Connection established with {} ('{}')", peer->id, name)); std::string defaultHostAddress; { - std::lock_guard _hostMutex(_hostInfoMutex); + std::lock_guard _hostMutex(_hostInfoMutex); defaultHostAddress = _defaultHostAddress; } if (_hostPeerId == 0 && peer->parallelConnection.socket()->address() == defaultHostAddress) { - // Directly promote the conenction to host (initialize) - // if there is no host, and ip matches default host ip. - LINFO(fmt::format("Connection {} directly promoted to host.", peer->id)); + // Directly promote the conenction to host (initialize) if there is no host, and + // ip matches default host ip. + LINFO(fmt::format("Connection {} directly promoted to host", peer->id)); assignHost(peer); for (std::pair>& it : _peers) { // sendConnectionStatus(it->second) ? @@ -221,11 +220,10 @@ void ParallelServer::handleAuthentication(std::shared_ptr peer, void ParallelServer::handleData(const Peer& peer, std::vector data) { if (peer.id != _hostPeerId) { LINFO(fmt::format( - "Connection {} tried to send data without being the host. Ignoring", peer.id + "Ignoring connection {} trying to send data without being host", peer.id )); } sendMessageToClients(ParallelConnection::MessageType::Data, data); - } void ParallelServer::handleHostshipRequest(std::shared_ptr peer, @@ -233,43 +231,43 @@ void ParallelServer::handleHostshipRequest(std::shared_ptr peer, { std::stringstream input(std::string(message.begin(), message.end())); - LINFO(fmt::format("Connection {} requested hostship.", peer->id)); + LINFO(fmt::format("Connection {} requested hostship", peer->id)); uint64_t passwordHash = 0; input.read(reinterpret_cast(&passwordHash), sizeof(uint64_t)); if (passwordHash != _changeHostPasswordHash) { - LERROR(fmt::format("Connection {} provided incorrect host password.", peer->id)); + LERROR(fmt::format("Connection {} provided incorrect host password", peer->id)); return; } size_t oldHostPeerId = 0; { - std::lock_guard lock(_hostInfoMutex); + std::lock_guard lock(_hostInfoMutex); oldHostPeerId = _hostPeerId; } if (oldHostPeerId == peer->id) { - LINFO(fmt::format("Connection {} is already the host.", peer->id)); + LINFO(fmt::format("Connection {} is already the host", peer->id)); return; } assignHost(peer); - LINFO(fmt::format("Switched host from {} to {}.", oldHostPeerId, peer->id)); + LINFO(fmt::format("Switched host from {} to {}", oldHostPeerId, peer->id)); } void ParallelServer::handleHostshipResignation(Peer& peer) { - LINFO(fmt::format("Connection {} wants to resign its hostship.", peer.id)); + LINFO(fmt::format("Connection {} wants to resign its hostship", peer.id)); size_t oldHostPeerId = 0; { - std::lock_guard lock(_hostInfoMutex); + std::lock_guard lock(_hostInfoMutex); oldHostPeerId = _hostPeerId; } setToClient(peer); - LINFO(fmt::format("Connection {} resigned as host.", peer.id)); + LINFO(fmt::format("Connection {} resigned as host", peer.id)); } bool ParallelServer::isConnected(const Peer& peer) const { @@ -277,8 +275,7 @@ bool ParallelServer::isConnected(const Peer& peer) const { peer.status != ParallelConnection::Status::Disconnected; } -void ParallelServer::sendMessage(Peer& peer, - ParallelConnection::MessageType messageType, +void ParallelServer::sendMessage(Peer& peer, ParallelConnection::MessageType messageType, const std::vector& message) { peer.parallelConnection.sendMessage({ messageType, message }); @@ -311,12 +308,12 @@ void ParallelServer::disconnect(Peer& peer) { size_t hostPeerId = 0; { - std::lock_guard lock(_hostInfoMutex); + std::lock_guard lock(_hostInfoMutex); hostPeerId = _hostPeerId; } - // Make sure any disconnecting host is first degraded to client, - // in order to notify other clients about host disconnection. + // Make sure any disconnecting host is first degraded to client, in order to notify + // other clients about host disconnection. if (peer.id == hostPeerId) { setToClient(peer); } @@ -327,7 +324,7 @@ void ParallelServer::disconnect(Peer& peer) { } void ParallelServer::setName(Peer& peer, std::string name) { - peer.name = name; + peer.name = std::move(name); size_t hostPeerId = 0; { std::lock_guard lock(_hostInfoMutex); @@ -338,7 +335,7 @@ void ParallelServer::setName(Peer& peer, std::string name) { if (peer.id == hostPeerId) { { std::lock_guard lock(_hostInfoMutex); - _hostName = name; + _hostName = peer.name; } for (std::pair>& it : _peers) { @@ -374,7 +371,7 @@ void ParallelServer::setToClient(Peer& peer) { { std::lock_guard lock(_hostInfoMutex); _hostPeerId = 0; - _hostName = ""; + _hostName.clear(); } // If host becomes client, make all clients hostless. @@ -421,8 +418,8 @@ void ParallelServer::sendConnectionStatus(Peer& peer) { data.insert( data.end(), - reinterpret_cast(_hostName.data()), - reinterpret_cast(_hostName.data() + outHostNameSize) + _hostName.data(), + _hostName.data() + outHostNameSize ); sendMessage(peer, ParallelConnection::MessageType::ConnectionStatus, data);