diff --git a/apps/Wormhole/CMakeLists.txt b/apps/Wormhole/CMakeLists.txt deleted file mode 100644 index f2ddb1915f..0000000000 --- a/apps/Wormhole/CMakeLists.txt +++ /dev/null @@ -1,64 +0,0 @@ -########################################################################################## -# # -# OpenSpace # -# # -# Copyright (c) 2014-2022 # -# # -# Permission is hereby granted, free of charge, to any person obtaining a copy of this # -# software and associated documentation files (the "Software"), to deal in the Software # -# without restriction, including without limitation the rights to use, copy, modify, # -# merge, publish, distribute, sublicense, and/or sell copies of the Software, and to # -# permit persons to whom the Software is furnished to do so, subject to the following # -# conditions: # -# # -# The above copyright notice and this permission notice shall be included in all copies # -# or substantial portions of the Software. # -# # -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, # -# INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A # -# PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT # -# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF # -# CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE # -# OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. # -########################################################################################## - -include(${OPENSPACE_CMAKE_EXT_DIR}/application_definition.cmake) - -set_source_files_properties( - ${CMAKE_CURRENT_SOURCE_DIR}/openspace.icns - PROPERTIES MACOSX_PACKAGE_LOCATION "Resources" -) - -set(MACOSX_BUNDLE_ICON_FILE openspace.icns) - -create_new_application( - Wormhole MACOSX_BUNDLE - ${CMAKE_CURRENT_SOURCE_DIR}/main.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/openspace.rc - ${CMAKE_CURRENT_SOURCE_DIR}/openspace.icns -) - -target_link_libraries(Wormhole PRIVATE openspace-core openspace-module-collection) - -# Web Browser and Web gui -# Why not put these in the module's path? Because they do not have access to the -# target as of July 2017, which is needed. -if (OPENSPACE_MODULE_WEBBROWSER AND CEF_ROOT) - # wanted by CEF - set(CMAKE_BUILD_TYPE Debug CACHE STRING "CMAKE_BUILD_TYPE") - set(PROJECT_ARCH "x86_64") - - if (WIN32) - set(RESOURCE_FILE ${OPENSPACE_APPS_DIR}/OpenSpace/openspace.rc) - endif () - - # Add the CEF binary distribution's cmake/ directory to the module path and - # find CEF to initialize it properly. - set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${WEBBROWSER_MODULE_PATH}/cmake") - include(webbrowser_helpers) - - set_cef_targets("${CEF_ROOT}" Wormhole) - run_cef_platform_config("${CEF_ROOT}" "${CEF_TARGET}" "${WEBBROWSER_MODULE_PATH}") -elseif (OPENSPACE_MODULE_WEBBROWSER) - message(WARNING "Web configured to be included, but no CEF_ROOT was found, please try configuring CMake again.") -endif () diff --git a/apps/Wormhole/include.cmake b/apps/Wormhole/include.cmake deleted file mode 100644 index 7ac26b6f4f..0000000000 --- a/apps/Wormhole/include.cmake +++ /dev/null @@ -1 +0,0 @@ -set(DEFAULT_APPLICATION ON) diff --git a/apps/Wormhole/main.cpp b/apps/Wormhole/main.cpp deleted file mode 100644 index 2ac955bbe7..0000000000 --- a/apps/Wormhole/main.cpp +++ /dev/null @@ -1,133 +0,0 @@ -/***************************************************************************************** - * * - * OpenSpace * - * * - * Copyright (c) 2014-2022 * - * * - * Permission is hereby granted, free of charge, to any person obtaining a copy of this * - * software and associated documentation files (the "Software"), to deal in the Software * - * without restriction, including without limitation the rights to use, copy, modify, * - * merge, publish, distribute, sublicense, and/or sell copies of the Software, and to * - * permit persons to whom the Software is furnished to do so, subject to the following * - * conditions: * - * * - * The above copyright notice and this permission notice shall be included in all copies * - * or substantial portions of the Software. * - * * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, * - * INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A * - * PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT * - * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF * - * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE * - * OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * - ****************************************************************************************/ - -#include -#include -#include -#include -#include -#include - -namespace { - constexpr const char*_loggerCat = "Wormhole"; -} // namespace - -int main(int argc, char** argv) { - using namespace openspace; - using namespace ghoul::cmdparser; - - std::vector arguments(argv, argv + argc); - - CommandlineParser commandlineParser( - "Wormhole", - CommandlineParser::AllowUnknownCommands::Yes - ); - - struct { - std::string port; - std::string password; - std::string changeHostPassword; - } settings; - - commandlineParser.addCommand( - std::make_unique>( - settings.port, - "--port", - "-p", - "Sets the port to listen on" - ) - ); - - commandlineParser.addCommand( - std::make_unique>( - settings.password, - "--password", - "-l", - "Sets the password to use" - ) - ); - - commandlineParser.addCommand( - std::make_unique>( - settings.changeHostPassword, - "--hostpassword", - "-h", - "Sets the host password to use" - ) - ); - - ghoul::logging::LogManager::initialize( - ghoul::logging::LogLevel::Debug, - ghoul::logging::LogManager::ImmediateFlush::Yes - ); - - commandlineParser.setCommandLine(arguments); - commandlineParser.execute(); - - if (settings.password.empty()) { - std::stringstream defaultPassword; - defaultPassword << std::hex << std::setfill('0') << std::setw(6) << - (std::hash{}( - std::chrono::system_clock::now().time_since_epoch().count() - ) % 0xffffff); - - settings.password = defaultPassword.str(); - } - if (settings.changeHostPassword.empty()) { - std::stringstream defaultChangeHostPassword; - defaultChangeHostPassword << std::hex << std::setfill('0') << std::setw(6) << - (std::hash{}( - std::chrono::system_clock::now().time_since_epoch().count() + 1 - ) % 0xffffff); - - settings.changeHostPassword = defaultChangeHostPassword.str(); - } - LINFO(fmt::format("Connection password: {}", settings.password)); - LINFO(fmt::format("Host password: {}", settings.changeHostPassword)); - - int port = 25001; - - if (!settings.port.empty()) { - try { - port = std::stoi(settings.port); - } - catch (const std::invalid_argument&) { - LERROR(fmt::format("Invalid port: {}", settings.port)); - } - } - - ParallelServer server; - server.start(port, settings.password, settings.changeHostPassword); - server.setDefaultHostAddress("127.0.0.1"); - LINFO(fmt::format("Server listening to port {}", port)); - - while (std::cin.get() != 'q') { - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); - } - - server.stop(); - LINFO("Server stopped"); - - return 0; -} diff --git a/apps/Wormhole/openspace.icns b/apps/Wormhole/openspace.icns deleted file mode 100644 index a6e08b0a54..0000000000 Binary files a/apps/Wormhole/openspace.icns and /dev/null differ diff --git a/apps/Wormhole/openspace.ico b/apps/Wormhole/openspace.ico deleted file mode 100644 index 38f7eb88a8..0000000000 Binary files a/apps/Wormhole/openspace.ico and /dev/null differ diff --git a/apps/Wormhole/openspace.png b/apps/Wormhole/openspace.png deleted file mode 100644 index 1e5f26b087..0000000000 Binary files a/apps/Wormhole/openspace.png and /dev/null differ diff --git a/apps/Wormhole/openspace.rc b/apps/Wormhole/openspace.rc deleted file mode 100644 index 86a250bb5d..0000000000 --- a/apps/Wormhole/openspace.rc +++ /dev/null @@ -1 +0,0 @@ -IDI_ICON1 ICON DISCARDABLE "openspace.ico" \ No newline at end of file diff --git a/include/openspace/network/messagestructures.h b/include/openspace/network/messagestructures.h index b6bd445bbe..f53a1e37df 100644 --- a/include/openspace/network/messagestructures.h +++ b/include/openspace/network/messagestructures.h @@ -89,13 +89,13 @@ struct CameraKeyframe { sizeof(_followNodeRotation) ); - int nodeNameLength = static_cast(_focusNode.size()); + uint32_t nodeNameLength = static_cast(_focusNode.size()); // Add focus node buffer.insert( buffer.end(), reinterpret_cast(&nodeNameLength), - reinterpret_cast(&nodeNameLength) + sizeof(nodeNameLength) + reinterpret_cast(&nodeNameLength) + sizeof(uint32_t) ); buffer.insert( buffer.end(), diff --git a/include/openspace/network/parallelconnection.h b/include/openspace/network/parallelconnection.h index e180d89a8d..362e888f25 100644 --- a/include/openspace/network/parallelconnection.h +++ b/include/openspace/network/parallelconnection.h @@ -42,14 +42,13 @@ public: Host }; - enum class MessageType : uint32_t { + enum class MessageType : uint8_t { Authentication = 0, Data, ConnectionStatus, HostshipRequest, HostshipResignation, - NConnections, - Disconnection + NConnections }; struct Message { @@ -71,7 +70,9 @@ public: class ConnectionLostError : public ghoul::RuntimeError { public: - explicit ConnectionLostError(); + explicit ConnectionLostError(bool shouldLogError = true); + + bool shouldLogError; }; ParallelConnection(std::unique_ptr socket); @@ -84,9 +85,11 @@ public: ParallelConnection::Message receiveMessage(); - static const unsigned int ProtocolVersion; + static const uint8_t ProtocolVersion; + private: std::unique_ptr _socket; + bool _shouldDisconnect = false; }; } // namespace openspace diff --git a/include/openspace/network/parallelserver.h b/include/openspace/network/parallelserver.h deleted file mode 100644 index 51b64c012f..0000000000 --- a/include/openspace/network/parallelserver.h +++ /dev/null @@ -1,117 +0,0 @@ -/***************************************************************************************** - * * - * OpenSpace * - * * - * Copyright (c) 2014-2022 * - * * - * Permission is hereby granted, free of charge, to any person obtaining a copy of this * - * software and associated documentation files (the "Software"), to deal in the Software * - * without restriction, including without limitation the rights to use, copy, modify, * - * merge, publish, distribute, sublicense, and/or sell copies of the Software, and to * - * permit persons to whom the Software is furnished to do so, subject to the following * - * conditions: * - * * - * The above copyright notice and this permission notice shall be included in all copies * - * or substantial portions of the Software. * - * * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, * - * INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A * - * PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT * - * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF * - * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE * - * OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * - ****************************************************************************************/ - -#ifndef __OPENSPACE_CORE___PARALLELSERVER___H__ -#define __OPENSPACE_CORE___PARALLELSERVER___H__ - -#include - -#include -#include -#include -#include -#include - -namespace openspace { - -class ParallelServer { -public: - void start(int port, const std::string& password, - const std::string& changeHostPassword); - - void setDefaultHostAddress(std::string defaultHostAddress); - - std::string defaultHostAddress() const; - - void stop(); - - size_t nConnections() const; - -private: - struct Peer { - size_t id; - std::string name; - ParallelConnection parallelConnection; - ParallelConnection::Status status; - std::thread thread; - }; - - struct PeerMessage { - size_t peerId; - ParallelConnection::Message message; - }; - - bool isConnected(const Peer& peer) const; - - void sendMessage(Peer& peer, ParallelConnection::MessageType messageType, - const std::vector& message); - - void sendMessageToAll(ParallelConnection::MessageType messageType, - const std::vector& message); - - void sendMessageToClients(ParallelConnection::MessageType messageType, - const std::vector& message); - - void disconnect(Peer& peer); - void setName(Peer& peer, std::string name); - void assignHost(std::shared_ptr newHost); - void setToClient(Peer& peer); - void setNConnections(size_t nConnections); - void sendConnectionStatus(Peer& peer); - - void handleAuthentication(std::shared_ptr peer, std::vector message); - void handleData(const Peer& peer, std::vector data); - void handleHostshipRequest(std::shared_ptr peer, std::vector message); - void handleHostshipResignation(Peer& peer); - - void handleNewPeers(); - void eventLoop(); - std::shared_ptr peer(size_t id); - void handlePeer(size_t id); - void handlePeerMessage(PeerMessage peerMessage); - - std::unordered_map> _peers; - mutable std::mutex _peerListMutex; - - std::thread _serverThread; - std::thread _eventLoopThread; - ghoul::io::TcpSocketServer _socketServer; - size_t _passwordHash; - size_t _changeHostPasswordHash; - size_t _nextConnectionId = 1; - std::atomic_bool _shouldStop = false; - - std::atomic_size_t _nConnections = 0; - std::atomic_size_t _hostPeerId = 0; - - mutable std::mutex _hostInfoMutex; - std::string _hostName; - std::string _defaultHostAddress; - - ConcurrentQueue _incomingMessages; -}; - -} // namespace openspace - -#endif // __OPENSPACE_CORE___PARALLELSERVER___H__ diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 78c3043f99..383edb9306 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -82,7 +82,6 @@ set(OPENSPACE_SOURCE ${OPENSPACE_BASE_DIR}/src/network/parallelconnection.cpp ${OPENSPACE_BASE_DIR}/src/network/parallelpeer.cpp ${OPENSPACE_BASE_DIR}/src/network/parallelpeer_lua.inl - ${OPENSPACE_BASE_DIR}/src/network/parallelserver.cpp ${OPENSPACE_BASE_DIR}/src/properties/optionproperty.cpp ${OPENSPACE_BASE_DIR}/src/properties/property.cpp ${OPENSPACE_BASE_DIR}/src/properties/propertyowner.cpp @@ -262,7 +261,6 @@ set(OPENSPACE_HEADER ${OPENSPACE_BASE_DIR}/include/openspace/navigation/waypoint.h ${OPENSPACE_BASE_DIR}/include/openspace/network/parallelconnection.h ${OPENSPACE_BASE_DIR}/include/openspace/network/parallelpeer.h - ${OPENSPACE_BASE_DIR}/include/openspace/network/parallelserver.h ${OPENSPACE_BASE_DIR}/include/openspace/network/messagestructures.h ${OPENSPACE_BASE_DIR}/include/openspace/network/messagestructureshelper.h ${OPENSPACE_BASE_DIR}/include/openspace/properties/listproperty.h diff --git a/src/network/parallelconnection.cpp b/src/network/parallelconnection.cpp index 32fff469b0..9d28916d91 100644 --- a/src/network/parallelconnection.cpp +++ b/src/network/parallelconnection.cpp @@ -37,7 +37,8 @@ namespace { namespace openspace { -const unsigned int ParallelConnection::ProtocolVersion = 5; +// Gonna do some UTF-like magic once we reach 255 to introduce a second byte or something +const uint8_t ParallelConnection::ProtocolVersion = 6; ParallelConnection::Message::Message(MessageType t, std::vector c) : type(t) @@ -52,8 +53,9 @@ ParallelConnection::DataMessage::DataMessage(datamessagestructures::Type t, , content(std::move(c)) {} -ParallelConnection::ConnectionLostError::ConnectionLostError() +ParallelConnection::ConnectionLostError::ConnectionLostError(bool shouldLogError_) : ghoul::RuntimeError("Parallel connection lost", "ParallelConnection") + , shouldLogError(shouldLogError_) {} ParallelConnection::ParallelConnection(std::unique_ptr socket) @@ -65,14 +67,14 @@ bool ParallelConnection::isConnectedOrConnecting() const { } void ParallelConnection::sendDataMessage(const DataMessage& dataMessage) { - const uint32_t dataMessageTypeOut = static_cast(dataMessage.type); + const uint8_t dataMessageTypeOut = static_cast(dataMessage.type); const double dataMessageTimestamp = dataMessage.timestamp; std::vector messageContent; messageContent.insert( messageContent.end(), reinterpret_cast(&dataMessageTypeOut), - reinterpret_cast(&dataMessageTypeOut) + sizeof(uint32_t) + reinterpret_cast(&dataMessageTypeOut) + sizeof(uint8_t) ); messageContent.insert( @@ -90,7 +92,7 @@ void ParallelConnection::sendDataMessage(const DataMessage& dataMessage) { } bool ParallelConnection::sendMessage(const Message& message) { - const uint32_t messageTypeOut = static_cast(message.type); + const uint8_t messageTypeOut = static_cast(message.type); const uint32_t messageSizeOut = static_cast(message.content.size()); std::vector header; @@ -100,12 +102,12 @@ bool ParallelConnection::sendMessage(const Message& message) { header.insert(header.end(), reinterpret_cast(&ProtocolVersion), - reinterpret_cast(&ProtocolVersion) + sizeof(uint32_t) + reinterpret_cast(&ProtocolVersion) + sizeof(uint8_t) ); header.insert(header.end(), reinterpret_cast(&messageTypeOut), - reinterpret_cast(&messageTypeOut) + sizeof(uint32_t) + reinterpret_cast(&messageTypeOut) + sizeof(uint8_t) ); header.insert(header.end(), @@ -123,6 +125,7 @@ bool ParallelConnection::sendMessage(const Message& message) { } void ParallelConnection::disconnect() { + _shouldDisconnect = true; if (_socket) { _socket->disconnect(); } @@ -136,7 +139,9 @@ ParallelConnection::Message ParallelConnection::receiveMessage() { // Header consists of... constexpr size_t HeaderSize = 2 * sizeof(char) + // OS - 3 * sizeof(uint32_t); // Protocol version, message type and message size + sizeof(uint8_t) + // Protocol version + sizeof(uint8_t) + // Message type + sizeof(uint32_t); // message size // Create basic buffer for receiving first part of messages std::vector headerBuffer(HeaderSize); @@ -144,20 +149,27 @@ ParallelConnection::Message ParallelConnection::receiveMessage() { // Receive the header data if (!_socket->get(headerBuffer.data(), HeaderSize)) { - LERROR("Failed to read header from socket. Disconencting."); - throw ConnectionLostError(); + // The `get` call is blocking until something happens, so we might end up here if + // the socket properly closed or if the loading legitimately failed + if (_shouldDisconnect) { + throw ConnectionLostError(false); + } + else { + LERROR("Failed to read header from socket. Disconnecting"); + throw ConnectionLostError(); + } } // Make sure that header matches this version of OpenSpace if (!(headerBuffer[0] == 'O' && headerBuffer[1] == 'S')) { - LERROR("Expected to read message header 'OS' from socket."); + LERROR("Expected to read message header 'OS' from socket"); throw ConnectionLostError(); } size_t offset = 2; - const uint32_t protocolVersionIn = - *reinterpret_cast(headerBuffer.data() + offset); - offset += sizeof(uint32_t); + const uint8_t protocolVersionIn = + *reinterpret_cast(headerBuffer.data() + offset); + offset += sizeof(uint8_t); if (protocolVersionIn != ProtocolVersion) { LERROR(fmt::format( @@ -168,9 +180,9 @@ ParallelConnection::Message ParallelConnection::receiveMessage() { throw ConnectionLostError(); } - const uint32_t messageTypeIn = - *reinterpret_cast(headerBuffer.data() + offset); - offset += sizeof(uint32_t); + const uint8_t messageTypeIn = + *reinterpret_cast(headerBuffer.data() + offset); + offset += sizeof(uint8_t); const uint32_t messageSizeIn = *reinterpret_cast(headerBuffer.data() + offset); @@ -181,7 +193,7 @@ ParallelConnection::Message ParallelConnection::receiveMessage() { // Receive the payload messageBuffer.resize(messageSize); if (!_socket->get(messageBuffer.data(), messageSize)) { - LERROR("Failed to read message from socket. Disconencting."); + LERROR("Failed to read message from socket. Disconnecting"); throw ConnectionLostError(); } diff --git a/src/network/parallelpeer.cpp b/src/network/parallelpeer.cpp index 29efd91b82..e75d60d854 100644 --- a/src/network/parallelpeer.cpp +++ b/src/network/parallelpeer.cpp @@ -163,31 +163,59 @@ void ParallelPeer::disconnect() { } void ParallelPeer::sendAuthentication() { - std::string name = _name; - // Length of this nodes name - const uint32_t nameLength = static_cast(name.length()); + std::string password = _password; + if (password.size() > std::numeric_limits::max()) { + password.resize(std::numeric_limits::max()); + } + const uint16_t passwordSize = static_cast(password.size()); - // Total size of the buffer: (passcode + namelength + name) - const size_t size = sizeof(uint64_t) + sizeof(uint32_t) + nameLength; + std::string hostPassword = _hostPassword; + if (hostPassword.size() > std::numeric_limits::max()) { + hostPassword.resize(std::numeric_limits::max()); + } + const uint16_t hostPasswordSize = static_cast(hostPassword.size()); + + std::string name = _name; + if (name.size() > std::numeric_limits::max()) { + name.resize(std::numeric_limits::max()); + } + const uint8_t nameLength = static_cast(name.length()); + + + // Total size of the buffer + const size_t size = + sizeof(uint16_t) + // password length + passwordSize + // password + sizeof(uint16_t) + // host password length + hostPasswordSize + // host password + sizeof(uint8_t) + // name length + nameLength; // name // Create and reserve buffer std::vector buffer; buffer.reserve(size); - const uint64_t passCode = std::hash{}(_password.value()); - - // Write the hashed password to buffer + // Write the password to buffer buffer.insert( buffer.end(), - reinterpret_cast(&passCode), - reinterpret_cast(&passCode) + sizeof(uint64_t) + reinterpret_cast(&passwordSize), + reinterpret_cast(&passwordSize) + sizeof(uint16_t) ); + buffer.insert(buffer.end(), password.begin(), password.end()); + + // Write the host password to buffer + buffer.insert( + buffer.end(), + reinterpret_cast(&hostPasswordSize), + reinterpret_cast(&hostPasswordSize) + sizeof(uint16_t) + ); + buffer.insert(buffer.end(), hostPassword.begin(), hostPassword.end()); // Write the length of the nodes name to buffer buffer.insert( buffer.end(), reinterpret_cast(&nameLength), - reinterpret_cast(&nameLength) + sizeof(uint32_t) + reinterpret_cast(&nameLength) + sizeof(uint8_t) ); // Write this node's name to buffer @@ -265,8 +293,8 @@ void ParallelPeer::dataMessageReceived(const std::vector& message) { size_t offset = 0; // The type of data message received - const uint32_t type = *(reinterpret_cast(message.data() + offset)); - offset += sizeof(uint32_t); + const uint8_t type = *(reinterpret_cast(message.data() + offset)); + offset += sizeof(uint8_t); const double timestamp = *(reinterpret_cast(message.data() + offset)); offset += sizeof(double); @@ -363,19 +391,19 @@ void ParallelPeer::dataMessageReceived(const std::vector& message) { } void ParallelPeer::connectionStatusMessageReceived(const std::vector& message) { - if (message.size() < 2 * sizeof(uint32_t)) { + if (message.size() < 2 * sizeof(uint8_t)) { LERROR("Malformed connection status message"); return; } size_t pointer = 0; - uint32_t statusIn = *(reinterpret_cast(&message[pointer])); + const uint8_t statusIn = *(reinterpret_cast(&message[pointer])); const ParallelConnection::Status status = static_cast( statusIn ); - pointer += sizeof(uint32_t); + pointer += sizeof(uint8_t); - const size_t hostNameSize = *(reinterpret_cast(&message[pointer])); - pointer += sizeof(uint32_t); + const uint8_t hostNameSize = *(reinterpret_cast(&message[pointer])); + pointer += sizeof(uint8_t); if (hostNameSize > message.size() - pointer) { LERROR("Malformed connection status message"); @@ -409,8 +437,7 @@ void ParallelPeer::connectionStatusMessageReceived(const std::vector& mess global::timeManager->clearKeyframes(); } -void ParallelPeer::nConnectionsMessageReceived(const std::vector& message) -{ +void ParallelPeer::nConnectionsMessageReceived(const std::vector& message) { if (message.size() < sizeof(uint32_t)) { LERROR("Malformed host info message"); return; @@ -424,8 +451,10 @@ void ParallelPeer::handleCommunication() { try { ParallelConnection::Message m = _connection.receiveMessage(); queueInMessage(m); - } catch (const ParallelConnection::ConnectionLostError&) { - LERROR("Parallel connection lost"); + } catch (const ParallelConnection::ConnectionLostError& e) { + if (e.shouldLogError) { + LERROR("Parallel connection lost"); + } } } setStatus(ParallelConnection::Status::Disconnected); @@ -445,12 +474,14 @@ void ParallelPeer::setName(std::string name) { void ParallelPeer::requestHostship() { std::vector buffer; - uint64_t passwordHash = std::hash{}(_hostPassword); + std::string hostPw = _hostPassword; + uint16_t hostPwSize = static_cast(hostPw.size()); buffer.insert( buffer.end(), - reinterpret_cast(&passwordHash), - reinterpret_cast(&passwordHash) + sizeof(uint64_t) + reinterpret_cast(&hostPwSize), + reinterpret_cast(&hostPwSize) + sizeof(uint16_t) ); + buffer.insert(buffer.end(), hostPw.begin(), hostPw.end()); _connection.sendMessage(ParallelConnection::Message( ParallelConnection::MessageType::HostshipRequest, @@ -504,7 +535,7 @@ void ParallelPeer::resetTimeOffset() { void ParallelPeer::preSynchronization() { ZoneScoped - std::unique_lock unqlock(_receiveBufferMutex); + std::unique_lock unlock(_receiveBufferMutex); while (!_receiveBuffer.empty()) { ParallelConnection::Message& message = _receiveBuffer.front(); handleMessage(message); diff --git a/src/network/parallelserver.cpp b/src/network/parallelserver.cpp deleted file mode 100644 index c01521f617..0000000000 --- a/src/network/parallelserver.cpp +++ /dev/null @@ -1,426 +0,0 @@ -/***************************************************************************************** - * * - * OpenSpace * - * * - * Copyright (c) 2014-2022 * - * * - * Permission is hereby granted, free of charge, to any person obtaining a copy of this * - * software and associated documentation files (the "Software"), to deal in the Software * - * without restriction, including without limitation the rights to use, copy, modify, * - * merge, publish, distribute, sublicense, and/or sell copies of the Software, and to * - * permit persons to whom the Software is furnished to do so, subject to the following * - * conditions: * - * * - * The above copyright notice and this permission notice shall be included in all copies * - * or substantial portions of the Software. * - * * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, * - * INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A * - * PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT * - * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF * - * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE * - * OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * - ****************************************************************************************/ - -#include - -#include -#include -#include -#include - -// @TODO(abock): In the entire class remove std::shared_ptr by const Peer& where -// possible to simplify the interface - -namespace { - constexpr const char* _loggerCat = "ParallelServer"; -} // namespace - -namespace openspace { - -void ParallelServer::start(int port, const std::string& password, - const std::string& changeHostPassword) -{ - _socketServer.listen(port); - _passwordHash = std::hash{}(password); - _changeHostPasswordHash = std::hash{}(changeHostPassword); - - _serverThread = std::thread([this](){ handleNewPeers(); }); - _eventLoopThread = std::thread([this]() { eventLoop(); }); -} - -void ParallelServer::setDefaultHostAddress(std::string defaultHostAddress) { - std::lock_guard lock(_hostInfoMutex); - _defaultHostAddress = std::move(defaultHostAddress); -} - -std::string ParallelServer::defaultHostAddress() const { - std::lock_guard lock(_hostInfoMutex); - return _defaultHostAddress; -} - -void ParallelServer::stop() { - _shouldStop = true; - _socketServer.close(); -} - -void ParallelServer::handleNewPeers() { - while (!_shouldStop) { - std::unique_ptr s = _socketServer.awaitPendingTcpSocket(); - - s->startStreams(); - - const size_t id = _nextConnectionId++; - auto p = std::make_shared(Peer{ - id, - "", - ParallelConnection(std::move(s)), - ParallelConnection::Status::Connecting, - std::thread() - }); - auto it = _peers.emplace(p->id, p); - it.first->second->thread = std::thread([this, id]() { handlePeer(id); }); - } -} - -std::shared_ptr ParallelServer::peer(size_t id) { - std::lock_guard lock(_peerListMutex); - const auto it = _peers.find(id); - if (it == _peers.end()) { - return nullptr; - } - return it->second; -} - -void ParallelServer::handlePeer(size_t id) { - while (!_shouldStop) { - std::shared_ptr p = peer(id); - if (!p) { - return; - } - - if (!p->parallelConnection.isConnectedOrConnecting()) { - return; - } - try { - ParallelConnection::Message m = p->parallelConnection.receiveMessage(); - PeerMessage msg; - msg.peerId = id; - msg.message = m; - _incomingMessages.push(msg); - } - catch (const ParallelConnection::ConnectionLostError&) { - LERROR(fmt::format("Connection lost to {}", p->id)); - PeerMessage msg; - msg.peerId = id; - msg.message = ParallelConnection::Message( - ParallelConnection::MessageType::Disconnection, std::vector() - ); - _incomingMessages.push(msg); - return; - } - } -} - -void ParallelServer::eventLoop() { - while (!_shouldStop) { - PeerMessage pm = _incomingMessages.pop(); - handlePeerMessage(std::move(pm)); - } -} - -void ParallelServer::handlePeerMessage(PeerMessage peerMessage) { - const size_t peerId = peerMessage.peerId; - auto it = _peers.find(peerId); - if (it == _peers.end()) { - return; - } - - std::shared_ptr& peer = it->second; - - const ParallelConnection::MessageType type = peerMessage.message.type; - std::vector& data = peerMessage.message.content; - switch (type) { - case ParallelConnection::MessageType::Authentication: - handleAuthentication(peer, std::move(data)); - break; - case ParallelConnection::MessageType::Data: - handleData(*peer, std::move(data)); - break; - case ParallelConnection::MessageType::HostshipRequest: - handleHostshipRequest(peer, std::move(data)); - break; - case ParallelConnection::MessageType::HostshipResignation: - handleHostshipResignation(*peer); - break; - case ParallelConnection::MessageType::Disconnection: - disconnect(*peer); - break; - default: - LERROR(fmt::format("Unsupported message type: {}", static_cast(type))); - break; - } -} - -void ParallelServer::handleAuthentication(std::shared_ptr peer, - std::vector message) -{ - std::stringstream input(std::string(message.begin(), message.end())); - - // 8 bytes passcode - uint64_t passwordHash = 0; - input.read(reinterpret_cast(&passwordHash), sizeof(uint64_t)); - - if (passwordHash != _passwordHash) { - LERROR(fmt::format("Connection {} provided incorrect passcode.", peer->id)); - disconnect(*peer); - return; - } - - // 4 bytes name size - uint32_t nameSize = 0; - input.read(reinterpret_cast(&nameSize), sizeof(uint32_t)); - - // bytes name - std::string name(nameSize, static_cast(0)); - input.read(&name[0], nameSize); - - if (nameSize == 0) { - name = "Anonymous"; - } - - setName(*peer, name); - - LINFO(fmt::format("Connection established with {} ('{}')", peer->id, name)); - - std::string defaultHostAddress; - { - std::lock_guard _hostMutex(_hostInfoMutex); - defaultHostAddress = _defaultHostAddress; - } - if (_hostPeerId == 0 && - peer->parallelConnection.socket()->address() == defaultHostAddress) - { - // Directly promote the conenction to host (initialize) if there is no host, and - // ip matches default host ip. - LINFO(fmt::format("Connection {} directly promoted to host", peer->id)); - assignHost(peer); - for (std::pair>& it : _peers) { - // sendConnectionStatus(it->second) ? - sendConnectionStatus(*peer); - } - } - else { - setToClient(*peer); - } - - setNConnections(nConnections() + 1); -} - -void ParallelServer::handleData(const Peer& peer, std::vector data) { - if (peer.id != _hostPeerId) { - LINFO(fmt::format( - "Ignoring connection {} trying to send data without being host", peer.id - )); - } - sendMessageToClients(ParallelConnection::MessageType::Data, data); -} - -void ParallelServer::handleHostshipRequest(std::shared_ptr peer, - std::vector message) -{ - std::stringstream input(std::string(message.begin(), message.end())); - - LINFO(fmt::format("Connection {} requested hostship", peer->id)); - - uint64_t passwordHash = 0; - input.read(reinterpret_cast(&passwordHash), sizeof(uint64_t)); - - if (passwordHash != _changeHostPasswordHash) { - LERROR(fmt::format("Connection {} provided incorrect host password", peer->id)); - return; - } - - size_t oldHostPeerId = 0; - { - std::lock_guard lock(_hostInfoMutex); - oldHostPeerId = _hostPeerId; - } - - if (oldHostPeerId == peer->id) { - LINFO(fmt::format("Connection {} is already the host", peer->id)); - return; - } - - assignHost(peer); - LINFO(fmt::format("Switched host from {} to {}", oldHostPeerId, peer->id)); -} - -void ParallelServer::handleHostshipResignation(Peer& peer) { - LINFO(fmt::format("Connection {} wants to resign its hostship", peer.id)); - - setToClient(peer); - - LINFO(fmt::format("Connection {} resigned as host", peer.id)); -} - -bool ParallelServer::isConnected(const Peer& peer) const { - return peer.status != ParallelConnection::Status::Connecting && - peer.status != ParallelConnection::Status::Disconnected; -} - -void ParallelServer::sendMessage(Peer& peer, ParallelConnection::MessageType messageType, - const std::vector& message) -{ - peer.parallelConnection.sendMessage({ messageType, message }); -} - -void ParallelServer::sendMessageToAll(ParallelConnection::MessageType messageType, - const std::vector& message) -{ - for (std::pair>& it : _peers) { - if (isConnected(*it.second)) { - it.second->parallelConnection.sendMessage({ messageType, message }); - } - } -} - -void ParallelServer::sendMessageToClients(ParallelConnection::MessageType messageType, - const std::vector& message) -{ - for (std::pair>& it : _peers) { - if (it.second->status == ParallelConnection::Status::ClientWithHost) { - it.second->parallelConnection.sendMessage({ messageType, message }); - } - } -} - -void ParallelServer::disconnect(Peer& peer) { - if (isConnected(peer)) { - setNConnections(nConnections() - 1); - } - - size_t hostPeerId = 0; - { - std::lock_guard lock(_hostInfoMutex); - hostPeerId = _hostPeerId; - } - - // Make sure any disconnecting host is first degraded to client, in order to notify - // other clients about host disconnection. - if (peer.id == hostPeerId) { - setToClient(peer); - } - - peer.parallelConnection.disconnect(); - peer.thread.join(); - _peers.erase(peer.id); -} - -void ParallelServer::setName(Peer& peer, std::string name) { - peer.name = std::move(name); - size_t hostPeerId = 0; - { - std::lock_guard lock(_hostInfoMutex); - hostPeerId = _hostPeerId; - } - - // Make sure everyone gets the new host name. - if (peer.id == hostPeerId) { - { - std::lock_guard lock(_hostInfoMutex); - _hostName = peer.name; - } - - for (std::pair>& it : _peers) { - // sendConnectionStatus(it->second) ? - sendConnectionStatus(peer); - } - } -} - -void ParallelServer::assignHost(std::shared_ptr newHost) { - { - std::lock_guard lock(_hostInfoMutex); - std::shared_ptr oldHost = peer(_hostPeerId); - - if (oldHost) { - oldHost->status = ParallelConnection::Status::ClientWithHost; - } - _hostPeerId = newHost->id; - _hostName = newHost->name; - } - newHost->status = ParallelConnection::Status::Host; - - for (std::pair>& it : _peers) { - if (it.second != newHost) { - it.second->status = ParallelConnection::Status::ClientWithHost; - } - sendConnectionStatus(*it.second); - } -} - -void ParallelServer::setToClient(Peer& peer) { - if (peer.status == ParallelConnection::Status::Host) { - { - std::lock_guard lock(_hostInfoMutex); - _hostPeerId = 0; - _hostName.clear(); - } - - // If host becomes client, make all clients hostless. - for (std::pair>& it : _peers) { - it.second->status = ParallelConnection::Status::ClientWithoutHost; - sendConnectionStatus(*it.second); - } - } - else { - peer.status = (_hostPeerId > 0) ? - ParallelConnection::Status::ClientWithHost : - ParallelConnection::Status::ClientWithoutHost; - sendConnectionStatus(peer); - } -} - -void ParallelServer::setNConnections(size_t nConnections) { - _nConnections = nConnections; - std::vector data; - const uint32_t n = static_cast(_nConnections); - data.insert( - data.end(), - reinterpret_cast(&n), - reinterpret_cast(&n) + sizeof(uint32_t) - ); - sendMessageToAll(ParallelConnection::MessageType::NConnections, data); -} - -void ParallelServer::sendConnectionStatus(Peer& peer) { - std::vector data; - const uint32_t outStatus = static_cast(peer.status); - data.insert( - data.end(), - reinterpret_cast(&outStatus), - reinterpret_cast(&outStatus) + sizeof(uint32_t) - ); - - const uint32_t outHostNameSize = static_cast(_hostName.size()); - data.insert( - data.end(), - reinterpret_cast(&outHostNameSize), - reinterpret_cast(&outHostNameSize) + sizeof(uint32_t) - ); - - data.insert( - data.end(), - _hostName.data(), - _hostName.data() + outHostNameSize - ); - - sendMessage(peer, ParallelConnection::MessageType::ConnectionStatus, data); -} - -size_t ParallelServer::nConnections() const { - return _nConnections; -} - -} // namespace openspace