mirror of
https://github.com/OpenSpace/OpenSpace.git
synced 2026-01-04 10:40:09 -06:00
Feature/wormhole (#1985)
* Backwards incompatible change to the astrocasting protocol * Send password in the clear [never thought i write something like this] * Update to new Protocol version, remove built-in ParallelServer * Remove Wormhole server in favor of Typescript version
This commit is contained in:
@@ -1,64 +0,0 @@
|
||||
##########################################################################################
|
||||
# #
|
||||
# OpenSpace #
|
||||
# #
|
||||
# Copyright (c) 2014-2022 #
|
||||
# #
|
||||
# Permission is hereby granted, free of charge, to any person obtaining a copy of this #
|
||||
# software and associated documentation files (the "Software"), to deal in the Software #
|
||||
# without restriction, including without limitation the rights to use, copy, modify, #
|
||||
# merge, publish, distribute, sublicense, and/or sell copies of the Software, and to #
|
||||
# permit persons to whom the Software is furnished to do so, subject to the following #
|
||||
# conditions: #
|
||||
# #
|
||||
# The above copyright notice and this permission notice shall be included in all copies #
|
||||
# or substantial portions of the Software. #
|
||||
# #
|
||||
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, #
|
||||
# INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A #
|
||||
# PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT #
|
||||
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF #
|
||||
# CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE #
|
||||
# OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #
|
||||
##########################################################################################
|
||||
|
||||
include(${OPENSPACE_CMAKE_EXT_DIR}/application_definition.cmake)
|
||||
|
||||
set_source_files_properties(
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/openspace.icns
|
||||
PROPERTIES MACOSX_PACKAGE_LOCATION "Resources"
|
||||
)
|
||||
|
||||
set(MACOSX_BUNDLE_ICON_FILE openspace.icns)
|
||||
|
||||
create_new_application(
|
||||
Wormhole MACOSX_BUNDLE
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/main.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/openspace.rc
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/openspace.icns
|
||||
)
|
||||
|
||||
target_link_libraries(Wormhole PRIVATE openspace-core openspace-module-collection)
|
||||
|
||||
# Web Browser and Web gui
|
||||
# Why not put these in the module's path? Because they do not have access to the
|
||||
# target as of July 2017, which is needed.
|
||||
if (OPENSPACE_MODULE_WEBBROWSER AND CEF_ROOT)
|
||||
# wanted by CEF
|
||||
set(CMAKE_BUILD_TYPE Debug CACHE STRING "CMAKE_BUILD_TYPE")
|
||||
set(PROJECT_ARCH "x86_64")
|
||||
|
||||
if (WIN32)
|
||||
set(RESOURCE_FILE ${OPENSPACE_APPS_DIR}/OpenSpace/openspace.rc)
|
||||
endif ()
|
||||
|
||||
# Add the CEF binary distribution's cmake/ directory to the module path and
|
||||
# find CEF to initialize it properly.
|
||||
set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${WEBBROWSER_MODULE_PATH}/cmake")
|
||||
include(webbrowser_helpers)
|
||||
|
||||
set_cef_targets("${CEF_ROOT}" Wormhole)
|
||||
run_cef_platform_config("${CEF_ROOT}" "${CEF_TARGET}" "${WEBBROWSER_MODULE_PATH}")
|
||||
elseif (OPENSPACE_MODULE_WEBBROWSER)
|
||||
message(WARNING "Web configured to be included, but no CEF_ROOT was found, please try configuring CMake again.")
|
||||
endif ()
|
||||
@@ -1 +0,0 @@
|
||||
set(DEFAULT_APPLICATION ON)
|
||||
@@ -1,133 +0,0 @@
|
||||
/*****************************************************************************************
|
||||
* *
|
||||
* OpenSpace *
|
||||
* *
|
||||
* Copyright (c) 2014-2022 *
|
||||
* *
|
||||
* Permission is hereby granted, free of charge, to any person obtaining a copy of this *
|
||||
* software and associated documentation files (the "Software"), to deal in the Software *
|
||||
* without restriction, including without limitation the rights to use, copy, modify, *
|
||||
* merge, publish, distribute, sublicense, and/or sell copies of the Software, and to *
|
||||
* permit persons to whom the Software is furnished to do so, subject to the following *
|
||||
* conditions: *
|
||||
* *
|
||||
* The above copyright notice and this permission notice shall be included in all copies *
|
||||
* or substantial portions of the Software. *
|
||||
* *
|
||||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, *
|
||||
* INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A *
|
||||
* PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT *
|
||||
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF *
|
||||
* CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE *
|
||||
* OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. *
|
||||
****************************************************************************************/
|
||||
|
||||
#include <openspace/network/parallelserver.h>
|
||||
#include <ghoul/fmt.h>
|
||||
#include <ghoul/cmdparser/commandlineparser.h>
|
||||
#include <ghoul/cmdparser/singlecommand.h>
|
||||
#include <ghoul/logging/logmanager.h>
|
||||
#include <iomanip>
|
||||
|
||||
namespace {
|
||||
constexpr const char*_loggerCat = "Wormhole";
|
||||
} // namespace
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
using namespace openspace;
|
||||
using namespace ghoul::cmdparser;
|
||||
|
||||
std::vector<std::string> arguments(argv, argv + argc);
|
||||
|
||||
CommandlineParser commandlineParser(
|
||||
"Wormhole",
|
||||
CommandlineParser::AllowUnknownCommands::Yes
|
||||
);
|
||||
|
||||
struct {
|
||||
std::string port;
|
||||
std::string password;
|
||||
std::string changeHostPassword;
|
||||
} settings;
|
||||
|
||||
commandlineParser.addCommand(
|
||||
std::make_unique<ghoul::cmdparser::SingleCommand<std::string>>(
|
||||
settings.port,
|
||||
"--port",
|
||||
"-p",
|
||||
"Sets the port to listen on"
|
||||
)
|
||||
);
|
||||
|
||||
commandlineParser.addCommand(
|
||||
std::make_unique<ghoul::cmdparser::SingleCommand<std::string>>(
|
||||
settings.password,
|
||||
"--password",
|
||||
"-l",
|
||||
"Sets the password to use"
|
||||
)
|
||||
);
|
||||
|
||||
commandlineParser.addCommand(
|
||||
std::make_unique<ghoul::cmdparser::SingleCommand<std::string>>(
|
||||
settings.changeHostPassword,
|
||||
"--hostpassword",
|
||||
"-h",
|
||||
"Sets the host password to use"
|
||||
)
|
||||
);
|
||||
|
||||
ghoul::logging::LogManager::initialize(
|
||||
ghoul::logging::LogLevel::Debug,
|
||||
ghoul::logging::LogManager::ImmediateFlush::Yes
|
||||
);
|
||||
|
||||
commandlineParser.setCommandLine(arguments);
|
||||
commandlineParser.execute();
|
||||
|
||||
if (settings.password.empty()) {
|
||||
std::stringstream defaultPassword;
|
||||
defaultPassword << std::hex << std::setfill('0') << std::setw(6) <<
|
||||
(std::hash<size_t>{}(
|
||||
std::chrono::system_clock::now().time_since_epoch().count()
|
||||
) % 0xffffff);
|
||||
|
||||
settings.password = defaultPassword.str();
|
||||
}
|
||||
if (settings.changeHostPassword.empty()) {
|
||||
std::stringstream defaultChangeHostPassword;
|
||||
defaultChangeHostPassword << std::hex << std::setfill('0') << std::setw(6) <<
|
||||
(std::hash<size_t>{}(
|
||||
std::chrono::system_clock::now().time_since_epoch().count() + 1
|
||||
) % 0xffffff);
|
||||
|
||||
settings.changeHostPassword = defaultChangeHostPassword.str();
|
||||
}
|
||||
LINFO(fmt::format("Connection password: {}", settings.password));
|
||||
LINFO(fmt::format("Host password: {}", settings.changeHostPassword));
|
||||
|
||||
int port = 25001;
|
||||
|
||||
if (!settings.port.empty()) {
|
||||
try {
|
||||
port = std::stoi(settings.port);
|
||||
}
|
||||
catch (const std::invalid_argument&) {
|
||||
LERROR(fmt::format("Invalid port: {}", settings.port));
|
||||
}
|
||||
}
|
||||
|
||||
ParallelServer server;
|
||||
server.start(port, settings.password, settings.changeHostPassword);
|
||||
server.setDefaultHostAddress("127.0.0.1");
|
||||
LINFO(fmt::format("Server listening to port {}", port));
|
||||
|
||||
while (std::cin.get() != 'q') {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
|
||||
}
|
||||
|
||||
server.stop();
|
||||
LINFO("Server stopped");
|
||||
|
||||
return 0;
|
||||
}
|
||||
Binary file not shown.
Binary file not shown.
|
Before Width: | Height: | Size: 88 KiB |
Binary file not shown.
|
Before Width: | Height: | Size: 28 KiB |
@@ -1 +0,0 @@
|
||||
IDI_ICON1 ICON DISCARDABLE "openspace.ico"
|
||||
@@ -89,13 +89,13 @@ struct CameraKeyframe {
|
||||
sizeof(_followNodeRotation)
|
||||
);
|
||||
|
||||
int nodeNameLength = static_cast<int>(_focusNode.size());
|
||||
uint32_t nodeNameLength = static_cast<uint32_t>(_focusNode.size());
|
||||
|
||||
// Add focus node
|
||||
buffer.insert(
|
||||
buffer.end(),
|
||||
reinterpret_cast<const char*>(&nodeNameLength),
|
||||
reinterpret_cast<const char*>(&nodeNameLength) + sizeof(nodeNameLength)
|
||||
reinterpret_cast<const char*>(&nodeNameLength) + sizeof(uint32_t)
|
||||
);
|
||||
buffer.insert(
|
||||
buffer.end(),
|
||||
|
||||
@@ -42,14 +42,13 @@ public:
|
||||
Host
|
||||
};
|
||||
|
||||
enum class MessageType : uint32_t {
|
||||
enum class MessageType : uint8_t {
|
||||
Authentication = 0,
|
||||
Data,
|
||||
ConnectionStatus,
|
||||
HostshipRequest,
|
||||
HostshipResignation,
|
||||
NConnections,
|
||||
Disconnection
|
||||
NConnections
|
||||
};
|
||||
|
||||
struct Message {
|
||||
@@ -71,7 +70,9 @@ public:
|
||||
|
||||
class ConnectionLostError : public ghoul::RuntimeError {
|
||||
public:
|
||||
explicit ConnectionLostError();
|
||||
explicit ConnectionLostError(bool shouldLogError = true);
|
||||
|
||||
bool shouldLogError;
|
||||
};
|
||||
|
||||
ParallelConnection(std::unique_ptr<ghoul::io::TcpSocket> socket);
|
||||
@@ -84,9 +85,11 @@ public:
|
||||
|
||||
ParallelConnection::Message receiveMessage();
|
||||
|
||||
static const unsigned int ProtocolVersion;
|
||||
static const uint8_t ProtocolVersion;
|
||||
|
||||
private:
|
||||
std::unique_ptr<ghoul::io::TcpSocket> _socket;
|
||||
bool _shouldDisconnect = false;
|
||||
};
|
||||
|
||||
} // namespace openspace
|
||||
|
||||
@@ -1,117 +0,0 @@
|
||||
/*****************************************************************************************
|
||||
* *
|
||||
* OpenSpace *
|
||||
* *
|
||||
* Copyright (c) 2014-2022 *
|
||||
* *
|
||||
* Permission is hereby granted, free of charge, to any person obtaining a copy of this *
|
||||
* software and associated documentation files (the "Software"), to deal in the Software *
|
||||
* without restriction, including without limitation the rights to use, copy, modify, *
|
||||
* merge, publish, distribute, sublicense, and/or sell copies of the Software, and to *
|
||||
* permit persons to whom the Software is furnished to do so, subject to the following *
|
||||
* conditions: *
|
||||
* *
|
||||
* The above copyright notice and this permission notice shall be included in all copies *
|
||||
* or substantial portions of the Software. *
|
||||
* *
|
||||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, *
|
||||
* INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A *
|
||||
* PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT *
|
||||
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF *
|
||||
* CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE *
|
||||
* OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. *
|
||||
****************************************************************************************/
|
||||
|
||||
#ifndef __OPENSPACE_CORE___PARALLELSERVER___H__
|
||||
#define __OPENSPACE_CORE___PARALLELSERVER___H__
|
||||
|
||||
#include <openspace/network/parallelconnection.h>
|
||||
|
||||
#include <openspace/util/concurrentqueue.h>
|
||||
#include <ghoul/io/socket/tcpsocketserver.h>
|
||||
#include <atomic>
|
||||
#include <string>
|
||||
#include <unordered_map>
|
||||
|
||||
namespace openspace {
|
||||
|
||||
class ParallelServer {
|
||||
public:
|
||||
void start(int port, const std::string& password,
|
||||
const std::string& changeHostPassword);
|
||||
|
||||
void setDefaultHostAddress(std::string defaultHostAddress);
|
||||
|
||||
std::string defaultHostAddress() const;
|
||||
|
||||
void stop();
|
||||
|
||||
size_t nConnections() const;
|
||||
|
||||
private:
|
||||
struct Peer {
|
||||
size_t id;
|
||||
std::string name;
|
||||
ParallelConnection parallelConnection;
|
||||
ParallelConnection::Status status;
|
||||
std::thread thread;
|
||||
};
|
||||
|
||||
struct PeerMessage {
|
||||
size_t peerId;
|
||||
ParallelConnection::Message message;
|
||||
};
|
||||
|
||||
bool isConnected(const Peer& peer) const;
|
||||
|
||||
void sendMessage(Peer& peer, ParallelConnection::MessageType messageType,
|
||||
const std::vector<char>& message);
|
||||
|
||||
void sendMessageToAll(ParallelConnection::MessageType messageType,
|
||||
const std::vector<char>& message);
|
||||
|
||||
void sendMessageToClients(ParallelConnection::MessageType messageType,
|
||||
const std::vector<char>& message);
|
||||
|
||||
void disconnect(Peer& peer);
|
||||
void setName(Peer& peer, std::string name);
|
||||
void assignHost(std::shared_ptr<Peer> newHost);
|
||||
void setToClient(Peer& peer);
|
||||
void setNConnections(size_t nConnections);
|
||||
void sendConnectionStatus(Peer& peer);
|
||||
|
||||
void handleAuthentication(std::shared_ptr<Peer> peer, std::vector<char> message);
|
||||
void handleData(const Peer& peer, std::vector<char> data);
|
||||
void handleHostshipRequest(std::shared_ptr<Peer> peer, std::vector<char> message);
|
||||
void handleHostshipResignation(Peer& peer);
|
||||
|
||||
void handleNewPeers();
|
||||
void eventLoop();
|
||||
std::shared_ptr<Peer> peer(size_t id);
|
||||
void handlePeer(size_t id);
|
||||
void handlePeerMessage(PeerMessage peerMessage);
|
||||
|
||||
std::unordered_map<size_t, std::shared_ptr<Peer>> _peers;
|
||||
mutable std::mutex _peerListMutex;
|
||||
|
||||
std::thread _serverThread;
|
||||
std::thread _eventLoopThread;
|
||||
ghoul::io::TcpSocketServer _socketServer;
|
||||
size_t _passwordHash;
|
||||
size_t _changeHostPasswordHash;
|
||||
size_t _nextConnectionId = 1;
|
||||
std::atomic_bool _shouldStop = false;
|
||||
|
||||
std::atomic_size_t _nConnections = 0;
|
||||
std::atomic_size_t _hostPeerId = 0;
|
||||
|
||||
mutable std::mutex _hostInfoMutex;
|
||||
std::string _hostName;
|
||||
std::string _defaultHostAddress;
|
||||
|
||||
ConcurrentQueue<PeerMessage> _incomingMessages;
|
||||
};
|
||||
|
||||
} // namespace openspace
|
||||
|
||||
#endif // __OPENSPACE_CORE___PARALLELSERVER___H__
|
||||
@@ -82,7 +82,6 @@ set(OPENSPACE_SOURCE
|
||||
${OPENSPACE_BASE_DIR}/src/network/parallelconnection.cpp
|
||||
${OPENSPACE_BASE_DIR}/src/network/parallelpeer.cpp
|
||||
${OPENSPACE_BASE_DIR}/src/network/parallelpeer_lua.inl
|
||||
${OPENSPACE_BASE_DIR}/src/network/parallelserver.cpp
|
||||
${OPENSPACE_BASE_DIR}/src/properties/optionproperty.cpp
|
||||
${OPENSPACE_BASE_DIR}/src/properties/property.cpp
|
||||
${OPENSPACE_BASE_DIR}/src/properties/propertyowner.cpp
|
||||
@@ -262,7 +261,6 @@ set(OPENSPACE_HEADER
|
||||
${OPENSPACE_BASE_DIR}/include/openspace/navigation/waypoint.h
|
||||
${OPENSPACE_BASE_DIR}/include/openspace/network/parallelconnection.h
|
||||
${OPENSPACE_BASE_DIR}/include/openspace/network/parallelpeer.h
|
||||
${OPENSPACE_BASE_DIR}/include/openspace/network/parallelserver.h
|
||||
${OPENSPACE_BASE_DIR}/include/openspace/network/messagestructures.h
|
||||
${OPENSPACE_BASE_DIR}/include/openspace/network/messagestructureshelper.h
|
||||
${OPENSPACE_BASE_DIR}/include/openspace/properties/listproperty.h
|
||||
|
||||
@@ -37,7 +37,8 @@ namespace {
|
||||
|
||||
namespace openspace {
|
||||
|
||||
const unsigned int ParallelConnection::ProtocolVersion = 5;
|
||||
// Gonna do some UTF-like magic once we reach 255 to introduce a second byte or something
|
||||
const uint8_t ParallelConnection::ProtocolVersion = 6;
|
||||
|
||||
ParallelConnection::Message::Message(MessageType t, std::vector<char> c)
|
||||
: type(t)
|
||||
@@ -52,8 +53,9 @@ ParallelConnection::DataMessage::DataMessage(datamessagestructures::Type t,
|
||||
, content(std::move(c))
|
||||
{}
|
||||
|
||||
ParallelConnection::ConnectionLostError::ConnectionLostError()
|
||||
ParallelConnection::ConnectionLostError::ConnectionLostError(bool shouldLogError_)
|
||||
: ghoul::RuntimeError("Parallel connection lost", "ParallelConnection")
|
||||
, shouldLogError(shouldLogError_)
|
||||
{}
|
||||
|
||||
ParallelConnection::ParallelConnection(std::unique_ptr<ghoul::io::TcpSocket> socket)
|
||||
@@ -65,14 +67,14 @@ bool ParallelConnection::isConnectedOrConnecting() const {
|
||||
}
|
||||
|
||||
void ParallelConnection::sendDataMessage(const DataMessage& dataMessage) {
|
||||
const uint32_t dataMessageTypeOut = static_cast<uint32_t>(dataMessage.type);
|
||||
const uint8_t dataMessageTypeOut = static_cast<uint8_t>(dataMessage.type);
|
||||
const double dataMessageTimestamp = dataMessage.timestamp;
|
||||
|
||||
std::vector<char> messageContent;
|
||||
messageContent.insert(
|
||||
messageContent.end(),
|
||||
reinterpret_cast<const char*>(&dataMessageTypeOut),
|
||||
reinterpret_cast<const char*>(&dataMessageTypeOut) + sizeof(uint32_t)
|
||||
reinterpret_cast<const char*>(&dataMessageTypeOut) + sizeof(uint8_t)
|
||||
);
|
||||
|
||||
messageContent.insert(
|
||||
@@ -90,7 +92,7 @@ void ParallelConnection::sendDataMessage(const DataMessage& dataMessage) {
|
||||
}
|
||||
|
||||
bool ParallelConnection::sendMessage(const Message& message) {
|
||||
const uint32_t messageTypeOut = static_cast<uint32_t>(message.type);
|
||||
const uint8_t messageTypeOut = static_cast<uint8_t>(message.type);
|
||||
const uint32_t messageSizeOut = static_cast<uint32_t>(message.content.size());
|
||||
std::vector<char> header;
|
||||
|
||||
@@ -100,12 +102,12 @@ bool ParallelConnection::sendMessage(const Message& message) {
|
||||
|
||||
header.insert(header.end(),
|
||||
reinterpret_cast<const char*>(&ProtocolVersion),
|
||||
reinterpret_cast<const char*>(&ProtocolVersion) + sizeof(uint32_t)
|
||||
reinterpret_cast<const char*>(&ProtocolVersion) + sizeof(uint8_t)
|
||||
);
|
||||
|
||||
header.insert(header.end(),
|
||||
reinterpret_cast<const char*>(&messageTypeOut),
|
||||
reinterpret_cast<const char*>(&messageTypeOut) + sizeof(uint32_t)
|
||||
reinterpret_cast<const char*>(&messageTypeOut) + sizeof(uint8_t)
|
||||
);
|
||||
|
||||
header.insert(header.end(),
|
||||
@@ -123,6 +125,7 @@ bool ParallelConnection::sendMessage(const Message& message) {
|
||||
}
|
||||
|
||||
void ParallelConnection::disconnect() {
|
||||
_shouldDisconnect = true;
|
||||
if (_socket) {
|
||||
_socket->disconnect();
|
||||
}
|
||||
@@ -136,7 +139,9 @@ ParallelConnection::Message ParallelConnection::receiveMessage() {
|
||||
// Header consists of...
|
||||
constexpr size_t HeaderSize =
|
||||
2 * sizeof(char) + // OS
|
||||
3 * sizeof(uint32_t); // Protocol version, message type and message size
|
||||
sizeof(uint8_t) + // Protocol version
|
||||
sizeof(uint8_t) + // Message type
|
||||
sizeof(uint32_t); // message size
|
||||
|
||||
// Create basic buffer for receiving first part of messages
|
||||
std::vector<char> headerBuffer(HeaderSize);
|
||||
@@ -144,20 +149,27 @@ ParallelConnection::Message ParallelConnection::receiveMessage() {
|
||||
|
||||
// Receive the header data
|
||||
if (!_socket->get(headerBuffer.data(), HeaderSize)) {
|
||||
LERROR("Failed to read header from socket. Disconencting.");
|
||||
throw ConnectionLostError();
|
||||
// The `get` call is blocking until something happens, so we might end up here if
|
||||
// the socket properly closed or if the loading legitimately failed
|
||||
if (_shouldDisconnect) {
|
||||
throw ConnectionLostError(false);
|
||||
}
|
||||
else {
|
||||
LERROR("Failed to read header from socket. Disconnecting");
|
||||
throw ConnectionLostError();
|
||||
}
|
||||
}
|
||||
|
||||
// Make sure that header matches this version of OpenSpace
|
||||
if (!(headerBuffer[0] == 'O' && headerBuffer[1] == 'S')) {
|
||||
LERROR("Expected to read message header 'OS' from socket.");
|
||||
LERROR("Expected to read message header 'OS' from socket");
|
||||
throw ConnectionLostError();
|
||||
}
|
||||
|
||||
size_t offset = 2;
|
||||
const uint32_t protocolVersionIn =
|
||||
*reinterpret_cast<uint32_t*>(headerBuffer.data() + offset);
|
||||
offset += sizeof(uint32_t);
|
||||
const uint8_t protocolVersionIn =
|
||||
*reinterpret_cast<uint8_t*>(headerBuffer.data() + offset);
|
||||
offset += sizeof(uint8_t);
|
||||
|
||||
if (protocolVersionIn != ProtocolVersion) {
|
||||
LERROR(fmt::format(
|
||||
@@ -168,9 +180,9 @@ ParallelConnection::Message ParallelConnection::receiveMessage() {
|
||||
throw ConnectionLostError();
|
||||
}
|
||||
|
||||
const uint32_t messageTypeIn =
|
||||
*reinterpret_cast<uint32_t*>(headerBuffer.data() + offset);
|
||||
offset += sizeof(uint32_t);
|
||||
const uint8_t messageTypeIn =
|
||||
*reinterpret_cast<uint8_t*>(headerBuffer.data() + offset);
|
||||
offset += sizeof(uint8_t);
|
||||
|
||||
const uint32_t messageSizeIn =
|
||||
*reinterpret_cast<uint32_t*>(headerBuffer.data() + offset);
|
||||
@@ -181,7 +193,7 @@ ParallelConnection::Message ParallelConnection::receiveMessage() {
|
||||
// Receive the payload
|
||||
messageBuffer.resize(messageSize);
|
||||
if (!_socket->get(messageBuffer.data(), messageSize)) {
|
||||
LERROR("Failed to read message from socket. Disconencting.");
|
||||
LERROR("Failed to read message from socket. Disconnecting");
|
||||
throw ConnectionLostError();
|
||||
}
|
||||
|
||||
|
||||
@@ -163,31 +163,59 @@ void ParallelPeer::disconnect() {
|
||||
}
|
||||
|
||||
void ParallelPeer::sendAuthentication() {
|
||||
std::string name = _name;
|
||||
// Length of this nodes name
|
||||
const uint32_t nameLength = static_cast<uint32_t>(name.length());
|
||||
std::string password = _password;
|
||||
if (password.size() > std::numeric_limits<uint16_t>::max()) {
|
||||
password.resize(std::numeric_limits<uint16_t>::max());
|
||||
}
|
||||
const uint16_t passwordSize = static_cast<uint16_t>(password.size());
|
||||
|
||||
// Total size of the buffer: (passcode + namelength + name)
|
||||
const size_t size = sizeof(uint64_t) + sizeof(uint32_t) + nameLength;
|
||||
std::string hostPassword = _hostPassword;
|
||||
if (hostPassword.size() > std::numeric_limits<uint16_t>::max()) {
|
||||
hostPassword.resize(std::numeric_limits<uint16_t>::max());
|
||||
}
|
||||
const uint16_t hostPasswordSize = static_cast<uint16_t>(hostPassword.size());
|
||||
|
||||
std::string name = _name;
|
||||
if (name.size() > std::numeric_limits<uint8_t>::max()) {
|
||||
name.resize(std::numeric_limits<uint8_t>::max());
|
||||
}
|
||||
const uint8_t nameLength = static_cast<uint8_t>(name.length());
|
||||
|
||||
|
||||
// Total size of the buffer
|
||||
const size_t size =
|
||||
sizeof(uint16_t) + // password length
|
||||
passwordSize + // password
|
||||
sizeof(uint16_t) + // host password length
|
||||
hostPasswordSize + // host password
|
||||
sizeof(uint8_t) + // name length
|
||||
nameLength; // name
|
||||
|
||||
// Create and reserve buffer
|
||||
std::vector<char> buffer;
|
||||
buffer.reserve(size);
|
||||
|
||||
const uint64_t passCode = std::hash<std::string>{}(_password.value());
|
||||
|
||||
// Write the hashed password to buffer
|
||||
// Write the password to buffer
|
||||
buffer.insert(
|
||||
buffer.end(),
|
||||
reinterpret_cast<const char*>(&passCode),
|
||||
reinterpret_cast<const char*>(&passCode) + sizeof(uint64_t)
|
||||
reinterpret_cast<const char*>(&passwordSize),
|
||||
reinterpret_cast<const char*>(&passwordSize) + sizeof(uint16_t)
|
||||
);
|
||||
buffer.insert(buffer.end(), password.begin(), password.end());
|
||||
|
||||
// Write the host password to buffer
|
||||
buffer.insert(
|
||||
buffer.end(),
|
||||
reinterpret_cast<const char*>(&hostPasswordSize),
|
||||
reinterpret_cast<const char*>(&hostPasswordSize) + sizeof(uint16_t)
|
||||
);
|
||||
buffer.insert(buffer.end(), hostPassword.begin(), hostPassword.end());
|
||||
|
||||
// Write the length of the nodes name to buffer
|
||||
buffer.insert(
|
||||
buffer.end(),
|
||||
reinterpret_cast<const char*>(&nameLength),
|
||||
reinterpret_cast<const char*>(&nameLength) + sizeof(uint32_t)
|
||||
reinterpret_cast<const char*>(&nameLength) + sizeof(uint8_t)
|
||||
);
|
||||
|
||||
// Write this node's name to buffer
|
||||
@@ -265,8 +293,8 @@ void ParallelPeer::dataMessageReceived(const std::vector<char>& message) {
|
||||
size_t offset = 0;
|
||||
|
||||
// The type of data message received
|
||||
const uint32_t type = *(reinterpret_cast<const uint32_t*>(message.data() + offset));
|
||||
offset += sizeof(uint32_t);
|
||||
const uint8_t type = *(reinterpret_cast<const uint8_t*>(message.data() + offset));
|
||||
offset += sizeof(uint8_t);
|
||||
|
||||
const double timestamp = *(reinterpret_cast<const double*>(message.data() + offset));
|
||||
offset += sizeof(double);
|
||||
@@ -363,19 +391,19 @@ void ParallelPeer::dataMessageReceived(const std::vector<char>& message) {
|
||||
}
|
||||
|
||||
void ParallelPeer::connectionStatusMessageReceived(const std::vector<char>& message) {
|
||||
if (message.size() < 2 * sizeof(uint32_t)) {
|
||||
if (message.size() < 2 * sizeof(uint8_t)) {
|
||||
LERROR("Malformed connection status message");
|
||||
return;
|
||||
}
|
||||
size_t pointer = 0;
|
||||
uint32_t statusIn = *(reinterpret_cast<const uint32_t*>(&message[pointer]));
|
||||
const uint8_t statusIn = *(reinterpret_cast<const uint8_t*>(&message[pointer]));
|
||||
const ParallelConnection::Status status = static_cast<ParallelConnection::Status>(
|
||||
statusIn
|
||||
);
|
||||
pointer += sizeof(uint32_t);
|
||||
pointer += sizeof(uint8_t);
|
||||
|
||||
const size_t hostNameSize = *(reinterpret_cast<const uint32_t*>(&message[pointer]));
|
||||
pointer += sizeof(uint32_t);
|
||||
const uint8_t hostNameSize = *(reinterpret_cast<const uint8_t*>(&message[pointer]));
|
||||
pointer += sizeof(uint8_t);
|
||||
|
||||
if (hostNameSize > message.size() - pointer) {
|
||||
LERROR("Malformed connection status message");
|
||||
@@ -409,8 +437,7 @@ void ParallelPeer::connectionStatusMessageReceived(const std::vector<char>& mess
|
||||
global::timeManager->clearKeyframes();
|
||||
}
|
||||
|
||||
void ParallelPeer::nConnectionsMessageReceived(const std::vector<char>& message)
|
||||
{
|
||||
void ParallelPeer::nConnectionsMessageReceived(const std::vector<char>& message) {
|
||||
if (message.size() < sizeof(uint32_t)) {
|
||||
LERROR("Malformed host info message");
|
||||
return;
|
||||
@@ -424,8 +451,10 @@ void ParallelPeer::handleCommunication() {
|
||||
try {
|
||||
ParallelConnection::Message m = _connection.receiveMessage();
|
||||
queueInMessage(m);
|
||||
} catch (const ParallelConnection::ConnectionLostError&) {
|
||||
LERROR("Parallel connection lost");
|
||||
} catch (const ParallelConnection::ConnectionLostError& e) {
|
||||
if (e.shouldLogError) {
|
||||
LERROR("Parallel connection lost");
|
||||
}
|
||||
}
|
||||
}
|
||||
setStatus(ParallelConnection::Status::Disconnected);
|
||||
@@ -445,12 +474,14 @@ void ParallelPeer::setName(std::string name) {
|
||||
|
||||
void ParallelPeer::requestHostship() {
|
||||
std::vector<char> buffer;
|
||||
uint64_t passwordHash = std::hash<std::string>{}(_hostPassword);
|
||||
std::string hostPw = _hostPassword;
|
||||
uint16_t hostPwSize = static_cast<uint16_t>(hostPw.size());
|
||||
buffer.insert(
|
||||
buffer.end(),
|
||||
reinterpret_cast<char*>(&passwordHash),
|
||||
reinterpret_cast<char*>(&passwordHash) + sizeof(uint64_t)
|
||||
reinterpret_cast<const char*>(&hostPwSize),
|
||||
reinterpret_cast<const char*>(&hostPwSize) + sizeof(uint16_t)
|
||||
);
|
||||
buffer.insert(buffer.end(), hostPw.begin(), hostPw.end());
|
||||
|
||||
_connection.sendMessage(ParallelConnection::Message(
|
||||
ParallelConnection::MessageType::HostshipRequest,
|
||||
@@ -504,7 +535,7 @@ void ParallelPeer::resetTimeOffset() {
|
||||
void ParallelPeer::preSynchronization() {
|
||||
ZoneScoped
|
||||
|
||||
std::unique_lock<std::mutex> unqlock(_receiveBufferMutex);
|
||||
std::unique_lock<std::mutex> unlock(_receiveBufferMutex);
|
||||
while (!_receiveBuffer.empty()) {
|
||||
ParallelConnection::Message& message = _receiveBuffer.front();
|
||||
handleMessage(message);
|
||||
|
||||
@@ -1,426 +0,0 @@
|
||||
/*****************************************************************************************
|
||||
* *
|
||||
* OpenSpace *
|
||||
* *
|
||||
* Copyright (c) 2014-2022 *
|
||||
* *
|
||||
* Permission is hereby granted, free of charge, to any person obtaining a copy of this *
|
||||
* software and associated documentation files (the "Software"), to deal in the Software *
|
||||
* without restriction, including without limitation the rights to use, copy, modify, *
|
||||
* merge, publish, distribute, sublicense, and/or sell copies of the Software, and to *
|
||||
* permit persons to whom the Software is furnished to do so, subject to the following *
|
||||
* conditions: *
|
||||
* *
|
||||
* The above copyright notice and this permission notice shall be included in all copies *
|
||||
* or substantial portions of the Software. *
|
||||
* *
|
||||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, *
|
||||
* INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A *
|
||||
* PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT *
|
||||
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF *
|
||||
* CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE *
|
||||
* OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. *
|
||||
****************************************************************************************/
|
||||
|
||||
#include <openspace/network/parallelserver.h>
|
||||
|
||||
#include <ghoul/fmt.h>
|
||||
#include <ghoul/io/socket/tcpsocket.h>
|
||||
#include <ghoul/logging/logmanager.h>
|
||||
#include <functional>
|
||||
|
||||
// @TODO(abock): In the entire class remove std::shared_ptr<Peer> by const Peer& where
|
||||
// possible to simplify the interface
|
||||
|
||||
namespace {
|
||||
constexpr const char* _loggerCat = "ParallelServer";
|
||||
} // namespace
|
||||
|
||||
namespace openspace {
|
||||
|
||||
void ParallelServer::start(int port, const std::string& password,
|
||||
const std::string& changeHostPassword)
|
||||
{
|
||||
_socketServer.listen(port);
|
||||
_passwordHash = std::hash<std::string>{}(password);
|
||||
_changeHostPasswordHash = std::hash<std::string>{}(changeHostPassword);
|
||||
|
||||
_serverThread = std::thread([this](){ handleNewPeers(); });
|
||||
_eventLoopThread = std::thread([this]() { eventLoop(); });
|
||||
}
|
||||
|
||||
void ParallelServer::setDefaultHostAddress(std::string defaultHostAddress) {
|
||||
std::lock_guard lock(_hostInfoMutex);
|
||||
_defaultHostAddress = std::move(defaultHostAddress);
|
||||
}
|
||||
|
||||
std::string ParallelServer::defaultHostAddress() const {
|
||||
std::lock_guard lock(_hostInfoMutex);
|
||||
return _defaultHostAddress;
|
||||
}
|
||||
|
||||
void ParallelServer::stop() {
|
||||
_shouldStop = true;
|
||||
_socketServer.close();
|
||||
}
|
||||
|
||||
void ParallelServer::handleNewPeers() {
|
||||
while (!_shouldStop) {
|
||||
std::unique_ptr<ghoul::io::TcpSocket> s = _socketServer.awaitPendingTcpSocket();
|
||||
|
||||
s->startStreams();
|
||||
|
||||
const size_t id = _nextConnectionId++;
|
||||
auto p = std::make_shared<Peer>(Peer{
|
||||
id,
|
||||
"",
|
||||
ParallelConnection(std::move(s)),
|
||||
ParallelConnection::Status::Connecting,
|
||||
std::thread()
|
||||
});
|
||||
auto it = _peers.emplace(p->id, p);
|
||||
it.first->second->thread = std::thread([this, id]() { handlePeer(id); });
|
||||
}
|
||||
}
|
||||
|
||||
std::shared_ptr<ParallelServer::Peer> ParallelServer::peer(size_t id) {
|
||||
std::lock_guard lock(_peerListMutex);
|
||||
const auto it = _peers.find(id);
|
||||
if (it == _peers.end()) {
|
||||
return nullptr;
|
||||
}
|
||||
return it->second;
|
||||
}
|
||||
|
||||
void ParallelServer::handlePeer(size_t id) {
|
||||
while (!_shouldStop) {
|
||||
std::shared_ptr<Peer> p = peer(id);
|
||||
if (!p) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (!p->parallelConnection.isConnectedOrConnecting()) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
ParallelConnection::Message m = p->parallelConnection.receiveMessage();
|
||||
PeerMessage msg;
|
||||
msg.peerId = id;
|
||||
msg.message = m;
|
||||
_incomingMessages.push(msg);
|
||||
}
|
||||
catch (const ParallelConnection::ConnectionLostError&) {
|
||||
LERROR(fmt::format("Connection lost to {}", p->id));
|
||||
PeerMessage msg;
|
||||
msg.peerId = id;
|
||||
msg.message = ParallelConnection::Message(
|
||||
ParallelConnection::MessageType::Disconnection, std::vector<char>()
|
||||
);
|
||||
_incomingMessages.push(msg);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void ParallelServer::eventLoop() {
|
||||
while (!_shouldStop) {
|
||||
PeerMessage pm = _incomingMessages.pop();
|
||||
handlePeerMessage(std::move(pm));
|
||||
}
|
||||
}
|
||||
|
||||
void ParallelServer::handlePeerMessage(PeerMessage peerMessage) {
|
||||
const size_t peerId = peerMessage.peerId;
|
||||
auto it = _peers.find(peerId);
|
||||
if (it == _peers.end()) {
|
||||
return;
|
||||
}
|
||||
|
||||
std::shared_ptr<Peer>& peer = it->second;
|
||||
|
||||
const ParallelConnection::MessageType type = peerMessage.message.type;
|
||||
std::vector<char>& data = peerMessage.message.content;
|
||||
switch (type) {
|
||||
case ParallelConnection::MessageType::Authentication:
|
||||
handleAuthentication(peer, std::move(data));
|
||||
break;
|
||||
case ParallelConnection::MessageType::Data:
|
||||
handleData(*peer, std::move(data));
|
||||
break;
|
||||
case ParallelConnection::MessageType::HostshipRequest:
|
||||
handleHostshipRequest(peer, std::move(data));
|
||||
break;
|
||||
case ParallelConnection::MessageType::HostshipResignation:
|
||||
handleHostshipResignation(*peer);
|
||||
break;
|
||||
case ParallelConnection::MessageType::Disconnection:
|
||||
disconnect(*peer);
|
||||
break;
|
||||
default:
|
||||
LERROR(fmt::format("Unsupported message type: {}", static_cast<int>(type)));
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
void ParallelServer::handleAuthentication(std::shared_ptr<Peer> peer,
|
||||
std::vector<char> message)
|
||||
{
|
||||
std::stringstream input(std::string(message.begin(), message.end()));
|
||||
|
||||
// 8 bytes passcode
|
||||
uint64_t passwordHash = 0;
|
||||
input.read(reinterpret_cast<char*>(&passwordHash), sizeof(uint64_t));
|
||||
|
||||
if (passwordHash != _passwordHash) {
|
||||
LERROR(fmt::format("Connection {} provided incorrect passcode.", peer->id));
|
||||
disconnect(*peer);
|
||||
return;
|
||||
}
|
||||
|
||||
// 4 bytes name size
|
||||
uint32_t nameSize = 0;
|
||||
input.read(reinterpret_cast<char*>(&nameSize), sizeof(uint32_t));
|
||||
|
||||
// <nameSize> bytes name
|
||||
std::string name(nameSize, static_cast<char>(0));
|
||||
input.read(&name[0], nameSize);
|
||||
|
||||
if (nameSize == 0) {
|
||||
name = "Anonymous";
|
||||
}
|
||||
|
||||
setName(*peer, name);
|
||||
|
||||
LINFO(fmt::format("Connection established with {} ('{}')", peer->id, name));
|
||||
|
||||
std::string defaultHostAddress;
|
||||
{
|
||||
std::lock_guard _hostMutex(_hostInfoMutex);
|
||||
defaultHostAddress = _defaultHostAddress;
|
||||
}
|
||||
if (_hostPeerId == 0 &&
|
||||
peer->parallelConnection.socket()->address() == defaultHostAddress)
|
||||
{
|
||||
// Directly promote the conenction to host (initialize) if there is no host, and
|
||||
// ip matches default host ip.
|
||||
LINFO(fmt::format("Connection {} directly promoted to host", peer->id));
|
||||
assignHost(peer);
|
||||
for (std::pair<const size_t, std::shared_ptr<Peer>>& it : _peers) {
|
||||
// sendConnectionStatus(it->second) ?
|
||||
sendConnectionStatus(*peer);
|
||||
}
|
||||
}
|
||||
else {
|
||||
setToClient(*peer);
|
||||
}
|
||||
|
||||
setNConnections(nConnections() + 1);
|
||||
}
|
||||
|
||||
void ParallelServer::handleData(const Peer& peer, std::vector<char> data) {
|
||||
if (peer.id != _hostPeerId) {
|
||||
LINFO(fmt::format(
|
||||
"Ignoring connection {} trying to send data without being host", peer.id
|
||||
));
|
||||
}
|
||||
sendMessageToClients(ParallelConnection::MessageType::Data, data);
|
||||
}
|
||||
|
||||
void ParallelServer::handleHostshipRequest(std::shared_ptr<Peer> peer,
|
||||
std::vector<char> message)
|
||||
{
|
||||
std::stringstream input(std::string(message.begin(), message.end()));
|
||||
|
||||
LINFO(fmt::format("Connection {} requested hostship", peer->id));
|
||||
|
||||
uint64_t passwordHash = 0;
|
||||
input.read(reinterpret_cast<char*>(&passwordHash), sizeof(uint64_t));
|
||||
|
||||
if (passwordHash != _changeHostPasswordHash) {
|
||||
LERROR(fmt::format("Connection {} provided incorrect host password", peer->id));
|
||||
return;
|
||||
}
|
||||
|
||||
size_t oldHostPeerId = 0;
|
||||
{
|
||||
std::lock_guard lock(_hostInfoMutex);
|
||||
oldHostPeerId = _hostPeerId;
|
||||
}
|
||||
|
||||
if (oldHostPeerId == peer->id) {
|
||||
LINFO(fmt::format("Connection {} is already the host", peer->id));
|
||||
return;
|
||||
}
|
||||
|
||||
assignHost(peer);
|
||||
LINFO(fmt::format("Switched host from {} to {}", oldHostPeerId, peer->id));
|
||||
}
|
||||
|
||||
void ParallelServer::handleHostshipResignation(Peer& peer) {
|
||||
LINFO(fmt::format("Connection {} wants to resign its hostship", peer.id));
|
||||
|
||||
setToClient(peer);
|
||||
|
||||
LINFO(fmt::format("Connection {} resigned as host", peer.id));
|
||||
}
|
||||
|
||||
bool ParallelServer::isConnected(const Peer& peer) const {
|
||||
return peer.status != ParallelConnection::Status::Connecting &&
|
||||
peer.status != ParallelConnection::Status::Disconnected;
|
||||
}
|
||||
|
||||
void ParallelServer::sendMessage(Peer& peer, ParallelConnection::MessageType messageType,
|
||||
const std::vector<char>& message)
|
||||
{
|
||||
peer.parallelConnection.sendMessage({ messageType, message });
|
||||
}
|
||||
|
||||
void ParallelServer::sendMessageToAll(ParallelConnection::MessageType messageType,
|
||||
const std::vector<char>& message)
|
||||
{
|
||||
for (std::pair<const size_t, std::shared_ptr<Peer>>& it : _peers) {
|
||||
if (isConnected(*it.second)) {
|
||||
it.second->parallelConnection.sendMessage({ messageType, message });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void ParallelServer::sendMessageToClients(ParallelConnection::MessageType messageType,
|
||||
const std::vector<char>& message)
|
||||
{
|
||||
for (std::pair<const size_t, std::shared_ptr<Peer>>& it : _peers) {
|
||||
if (it.second->status == ParallelConnection::Status::ClientWithHost) {
|
||||
it.second->parallelConnection.sendMessage({ messageType, message });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void ParallelServer::disconnect(Peer& peer) {
|
||||
if (isConnected(peer)) {
|
||||
setNConnections(nConnections() - 1);
|
||||
}
|
||||
|
||||
size_t hostPeerId = 0;
|
||||
{
|
||||
std::lock_guard lock(_hostInfoMutex);
|
||||
hostPeerId = _hostPeerId;
|
||||
}
|
||||
|
||||
// Make sure any disconnecting host is first degraded to client, in order to notify
|
||||
// other clients about host disconnection.
|
||||
if (peer.id == hostPeerId) {
|
||||
setToClient(peer);
|
||||
}
|
||||
|
||||
peer.parallelConnection.disconnect();
|
||||
peer.thread.join();
|
||||
_peers.erase(peer.id);
|
||||
}
|
||||
|
||||
void ParallelServer::setName(Peer& peer, std::string name) {
|
||||
peer.name = std::move(name);
|
||||
size_t hostPeerId = 0;
|
||||
{
|
||||
std::lock_guard lock(_hostInfoMutex);
|
||||
hostPeerId = _hostPeerId;
|
||||
}
|
||||
|
||||
// Make sure everyone gets the new host name.
|
||||
if (peer.id == hostPeerId) {
|
||||
{
|
||||
std::lock_guard lock(_hostInfoMutex);
|
||||
_hostName = peer.name;
|
||||
}
|
||||
|
||||
for (std::pair<const size_t, std::shared_ptr<Peer>>& it : _peers) {
|
||||
// sendConnectionStatus(it->second) ?
|
||||
sendConnectionStatus(peer);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void ParallelServer::assignHost(std::shared_ptr<Peer> newHost) {
|
||||
{
|
||||
std::lock_guard lock(_hostInfoMutex);
|
||||
std::shared_ptr<ParallelServer::Peer> oldHost = peer(_hostPeerId);
|
||||
|
||||
if (oldHost) {
|
||||
oldHost->status = ParallelConnection::Status::ClientWithHost;
|
||||
}
|
||||
_hostPeerId = newHost->id;
|
||||
_hostName = newHost->name;
|
||||
}
|
||||
newHost->status = ParallelConnection::Status::Host;
|
||||
|
||||
for (std::pair<const size_t, std::shared_ptr<Peer>>& it : _peers) {
|
||||
if (it.second != newHost) {
|
||||
it.second->status = ParallelConnection::Status::ClientWithHost;
|
||||
}
|
||||
sendConnectionStatus(*it.second);
|
||||
}
|
||||
}
|
||||
|
||||
void ParallelServer::setToClient(Peer& peer) {
|
||||
if (peer.status == ParallelConnection::Status::Host) {
|
||||
{
|
||||
std::lock_guard lock(_hostInfoMutex);
|
||||
_hostPeerId = 0;
|
||||
_hostName.clear();
|
||||
}
|
||||
|
||||
// If host becomes client, make all clients hostless.
|
||||
for (std::pair<const size_t, std::shared_ptr<Peer>>& it : _peers) {
|
||||
it.second->status = ParallelConnection::Status::ClientWithoutHost;
|
||||
sendConnectionStatus(*it.second);
|
||||
}
|
||||
}
|
||||
else {
|
||||
peer.status = (_hostPeerId > 0) ?
|
||||
ParallelConnection::Status::ClientWithHost :
|
||||
ParallelConnection::Status::ClientWithoutHost;
|
||||
sendConnectionStatus(peer);
|
||||
}
|
||||
}
|
||||
|
||||
void ParallelServer::setNConnections(size_t nConnections) {
|
||||
_nConnections = nConnections;
|
||||
std::vector<char> data;
|
||||
const uint32_t n = static_cast<uint32_t>(_nConnections);
|
||||
data.insert(
|
||||
data.end(),
|
||||
reinterpret_cast<const char*>(&n),
|
||||
reinterpret_cast<const char*>(&n) + sizeof(uint32_t)
|
||||
);
|
||||
sendMessageToAll(ParallelConnection::MessageType::NConnections, data);
|
||||
}
|
||||
|
||||
void ParallelServer::sendConnectionStatus(Peer& peer) {
|
||||
std::vector<char> data;
|
||||
const uint32_t outStatus = static_cast<uint32_t>(peer.status);
|
||||
data.insert(
|
||||
data.end(),
|
||||
reinterpret_cast<const char*>(&outStatus),
|
||||
reinterpret_cast<const char*>(&outStatus) + sizeof(uint32_t)
|
||||
);
|
||||
|
||||
const uint32_t outHostNameSize = static_cast<uint32_t>(_hostName.size());
|
||||
data.insert(
|
||||
data.end(),
|
||||
reinterpret_cast<const char*>(&outHostNameSize),
|
||||
reinterpret_cast<const char*>(&outHostNameSize) + sizeof(uint32_t)
|
||||
);
|
||||
|
||||
data.insert(
|
||||
data.end(),
|
||||
_hostName.data(),
|
||||
_hostName.data() + outHostNameSize
|
||||
);
|
||||
|
||||
sendMessage(peer, ParallelConnection::MessageType::ConnectionStatus, data);
|
||||
}
|
||||
|
||||
size_t ParallelServer::nConnections() const {
|
||||
return _nConnections;
|
||||
}
|
||||
|
||||
} // namespace openspace
|
||||
Reference in New Issue
Block a user