diff --git a/modules/softwareintegration/CMakeLists.txt b/modules/softwareintegration/CMakeLists.txt index abf46d92f3..300776fe63 100644 --- a/modules/softwareintegration/CMakeLists.txt +++ b/modules/softwareintegration/CMakeLists.txt @@ -2,7 +2,7 @@ # # # OpenSpace # # # -# Copyright (c) 2014-2020 # +# Copyright (c) 2014-2022 # # # # Permission is hereby granted, free of charge, to any person obtaining a copy of this # # software and associated documentation files (the "Software"), to deal in the Software # @@ -27,18 +27,26 @@ include(${OPENSPACE_CMAKE_EXT_DIR}/module_definition.cmake) set(HEADER_FILES ${CMAKE_CURRENT_SOURCE_DIR}/pointdatamessagehandler.h ${CMAKE_CURRENT_SOURCE_DIR}/softwareintegrationmodule.h + ${CMAKE_CURRENT_SOURCE_DIR}/network/common/syncablemessagequeue.h + ${CMAKE_CURRENT_SOURCE_DIR}/network/common/basenetworkengine.h ${CMAKE_CURRENT_SOURCE_DIR}/network/softwareconnection.h - ${CMAKE_CURRENT_SOURCE_DIR}/network/softwareintegrationserver.h + ${CMAKE_CURRENT_SOURCE_DIR}/network/networkengine.h + ${CMAKE_CURRENT_SOURCE_DIR}/network/clientnetworkengine.h ${CMAKE_CURRENT_SOURCE_DIR}/rendering/renderablepointscloud.h + ${CMAKE_CURRENT_SOURCE_DIR}/network/common/syncablequeue.h ) source_group("Header Files" FILES ${HEADER_FILES}) set(SOURCE_FILES ${CMAKE_CURRENT_SOURCE_DIR}/pointdatamessagehandler.cpp ${CMAKE_CURRENT_SOURCE_DIR}/softwareintegrationmodule.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/network/common/syncablemessagequeue.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/network/common/basenetworkengine.cpp ${CMAKE_CURRENT_SOURCE_DIR}/network/softwareconnection.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/network/softwareintegrationserver.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/network/networkengine.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/network/clientnetworkengine.cpp ${CMAKE_CURRENT_SOURCE_DIR}/rendering/renderablepointscloud.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/network/common/syncablequeue.cpp ) source_group("Source Files" FILES ${SOURCE_FILES}) diff --git a/modules/softwareintegration/network/clientnetworkengine.cpp b/modules/softwareintegration/network/clientnetworkengine.cpp new file mode 100644 index 0000000000..02d0abb82e --- /dev/null +++ b/modules/softwareintegration/network/clientnetworkengine.cpp @@ -0,0 +1,80 @@ +/***************************************************************************************** + * * + * OpenSpace * + * * + * Copyright (c) 2014-2022 * + * * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this * + * software and associated documentation files (the "Software"), to deal in the Software * + * without restriction, including without limitation the rights to use, copy, modify, * + * merge, publish, distribute, sublicense, and/or sell copies of the Software, and to * + * permit persons to whom the Software is furnished to do so, subject to the following * + * conditions: * + * * + * The above copyright notice and this permission notice shall be included in all copies * + * or substantial portions of the Software. * + * * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, * + * INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A * + * PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT * + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF * + * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE * + * OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * + ****************************************************************************************/ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace { + constexpr const char* _loggerCat = "SoftwareIntegration_ClientNetworkEngine"; +} // namespace + +namespace openspace { + +void ClientNetworkEngine::start() { + BaseNetworkEngine::start(); +} + +void ClientNetworkEngine::stop() { + BaseNetworkEngine::stop(); + + if (_eventLoopThread.joinable()) { + _eventLoopThread.join(); + } +} + +void ClientNetworkEngine::update() { + BaseNetworkEngine::update(); +} + +void ClientNetworkEngine::eventLoop() { + bool shouldShowMessage = true; + bool shouldShowMessage1 = true; + + while (!_shouldStop) { + // if(shouldShowMessage1) { + // LWARNING(fmt::format("On CLIENT")); + // shouldShowMessage1 = false; + // } + + // if (shouldShowMessage && _message.data().content.size() > 0 ) { + // // .type == SoftwareConnection::Message::Type::Color + // shouldShowMessage = false; + // std::string content = { std::begin(_message.data().content), std::end(_message.data().content) }; + // LWARNING(fmt::format("Connected with {}", content)); + // } + } +} + +} // namespace openspace diff --git a/modules/softwareintegration/network/clientnetworkengine.h b/modules/softwareintegration/network/clientnetworkengine.h new file mode 100644 index 0000000000..4f5f7c6001 --- /dev/null +++ b/modules/softwareintegration/network/clientnetworkengine.h @@ -0,0 +1,44 @@ +/***************************************************************************************** + * * + * OpenSpace * + * * + * Copyright (c) 2014-2022 * + * * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this * + * software and associated documentation files (the "Software"), to deal in the Software * + * without restriction, including without limitation the rights to use, copy, modify, * + * merge, publish, distribute, sublicense, and/or sell copies of the Software, and to * + * permit persons to whom the Software is furnished to do so, subject to the following * + * conditions: * + * * + * The above copyright notice and this permission notice shall be included in all copies * + * or substantial portions of the Software. * + * * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, * + * INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A * + * PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT * + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF * + * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE * + * OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * + ****************************************************************************************/ + +#ifndef __OPENSPACE_MODULE_SOFTWAREINTEGRATION___CLIENTNETWORKENGINE___H__ +#define __OPENSPACE_MODULE_SOFTWAREINTEGRATION___CLIENTNETWORKENGINE___H__ + +#include +#include +#include +#include + +namespace openspace { + +class ClientNetworkEngine : public BaseNetworkEngine { +public: + void start() override; + void stop() override; + void update() override; +}; + +} // namespace openspace + +#endif // __OPENSPACE_MODULE_SOFTWAREINTEGRATION___CLIENTNETWORKENGINE___H__ diff --git a/modules/softwareintegration/network/common/basenetworkengine.cpp b/modules/softwareintegration/network/common/basenetworkengine.cpp new file mode 100644 index 0000000000..4c4f3960f9 --- /dev/null +++ b/modules/softwareintegration/network/common/basenetworkengine.cpp @@ -0,0 +1,155 @@ +/***************************************************************************************** + * * + * OpenSpace * + * * + * Copyright (c) 2014-2022 * + * * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this * + * software and associated documentation files (the "Software"), to deal in the Software * + * without restriction, including without limitation the rights to use, copy, modify, * + * merge, publish, distribute, sublicense, and/or sell copies of the Software, and to * + * permit persons to whom the Software is furnished to do so, subject to the following * + * conditions: * + * * + * The above copyright notice and this permission notice shall be included in all copies * + * or substantial portions of the Software. * + * * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, * + * INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A * + * PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT * + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF * + * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE * + * OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * + ****************************************************************************************/ + +#include + +#include +// #include +// #include +// #include +// #include +#include +// #include +#include +//REMOVE +// #include + +// #include + +namespace { + constexpr const char* _loggerCat = "SoftwareIntegration_BaseNetworkEngine"; +} // namespace + +namespace openspace { + +BaseNetworkEngine::~BaseNetworkEngine() {} + +void BaseNetworkEngine::start() { + global::syncEngine->addSyncables(getSyncables()); + _eventLoopThread = std::thread([this]() { eventLoop(); }); +} + +void BaseNetworkEngine::stop() { + global::syncEngine->removeSyncables(getSyncables()); + _shouldStop = true; + + if (_eventLoopThread.joinable()) { + _eventLoopThread.join(); + } +} + +std::vector BaseNetworkEngine::getSyncables() { + return { &_incomingMessages }; +} + + +// void BaseNetworkEngine::preSync(bool isMaster) { +// // ZoneScoped + +// if (!isMaster) { +// return; +// } + +// std::lock_guard guard(_clientMessageMutex); +// while (!_incomingMessages.empty()) { +// PeerMessage item = std::move(_incomingMessages.front()); +// _incomingMessages.pop(); + +// _messagesToSync.push_back(item); +// // const bool remoteScripting = item.remoteScripting; + +// // Not really a received script but the master also needs to run the script... +// // _masterScriptQueue.push(item); + +// // if (global::parallelPeer->isHost() && remoteScripting) { +// // global::parallelPeer->sendScript(item.script); +// // } +// // if (global::sessionRecording->isRecording()) { +// // global::sessionRecording->saveScriptKeyframeToTimeline(item.script); +// // } +// } +// } + +// void BaseNetworkEngine::encode(SyncBuffer* syncBuffer) { + // ZoneScoped + + // size_t nMessages = _incomingMessages.size(); + // syncBuffer->encode(nScripts); + // for (const std::string& s : _scriptsToSync) { + // syncBuffer->encode(s); + // } + // _scriptsToSync.clear(); +// } + +// void BaseNetworkEngine::decode(SyncBuffer* syncBuffer) { + // ZoneScoped + + // std::lock_guard guard(_slaveScriptsMutex); + // size_t nScripts; + // syncBuffer->decode(nScripts); + + // for (size_t i = 0; i < nScripts; ++i) { + // std::string script; + // syncBuffer->decode(script); + // _slaveScriptQueue.push(std::move(script)); + // } +// } + +// void BaseNetworkEngine::postSync(bool isMaster) { +// // ZoneScoped + +// // if (isMaster) { +// // while (!_masterScriptQueue.empty()) { +// // std::string script = std::move(_masterScriptQueue.front().script); +// // ScriptCallback callback = std::move(_masterScriptQueue.front().callback); +// // _masterScriptQueue.pop(); +// // try { +// // runScript(script, callback); +// // } +// // catch (const ghoul::RuntimeError& e) { +// // LERRORC(e.component, e.message); +// // continue; +// // } +// // } +// // } +// // else { +// // std::lock_guard guard(_slaveScriptsMutex); +// // while (!_slaveScriptQueue.empty()) { +// // try { +// // runScript(_slaveScriptQueue.front()); +// // _slaveScriptQueue.pop(); +// // } +// // catch (const ghoul::RuntimeError& e) { +// // LERRORC(e.component, e.message); +// // } +// // } +// // } +// } + + +// size_t SoftwareIntegrationNodeHandler::nConnections() const { +// return _nConnections; +// } + +} // namespace openspace diff --git a/modules/softwareintegration/network/softwareintegrationserver.h b/modules/softwareintegration/network/common/basenetworkengine.h similarity index 62% rename from modules/softwareintegration/network/softwareintegrationserver.h rename to modules/softwareintegration/network/common/basenetworkengine.h index d4e521e118..5945357740 100644 --- a/modules/softwareintegration/network/softwareintegrationserver.h +++ b/modules/softwareintegration/network/common/basenetworkengine.h @@ -2,7 +2,7 @@ * * * OpenSpace * * * - * Copyright (c) 2014-2021 * + * Copyright (c) 2014-2022 * * * * Permission is hereby granted, free of charge, to any person obtaining a copy of this * * software and associated documentation files (the "Software"), to deal in the Software * @@ -22,65 +22,42 @@ * OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * ****************************************************************************************/ -#ifndef __OPENSPACE_MODULE_SOFTWAREINTEGRATION___SOFTWAREINTEGRATIONSERVER___H__ -#define __OPENSPACE_MODULE_SOFTWAREINTEGRATION___SOFTWAREINTEGRATIONSERVER___H__ +#ifndef __OPENSPACE_MODULE_SOFTWAREINTEGRATION___BASENETWORKENGINE___H__ +#define __OPENSPACE_MODULE_SOFTWAREINTEGRATION___BASENETWORKENGINE___H__ #include #include -#include -#include +#include namespace openspace { -class SoftwareIntegrationServer { +class BaseNetworkEngine { public: - struct Peer { - size_t id; - std::string name; - std::thread thread; + virtual ~BaseNetworkEngine() = 0; - SoftwareConnection connection; - SoftwareConnection::Status status; - }; + virtual void start(); + virtual void stop(); + virtual void update(); - struct PeerMessage { - size_t peerId; - SoftwareConnection::Message message; - }; + // size_t nConnections() const; - void start(int port); - void stop(); - void update(); - - size_t nConnections() const; - -private: - bool isConnected(const Peer& peer) const; - - std::shared_ptr peer(size_t id); - - void disconnect(Peer& peer); +protected: void eventLoop(); - void handleNewPeers(); - void handlePeer(size_t id); - void handlePeerMessage(PeerMessage peerMessage); - std::unordered_map> _peers; - mutable std::mutex _peerListMutex; + std::vector getSyncables(); - ghoul::io::TcpSocketServer _socketServer; - size_t _nextConnectionId = 1; std::atomic_bool _shouldStop = false; - std::atomic_size_t _nConnections = 0; std::thread _eventLoopThread; - std::thread _serverThread; - - ConcurrentQueue _incomingMessages; // Message handlers - PointDataMessageHandler _pointDataMessageHandler; + PointDataMessageHandler _pointDataMessageHandler; + + // ConcurrentQueue _incomingMessages; + SyncableQueue _incomingMessages; + // SyncableQueue _incomingMessages; + }; } // namespace openspace -#endif // __OPENSPACE_MODULE_SOFTWAREINTEGRATION___SOFTWAREINTEGRATIONSERVER___H__ +#endif // __OPENSPACE_MODULE_SOFTWAREINTEGRATION___BASENETWORKENGINE___H__ diff --git a/modules/softwareintegration/network/common/syncablemessagequeue.cpp b/modules/softwareintegration/network/common/syncablemessagequeue.cpp new file mode 100644 index 0000000000..bddcc68dd5 --- /dev/null +++ b/modules/softwareintegration/network/common/syncablemessagequeue.cpp @@ -0,0 +1,202 @@ +/***************************************************************************************** + * * + * OpenSpace * + * * + * Copyright (c) 2014-2022 * + * * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this * + * software and associated documentation files (the "Software"), to deal in the Software * + * without restriction, including without limitation the rights to use, copy, modify, * + * merge, publish, distribute, sublicense, and/or sell copies of the Software, and to * + * permit persons to whom the Software is furnished to do so, subject to the following * + * conditions: * + * * + * The above copyright notice and this permission notice shall be included in all copies * + * or substantial portions of the Software. * + * * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, * + * INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A * + * PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT * + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF * + * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE * + * OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * + ****************************************************************************************/ + +#include +#include + +#include + + +namespace { + constexpr const char* _loggerCat = "SoftwareIntegration_SyncableMessageQueue"; +} // namespace + +namespace openspace { + +// SyncableMessageQueue::SyncableMessageQueue() { +// _queue.clear(); +// }; + +// void SyncableMessageQueue::preSync(bool isMaster) { +// if (!isMaster) { +// return; +// } + +// std::lock_guard guard(_clientItemMutex); +// while (!_incomingMessages.empty()) { +// PeerMessage item = std::move(_incomingMessages.front()); +// _incomingMessages.pop(); + +// _queueToSync.push_back(item); +// // const bool remoteScripting = item.remoteScripting; + +// // Not really a received script but the master also needs to run the script... +// // _masterScriptQueue.push(item); + +// // if (global::parallelPeer->isHost() && remoteScripting) { +// // global::parallelPeer->sendScript(item.script); +// // } +// // if (global::sessionRecording->isRecording()) { +// // global::sessionRecording->saveScriptKeyframeToTimeline(item.script); +// // } +// } +// } + +void SyncableMessageQueue::encode(SyncBuffer* syncBuffer) { + std::lock_guard guard(_mutex); + + size_t nItems = size(); + syncBuffer->encode(nItems); + + for (auto m : _messagesToSync) { + syncBuffer->encode(static_cast(m.message.type)); + LWARNING(fmt::format("message.type: {}", static_cast(m.message.type))); + size_t nChars = m.message.content.size(); + syncBuffer->encode(nChars); + LWARNING(fmt::format("nChars: {}", nChars)); + std::string s( m.message.content.begin(), m.message.content.end() ); + syncBuffer->encode(s); + // syncBuffer->encode(m.message.content); + // LWARNING(fmt::format("m.message.content: {}", m.message.content)); + } + _messagesToSync.clear(); +} + +void SyncableMessageQueue::decode(SyncBuffer* syncBuffer) { + std::lock_guard guard(_mutex); + + size_t nItems; + syncBuffer->decode(nItems); + + for (size_t i = 0; i < nItems; ++i) { + uint32_t _type; + syncBuffer->decode(_type); + SoftwareConnection::MessageType type { _type }; + + size_t nChars; + std::string content; + // std::vector content; + syncBuffer->decode(nChars); + content.resize(nChars); + syncBuffer->decode(content); + + std::vector _content{ content.begin(), content.end() }; + + PeerMessage item{ 0, SoftwareConnection::Message { type, _content } }; + _queue.push_back(std::move(item)); + } +} + +void SyncableMessageQueue::postSync(bool isMaster) { + if (isMaster) { + if (size() > 0) { + LWARNING(fmt::format("SyncableMessageQueue.size() (MASTER): {}", size())); + } + // while (!_queue.empty()) { + // T item = _queue.front(); + // _queue.pop(); + + // std::string msg = { std::begin(item.data().content), std::end(item.data().content) }; + // LWARNING(fmt::format("Item (MASTER): {}", msg)); + + // // std::string script = std::move(_masterScriptQueue.front().script); + // // ScriptCallback callback = std::move(_masterScriptQueue.front().callback); + // // _masterScriptQueue.pop(); + // // try { + // // runScript(script, callback); + // // } + // // catch (const ghoul::RuntimeError& e) { + // // LERRORC(e.component, e.message); + // // continue; + // // } + // } + } + else { + if (size() > 0) { + LWARNING(fmt::format("SyncableMessageQueue.size() (CLIENT): {}", size())); + } + // std::lock_guard guard(_clientMessageMutex); + // while (!_queue.empty()) { + // T item = _queue.front(); + // _queue.pop(); + + // std::string msg = { std::begin(item.data().content), std::end(item.data().content) }; + // LWARNING(fmt::format("Message (CLIENT): {}", msg)); + // // try { + // // runScript(_slaveScriptQueue.front()); + // // _slaveScriptQueue.pop(); + // // } + // // catch (const ghoul::RuntimeError& e) { + // // LERRORC(e.component, e.message); + // // } + // } + } +} + + +void SyncableMessageQueue::push(PeerMessage &&item) { + _messagesToSync.push_back(item); + _queue.push_back(item); +} + +void SyncableMessageQueue::push(const PeerMessage& item) { + _messagesToSync.push_back(item); + _queue.push_back(item); +} + +PeerMessage SyncableMessageQueue::pop() { + PeerMessage item = front(); + _queue.pop_front(); + return item; +} + +// void SyncableMessageQueue::pop(PeerMessage& item) { +// _queue.pop_front(item); +// } + +size_t SyncableMessageQueue::size() const { + return _queue.size(); +} + +bool SyncableMessageQueue::empty() const { + return _queue.empty(); +} + +PeerMessage& SyncableMessageQueue::front() { + return _queue.front(); +} + +const PeerMessage& SyncableMessageQueue::front() const { + return _queue.front(); +} + +PeerMessage& SyncableMessageQueue::back() { + return _queue.back(); +} + +const PeerMessage& SyncableMessageQueue::back() const { + return _queue.back(); +} + +} // namespace openspace diff --git a/modules/softwareintegration/network/common/syncablemessagequeue.h b/modules/softwareintegration/network/common/syncablemessagequeue.h new file mode 100644 index 0000000000..d161890b28 --- /dev/null +++ b/modules/softwareintegration/network/common/syncablemessagequeue.h @@ -0,0 +1,90 @@ +/***************************************************************************************** + * * + * OpenSpace * + * * + * Copyright (c) 2014-2022 * + * * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this * + * software and associated documentation files (the "Software"), to deal in the Software * + * without restriction, including without limitation the rights to use, copy, modify, * + * merge, publish, distribute, sublicense, and/or sell copies of the Software, and to * + * permit persons to whom the Software is furnished to do so, subject to the following * + * conditions: * + * * + * The above copyright notice and this permission notice shall be included in all copies * + * or substantial portions of the Software. * + * * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, * + * INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A * + * PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT * + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF * + * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE * + * OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * + ****************************************************************************************/ + +#ifndef __OPENSPACE_MODULE_SOFTWAREINTEGRATION___SYNCABLEMESSAGEQUEUE___H__ +#define __OPENSPACE_MODULE_SOFTWAREINTEGRATION___SYNCABLEMESSAGEQUEUE___H__ + +#include +#include + +namespace openspace { + +struct PeerMessage { + size_t peerId; + SoftwareConnection::Message message; + + +}; + +/** + * A double buffered implementation of the Syncable interface. + * Users are encouraged to used this class as a default way to synchronize different + * C++ data types using the SyncEngine. + * + * This class aims to handle the synchronization parts and yet act like a regular + * instance of T. Implicit casts are supported, however, when accessing member functions + * or variables, user may have to do explicit casts. + * + * ((T&) t).method(); + * + */ +class SyncableMessageQueue : public Syncable { +public: + /* ============== SyncEngine functions ============== */ + // virtual void preSync(bool isMaster) override; + virtual void encode(SyncBuffer* syncBuffer) override; + virtual void decode(SyncBuffer* syncBuffer) override; + virtual void postSync(bool isMaster) override; + /* ================================================== */ + + /* =============== Utility functions ================ */ + void push(PeerMessage &&item); + void push(const PeerMessage& item); + + PeerMessage pop(); + + size_t size() const; + + [[nodiscard]] bool empty() const; + + PeerMessage& front(); + const PeerMessage& front() const; + + PeerMessage& back(); + const PeerMessage& back() const; + + /* ================================================== */ + +private: + std::mutex _mutex; + std::list _queue; + std::vector _messagesToSync; + + bool showMessageEncode = true; + bool showMessageDecode = true; +}; + +} // namespace openspace + +#endif // __OPENSPACE_MODULE_SOFTWAREINTEGRATION___SYNCABLEMESSAGEQUEUE___H__ diff --git a/modules/softwareintegration/network/common/syncablequeue.cpp b/modules/softwareintegration/network/common/syncablequeue.cpp new file mode 100644 index 0000000000..a706fc6408 --- /dev/null +++ b/modules/softwareintegration/network/common/syncablequeue.cpp @@ -0,0 +1,171 @@ +/***************************************************************************************** + * * + * OpenSpace * + * * + * Copyright (c) 2014-2022 * + * * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this * + * software and associated documentation files (the "Software"), to deal in the Software * + * without restriction, including without limitation the rights to use, copy, modify, * + * merge, publish, distribute, sublicense, and/or sell copies of the Software, and to * + * permit persons to whom the Software is furnished to do so, subject to the following * + * conditions: * + * * + * The above copyright notice and this permission notice shall be included in all copies * + * or substantial portions of the Software. * + * * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, * + * INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A * + * PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT * + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF * + * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE * + * OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * + ****************************************************************************************/ + +#include + +#include + +// TODO: REMOVE +#include +#include +#include + + + +namespace { + constexpr const char* _loggerCat = "SoftwareIntegration_SyncableQueue"; +} // namespace + +namespace openspace { + +// template +// SyncableQueue::SyncableQueue() { +// _messages.clear(); +// }; + +// void SyncableQueue::preSync(bool isMaster) { +// if (!isMaster) { +// return; +// } + +// std::lock_guard guard(_clientMessageMutex); +// while (!_incomingMessages.empty()) { +// PeerMessage item = std::move(_incomingMessages.front()); +// _incomingMessages.pop(); + +// _messagesToSync.push_back(item); +// // const bool remoteScripting = item.remoteScripting; + +// // Not really a received script but the master also needs to run the script... +// // _masterScriptQueue.push(item); + +// // if (global::parallelPeer->isHost() && remoteScripting) { +// // global::parallelPeer->sendScript(item.script); +// // } +// // if (global::sessionRecording->isRecording()) { +// // global::sessionRecording->saveScriptKeyframeToTimeline(item.script); +// // } +// } +// } + +template +void SyncableQueue::encode(SyncBuffer* syncBuffer) { + //size_t nMessages = _messages.size(); + //syncBuffer->encode(nMessages); + //for (auto m : _messages) { + // syncBuffer->encode(m); + //} + //// _messages.clear(); +} + +template +void SyncableQueue::decode(SyncBuffer* syncBuffer) { + //std::lock_guard guard(_clientMessageMutex); + //size_t nMessages; + //syncBuffer->decode(nMessages); + + //for (size_t i = 0; i < nMessages; ++i) { + // T message; + // syncBuffer->decode(message); + // _messages.push(std::move(message)); + //} +} + +template +void SyncableQueue::push(T &&item) { + _messages.push(item); +} + +template +void SyncableQueue::push(const T& item) { + _messages.push(item); +} + +template +T SyncableQueue::pop() { + return _messages.pop(); +} + +template +void SyncableQueue::pop(T& item) { + _messages.pop(item); +} + +template +size_t SyncableQueue::size() const { + return _messages.size(); +} + +template +bool SyncableQueue::empty() const { + return _messages.empty(); +} + +// template +// void SyncableQueue::postSync(bool isMaster) { +// if (isMaster) { +// while (!_messages.empty()) { +// T message = _messages.front(); +// _messages.pop(); + +// std::string msg = { std::begin(_message.data().content), std::end(_message.data().content) }; +// LWARNING(fmt::format("Message (MASTER) with {}", msg)); + +// // std::string script = std::move(_masterScriptQueue.front().script); +// // ScriptCallback callback = std::move(_masterScriptQueue.front().callback); +// // _masterScriptQueue.pop(); +// // try { +// // runScript(script, callback); +// // } +// // catch (const ghoul::RuntimeError& e) { +// // LERRORC(e.component, e.message); +// // continue; +// // } +// } +// } +// else { +// std::lock_guard guard(_clientMessageMutex); +// while (!_messages.empty()) { +// T message = _messages.front(); +// _messages.pop(); + +// std::string msg = { std::begin(_message.data().content), std::end(_message.data().content) }; +// LWARNING(fmt::format("Message (CLIENT) with {}", msg)); +// // try { +// // runScript(_slaveScriptQueue.front()); +// // _slaveScriptQueue.pop(); +// // } +// // catch (const ghoul::RuntimeError& e) { +// // LERRORC(e.component, e.message); +// // } +// } +// } +// } + + + // size_t SoftwareIntegrationNodeHandler::nConnections() const { + // return _nConnections; + // } + +} // namespace openspace diff --git a/modules/softwareintegration/network/common/syncablequeue.h b/modules/softwareintegration/network/common/syncablequeue.h new file mode 100644 index 0000000000..c592d7f8df --- /dev/null +++ b/modules/softwareintegration/network/common/syncablequeue.h @@ -0,0 +1,63 @@ +/***************************************************************************************** + * * + * OpenSpace * + * * + * Copyright (c) 2014-2022 * + * * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this * + * software and associated documentation files (the "Software"), to deal in the Software * + * without restriction, including without limitation the rights to use, copy, modify, * + * merge, publish, distribute, sublicense, and/or sell copies of the Software, and to * + * permit persons to whom the Software is furnished to do so, subject to the following * + * conditions: * + * * + * The above copyright notice and this permission notice shall be included in all copies * + * or substantial portions of the Software. * + * * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, * + * INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A * + * PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT * + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF * + * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE * + * OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * + ****************************************************************************************/ + +#ifndef __OPENSPACE_MODULE_SOFTWAREINTEGRATION___SYNCABLEQUEUE___H__ +#define __OPENSPACE_MODULE_SOFTWAREINTEGRATION___SYNCABLEQUEUE___H__ + +#include +#include + +#include + +namespace openspace { + +class SyncBuffer; + +template +class SyncableQueue : public Syncable { +public: + // virtual void preSync(bool isMaster) override; + virtual void encode(SyncBuffer* syncBuffer) override; + virtual void decode(SyncBuffer* syncBuffer) override; + // virtual void postSync(bool isMaster) override; + + void push(T &&item); + void push(const T& item); + + T pop(); + void pop(T& item); + + size_t size() const; + + bool empty() const; + +private: + ConcurrentQueue _messages; + + std::mutex _clientMessageMutex; +}; + +} // namespace openspace + +#endif // __OPENSPACE_MODULE_SOFTWAREINTEGRATION___SYNCABLEQUEUE___H__ diff --git a/modules/softwareintegration/network/networkengine.cpp b/modules/softwareintegration/network/networkengine.cpp new file mode 100644 index 0000000000..08355b8b68 --- /dev/null +++ b/modules/softwareintegration/network/networkengine.cpp @@ -0,0 +1,253 @@ +/***************************************************************************************** + * * + * OpenSpace * + * * + * Copyright (c) 2014-2022 * + * * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this * + * software and associated documentation files (the "Software"), to deal in the Software * + * without restriction, including without limitation the rights to use, copy, modify, * + * merge, publish, distribute, sublicense, and/or sell copies of the Software, and to * + * permit persons to whom the Software is furnished to do so, subject to the following * + * conditions: * + * * + * The above copyright notice and this permission notice shall be included in all copies * + * or substantial portions of the Software. * + * * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, * + * INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A * + * PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT * + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF * + * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE * + * OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * + ****************************************************************************************/ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +//REMOVE + +#include + +namespace { + constexpr const char* _loggerCat = "SoftwareIntegration_NetworkEngine"; +} // namespace + +namespace openspace { +NetworkEngine::NetworkEngine(const int port) + : _port{port} +{} + +void NetworkEngine::start() { + BaseNetworkEngine::start(); + _socketServer.listen(_port); + + _serverThread = std::thread([this]() { handleNewPeers(); }); + _eventLoopThread = std::thread([this]() { eventLoop(); }); // TODO: Move this to BaseNetworkEngine::start(); +} + +void NetworkEngine::stop() { + BaseNetworkEngine::stop(); + + _socketServer.close(); + + if (_serverThread.joinable()) { + _serverThread.join(); + } + if (_eventLoopThread.joinable()) { // TODO: Move this to BaseNetworkEngine::stop(); + _eventLoopThread.join(); + } +} + +void NetworkEngine::update() { + BaseNetworkEngine::update(); +} + +// TODO: Move to BaseNetworkEngine +void NetworkEngine::eventLoop() { + while (!_shouldStop) { + if (!_incomingMessages.empty()) { + PeerMessage pm = _incomingMessages.pop(); + handlePeerMessage(std::move(pm)); + } + } +} + +bool NetworkEngine::isConnected(const Peer& peer) const { + return peer.status != SoftwareConnection::Status::Connecting && + peer.status != SoftwareConnection::Status::Disconnected; +} + +std::shared_ptr NetworkEngine::peer(size_t id) { + std::lock_guard lock(_peerListMutex); + auto it = _peers.find(id); + if (it == _peers.end()) { + return nullptr; + } + return it->second; +} + +void NetworkEngine::disconnect(Peer& peer) { + if (isConnected(peer)) { + _nConnections -= 1; + } + + peer.connection.disconnect(); + peer.thread.join(); + _peers.erase(peer.id); +} + +void NetworkEngine::handleNewPeers() { + while (!_shouldStop) { + std::unique_ptr socket = + _socketServer.awaitPendingTcpSocket(); + + if (!socket) { + return; + } + + socket->startStreams(); + + const size_t id = _nextConnectionId++; + std::shared_ptr p = std::make_shared(Peer{ + id, + "", + std::thread(), + SoftwareConnection(std::move(socket)), + SoftwareConnection::Status::Connecting + }); + auto it = _peers.emplace(p->id, p); + it.first->second->thread = std::thread([this, id]() { + handlePeer(id); + }); + } +} + +void NetworkEngine::handlePeer(size_t id) { + while (!_shouldStop) { + std::shared_ptr p = peer(id); + if (!p) { + return; + } + + if (!p->connection.isConnectedOrConnecting()) { + return; + } + + try { + SoftwareConnection::Message m = p->connection.receiveMessageFromSoftware(); + _incomingMessages.push({ id, m }); + } + catch (const SoftwareConnection::SoftwareConnectionLostError&) { + LERROR(fmt::format("Connection lost to {}", p->id)); + _incomingMessages.push({ + id, + SoftwareConnection::Message( + SoftwareConnection::MessageType::Disconnection, std::vector() + ) + }); + return; + } + } +} + +void NetworkEngine::handlePeerMessage(PeerMessage peerMessage) { + const size_t peerId = peerMessage.peerId; + std::shared_ptr peerPtr = peer(peerId); + + const SoftwareConnection::Message::Type messageType = peerMessage.message.type; + std::vector& message = peerMessage.message.content; + + switch (messageType) { + case SoftwareConnection::Message::Type::Connection: { + const std::string software(message.begin(), message.end()); + LINFO(fmt::format("OpenSpace has connected with {} through socket", software)); + break; + } + case SoftwareConnection::Message::Type::ReadPointData: { + LDEBUG("Message recieved.. Point Data"); + _pointDataMessageHandler.handlePointDataMessage(message, peerPtr->connection); + break; + } + case SoftwareConnection::Message::Type::RemoveSceneGraphNode: { + const std::string identifier(message.begin(), message.end()); + LDEBUG(fmt::format("Message recieved.. Delete SGN: {}", identifier)); + + const std::string currentAnchor = + global::navigationHandler->orbitalNavigator().anchorNode()->identifier(); + + if (currentAnchor == identifier) { + // If the deleted node is the current anchor, first change focus to the Sun + openspace::global::scriptEngine->queueScript( + "openspace.setPropertyValueSingle('NavigationHandler.OrbitalNavigator.Anchor', 'Sun')" + "openspace.setPropertyValueSingle('NavigationHandler.OrbitalNavigator.Aim', '')", + scripting::ScriptEngine::RemoteScripting::Yes + ); + } + openspace::global::scriptEngine->queueScript( + "openspace.removeSceneGraphNode('" + identifier + "');", + scripting::ScriptEngine::RemoteScripting::Yes + ); + LDEBUG(fmt::format("Scene graph node '{}' removed.", identifier)); + break; + } + case SoftwareConnection::Message::Type::Color: { + const std::string colorMessage(message.begin(), message.end()); + LDEBUG(fmt::format("Message recieved.. New Color: {}", colorMessage)); + + _pointDataMessageHandler.handleColorMessage(message); + break; + } + case SoftwareConnection::Message::Type::Opacity: { + const std::string opacityMessage(message.begin(), message.end()); + LDEBUG(fmt::format("Message recieved.. New Opacity: {}", opacityMessage)); + + _pointDataMessageHandler.handleOpacityMessage(message); + break; + } + case SoftwareConnection::Message::Type::Size: { + const std::string sizeMessage(message.begin(), message.end()); + LDEBUG(fmt::format("Message recieved.. New Size: {}", sizeMessage)); + + _pointDataMessageHandler.handlePointSizeMessage(message); + break; + } + case SoftwareConnection::Message::Type::Visibility: { + const std::string visibilityMessage(message.begin(), message.end()); + LDEBUG(fmt::format("Message recieved.. New Visibility: {}", visibilityMessage)); + + _pointDataMessageHandler.handleVisiblityMessage(message); + break; + } + case SoftwareConnection::Message::Type::Disconnection: { + disconnect(*peerPtr); + break; + } + default: + LERROR(fmt::format( + "Unsupported message type: {}", static_cast(messageType) + )); + break; + } +} + +void NetworkEngine::addIncomingMessage(PeerMessage peerMessage) { + _incomingMessages.push(peerMessage); + // std::lock_guard lockMessage(_messageMutex); + // _message = peerMessage.message; + // std::vector content = { 'h', 'e', 'l', 'l', 'o' }; + // _message = SoftwareConnection::Message(SoftwareConnection::Message::mapSIMPTypeToMessageType["UPCO"], content); + // std::string msg = { std::begin(_message.data().content), std::end(_message.data().content) }; + // LWARNING(fmt::format("Connected with {}", msg)); +} + +} // namespace openspace diff --git a/modules/softwareintegration/network/networkengine.h b/modules/softwareintegration/network/networkengine.h new file mode 100644 index 0000000000..ed2d016ee1 --- /dev/null +++ b/modules/softwareintegration/network/networkengine.h @@ -0,0 +1,80 @@ +/***************************************************************************************** + * * + * OpenSpace * + * * + * Copyright (c) 2014-2022 * + * * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this * + * software and associated documentation files (the "Software"), to deal in the Software * + * without restriction, including without limitation the rights to use, copy, modify, * + * merge, publish, distribute, sublicense, and/or sell copies of the Software, and to * + * permit persons to whom the Software is furnished to do so, subject to the following * + * conditions: * + * * + * The above copyright notice and this permission notice shall be included in all copies * + * or substantial portions of the Software. * + * * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, * + * INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A * + * PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT * + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF * + * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE * + * OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * + ****************************************************************************************/ + +#ifndef __OPENSPACE_MODULE_SOFTWAREINTEGRATION___NETWORKENGINE___H__ +#define __OPENSPACE_MODULE_SOFTWAREINTEGRATION___NETWORKENGINE___H__ + +#include +#include +#include +#include +#include +#include + +namespace openspace { + +class NetworkEngine : public BaseNetworkEngine { +public: + NetworkEngine(const int port = 4700); + + struct Peer { + size_t id; + std::string name; + std::thread thread; + + SoftwareConnection connection; + SoftwareConnection::Status status; + }; + + void start() override; + void stop() override; + void update() override; + +protected: + void disconnect(Peer& peer); + void handleNewPeers(); + void handlePeer(size_t id); + void handlePeerMessage(PeerMessage peerMessage); + +private: + bool isConnected(const Peer& peer) const; + + std::shared_ptr peer(size_t id); + + + std::unordered_map> _peers; + mutable std::mutex _peerListMutex; + + ghoul::io::TcpSocketServer _socketServer; + size_t _nextConnectionId = 1; + std::atomic_size_t _nConnections = 0; + std::thread _serverThread; + std::thread _eventLoopThread; + + const int _port; +}; + +} // namespace openspace + +#endif // __OPENSPACE_MODULE_SOFTWAREINTEGRATION___NETWORKENGINE___H__ diff --git a/modules/softwareintegration/network/softwareconnection.cpp b/modules/softwareintegration/network/softwareconnection.cpp index ba2c9a2476..184815175b 100644 --- a/modules/softwareintegration/network/softwareconnection.cpp +++ b/modules/softwareintegration/network/softwareconnection.cpp @@ -26,6 +26,9 @@ #include #include +#include +#include +#include namespace { constexpr const char* _loggerCat = "SoftwareConnection"; @@ -35,8 +38,19 @@ namespace openspace { const float SoftwareConnection::ProtocolVersion = 1.0; -SoftwareConnection::Message::Message(MessageType type, std::vector content) - : type(type), content(std::move(content)) +std::map SoftwareConnection::mapSIMPTypeToMessageType { + {"CONN", MessageType::Connection}, + {"PDAT", MessageType::ReadPointData}, + {"RSGN", MessageType::RemoveSceneGraphNode}, + {"UPCO", MessageType::Color}, + {"UPOP", MessageType::Opacity}, + {"UPSI", MessageType::Size}, + {"TOVI", MessageType::Visibility}, + {"DISC", MessageType::Disconnection}, +}; + +SoftwareConnection::Message::Message(SoftwareConnection::MessageType type, std::vector content) + : type{ type }, content{ std::move(content) } {} SoftwareConnection::SoftwareConnectionLostError::SoftwareConnectionLostError() @@ -80,16 +94,25 @@ ghoul::io::TcpSocket* SoftwareConnection::socket() { return _socket.get(); } -SoftwareConnection::Message SoftwareConnection::receiveMessage() { +/** + * @brief This function is only called on the server node, i.e. the node connected to the external software + * + * @return SoftwareConnection::Message + */ +SoftwareConnection::Message SoftwareConnection::receiveMessageFromSoftware() { + // LWARNING("SoftwareConnection::receiveMessageFromSoftware()"); + // if (global::windowDelegate->isMaster()) { + // } + // Header consists of version (3 char), message type (4 char) & subject size (9 char) - size_t HeaderSize = 16 * sizeof(char); + size_t headerSize = 16 * sizeof(char); // Create basic buffer for receiving first part of message - std::vector headerBuffer(HeaderSize); + std::vector headerBuffer(headerSize); std::vector subjectBuffer; // Receive the header data - if (!_socket->get(headerBuffer.data(), HeaderSize)) { + if (!_socket->get(headerBuffer.data(), headerSize)) { LERROR("Failed to read header from socket. Disconnecting."); throw SoftwareConnectionLostError(); } @@ -122,7 +145,7 @@ SoftwareConnection::Message SoftwareConnection::receiveMessage() { for (int i = 7; i < 16; i++) { subjectSizeIn.push_back(headerBuffer[i]); } - const size_t subjectSize = stoi(subjectSizeIn); + const size_t subjectSize = std::stoi(subjectSizeIn); // Receive the message data subjectBuffer.resize(subjectSize); @@ -132,33 +155,16 @@ SoftwareConnection::Message SoftwareConnection::receiveMessage() { } // And delegate decoding depending on message type - if (type == "CONN") { - return Message(MessageType::Connection, subjectBuffer); - } - else if (type == "PDAT") { - return Message(MessageType::ReadPointData, subjectBuffer); - } - else if (type == "RSGN") { - return Message(MessageType::RemoveSceneGraphNode, subjectBuffer); - } - else if (type == "UPCO") { - _isListening = false; - return Message(MessageType::Color, subjectBuffer); - } - else if (type == "UPOP") { - _isListening = false; - return Message(MessageType::Opacity, subjectBuffer); - } - else if (type == "UPSI") { - _isListening = false; - return Message(MessageType::Size, subjectBuffer); - } - else if (type == "TOVI") { - _isListening = false; - return Message(MessageType::Visibility, subjectBuffer); - } - else if (type == "DISC") { - return Message(MessageType::Disconnection, subjectBuffer); + if (mapSIMPTypeToMessageType.count(type) != 0) { + if (mapSIMPTypeToMessageType[type] == MessageType::Color + || mapSIMPTypeToMessageType[type] == MessageType::Opacity + || mapSIMPTypeToMessageType[type] == MessageType::Size + || mapSIMPTypeToMessageType[type] == MessageType::Visibility + ) { + _isListening = false; + } + + return Message(mapSIMPTypeToMessageType[type], subjectBuffer); } else { LERROR(fmt::format("Unsupported message type: {}. Disconnecting...", type)); diff --git a/modules/softwareintegration/network/softwareconnection.h b/modules/softwareintegration/network/softwareconnection.h index ffb89b0f27..35910ec079 100644 --- a/modules/softwareintegration/network/softwareconnection.h +++ b/modules/softwareintegration/network/softwareconnection.h @@ -48,6 +48,19 @@ public: Disconnection }; + static std::map mapSIMPTypeToMessageType; + + // struct SIMPMessageType { + // static inline const std::string Connection = "CONN"; + // static inline const std::string ReadPointData = "PDAT"; + // static inline const std::string RemoveSceneGraphNode = "RSGN"; + // static inline const std::string Color = "UPCO"; + // static inline const std::string Opacity = "UPOP"; + // static inline const std::string Size = "UPSI"; + // static inline const std::string Visibility = "TOVI"; + // static inline const std::string Disconnection = "DISC"; + // }; + struct Message { Message() = default; Message(MessageType type, std::vector content); @@ -61,7 +74,6 @@ public: explicit SoftwareConnectionLostError(); }; - SoftwareConnection() = default; SoftwareConnection(std::unique_ptr socket); bool isConnected() const; @@ -71,7 +83,7 @@ public: ghoul::io::TcpSocket* socket(); - SoftwareConnection::Message receiveMessage(); + SoftwareConnection::Message receiveMessageFromSoftware(); static const float ProtocolVersion; diff --git a/modules/softwareintegration/network/softwareintegrationserver.cpp b/modules/softwareintegration/network/softwareintegrationserver.cpp deleted file mode 100644 index c018a22762..0000000000 --- a/modules/softwareintegration/network/softwareintegrationserver.cpp +++ /dev/null @@ -1,235 +0,0 @@ -/***************************************************************************************** - * * - * OpenSpace * - * * - * Copyright (c) 2014-2021 * - * * - * Permission is hereby granted, free of charge, to any person obtaining a copy of this * - * software and associated documentation files (the "Software"), to deal in the Software * - * without restriction, including without limitation the rights to use, copy, modify, * - * merge, publish, distribute, sublicense, and/or sell copies of the Software, and to * - * permit persons to whom the Software is furnished to do so, subject to the following * - * conditions: * - * * - * The above copyright notice and this permission notice shall be included in all copies * - * or substantial portions of the Software. * - * * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, * - * INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A * - * PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT * - * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF * - * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE * - * OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * - ****************************************************************************************/ - -#include - -#include -#include -#include -#include -#include -#include - -namespace { - constexpr const char* _loggerCat = "SoftwareIntegrationServer"; -} // namespace - -namespace openspace { - -void SoftwareIntegrationServer::start(int port) { - _socketServer.listen(port); - - _serverThread = std::thread([this]() { handleNewPeers(); }); - _eventLoopThread = std::thread([this]() { eventLoop(); }); -} - -void SoftwareIntegrationServer::stop() { - _shouldStop = true; - _socketServer.close(); - - if (_serverThread.joinable()) { - _serverThread.join(); - } - if (_eventLoopThread.joinable()) { - _eventLoopThread.join(); - } -} - -void SoftwareIntegrationServer::update() { - _pointDataMessageHandler.preSyncUpdate(); -} - -size_t SoftwareIntegrationServer::nConnections() const { - return _nConnections; -} - -bool SoftwareIntegrationServer::isConnected(const Peer& peer) const { - return peer.status != SoftwareConnection::Status::Connecting && - peer.status != SoftwareConnection::Status::Disconnected; -} - -std::shared_ptr -SoftwareIntegrationServer::peer(size_t id) -{ - std::lock_guard lock(_peerListMutex); - auto it = _peers.find(id); - if (it == _peers.end()) { - return nullptr; - } - return it->second; -} - -void SoftwareIntegrationServer::disconnect(Peer& peer) { - if (isConnected(peer)) { - _nConnections -= 1; - } - - peer.connection.disconnect(); - peer.thread.join(); - _peers.erase(peer.id); -} - -void SoftwareIntegrationServer::eventLoop() { - while (!_shouldStop) { - if (!_incomingMessages.empty()) { - PeerMessage pm = _incomingMessages.pop(); - handlePeerMessage(std::move(pm)); - } - } -} - -void SoftwareIntegrationServer::handleNewPeers() { - while (!_shouldStop) { - std::unique_ptr socket = - _socketServer.awaitPendingTcpSocket(); - - if (!socket) { - return; - } - - socket->startStreams(); - - const size_t id = _nextConnectionId++; - std::shared_ptr p = std::make_shared(Peer{ - id, - "", - std::thread(), - SoftwareConnection(std::move(socket)), - SoftwareConnection::Status::Connecting - }); - auto it = _peers.emplace(p->id, p); - it.first->second->thread = std::thread([this, id]() { - handlePeer(id); - }); - } -} - -void SoftwareIntegrationServer::handlePeer(size_t id) { - while (!_shouldStop) { - std::shared_ptr p = peer(id); - if (!p) { - return; - } - - if (!p->connection.isConnectedOrConnecting()) { - return; - } - try { - SoftwareConnection::Message m = p->connection.receiveMessage(); - _incomingMessages.push({ id, m }); - } - catch (const SoftwareConnection::SoftwareConnectionLostError&) { - LERROR(fmt::format("Connection lost to {}", p->id)); - _incomingMessages.push({ - id, - SoftwareConnection::Message( - SoftwareConnection::MessageType::Disconnection, std::vector() - ) - }); - return; - } - } -} - -void SoftwareIntegrationServer::handlePeerMessage(PeerMessage peerMessage) { - const size_t peerId = peerMessage.peerId; - std::shared_ptr peerPtr = peer(peerId); - - const SoftwareConnection::MessageType messageType = peerMessage.message.type; - std::vector& message = peerMessage.message.content; - - switch (messageType) { - case SoftwareConnection::MessageType::Connection: { - const std::string software(message.begin(), message.end()); - LINFO(fmt::format("OpenSpace has connected with {} through socket", software)); - break; - } - case SoftwareConnection::MessageType::ReadPointData: { - LDEBUG("Message recieved.. Point Data"); - _pointDataMessageHandler.handlePointDataMessage(message, peerPtr->connection); - break; - } - case SoftwareConnection::MessageType::RemoveSceneGraphNode: { - const std::string identifier(message.begin(), message.end()); - LDEBUG(fmt::format("Message recieved.. Delete SGN: {}", identifier)); - - const std::string currentAnchor = - global::navigationHandler->orbitalNavigator().anchorNode()->identifier(); - - if (currentAnchor == identifier) { - // If the deleted node is the current anchor, first change focus to the Sun - openspace::global::scriptEngine->queueScript( - "openspace.setPropertyValueSingle('NavigationHandler.OrbitalNavigator.Anchor', 'Sun')" - "openspace.setPropertyValueSingle('NavigationHandler.OrbitalNavigator.Aim', '')", - scripting::ScriptEngine::RemoteScripting::Yes - ); - } - openspace::global::scriptEngine->queueScript( - "openspace.removeSceneGraphNode('" + identifier + "');", - scripting::ScriptEngine::RemoteScripting::Yes - ); - LDEBUG(fmt::format("Scene graph node '{}' removed.", identifier)); - break; - } - case SoftwareConnection::MessageType::Color: { - const std::string colorMessage(message.begin(), message.end()); - LDEBUG(fmt::format("Message recieved.. New Color: {}", colorMessage)); - - _pointDataMessageHandler.handleColorMessage(message); - break; - } - case SoftwareConnection::MessageType::Opacity: { - const std::string opacityMessage(message.begin(), message.end()); - LDEBUG(fmt::format("Message recieved.. New Opacity: {}", opacityMessage)); - - _pointDataMessageHandler.handleOpacityMessage(message); - break; - } - case SoftwareConnection::MessageType::Size: { - const std::string sizeMessage(message.begin(), message.end()); - LDEBUG(fmt::format("Message recieved.. New Size: {}", sizeMessage)); - - _pointDataMessageHandler.handlePointSizeMessage(message); - break; - } - case SoftwareConnection::MessageType::Visibility: { - const std::string visibilityMessage(message.begin(), message.end()); - LDEBUG(fmt::format("Message recieved.. New Visibility: {}", visibilityMessage)); - - _pointDataMessageHandler.handleVisiblityMessage(message); - break; - } - case SoftwareConnection::MessageType::Disconnection: { - disconnect(*peerPtr); - break; - } - default: - LERROR(fmt::format( - "Unsupported message type: {}", static_cast(messageType) - )); - break; - } -} - -} // namespace openspace diff --git a/modules/softwareintegration/rendering/renderablepointscloud.h b/modules/softwareintegration/rendering/renderablepointscloud.h index c87d0dd79c..be39d7332b 100644 --- a/modules/softwareintegration/rendering/renderablepointscloud.h +++ b/modules/softwareintegration/rendering/renderablepointscloud.h @@ -77,7 +77,7 @@ protected: std::optional _dataStorageKey = std::nullopt; - int _nPoints = 0; + int _nPoints = 0; // TODO: CHANGE TO size_t? int _nValuesPerPoint = 0; GLuint _vertexArrayObjectID = 0; diff --git a/modules/softwareintegration/softwareintegrationmodule.cpp b/modules/softwareintegration/softwareintegrationmodule.cpp index 88693ad21a..0a7112d16e 100644 --- a/modules/softwareintegration/softwareintegrationmodule.cpp +++ b/modules/softwareintegration/softwareintegrationmodule.cpp @@ -24,11 +24,15 @@ #include +#include +#include #include #include #include #include #include +#include +#include namespace { constexpr const char* _loggerCat = "SoftwareIntegrationModule"; @@ -36,7 +40,21 @@ namespace { namespace openspace { -SoftwareIntegrationModule::SoftwareIntegrationModule() : OpenSpaceModule(Name) {} +SoftwareIntegrationModule::SoftwareIntegrationModule() : OpenSpaceModule(Name) { + if (global::windowDelegate->isMaster()) { + // The main node will handle all communication with the external software + // and forward it to the client nodes + // 4700, is the defualt port where the tcp socket will be opened to the ext. software + _server = new NetworkEngine(); + } else { + // The client nodes will only communicate with the main node + _server = new ClientNetworkEngine(); + } +} + +SoftwareIntegrationModule::~SoftwareIntegrationModule() { + delete _server; +} void SoftwareIntegrationModule::storeData(const std::string& key, const std::vector data) @@ -65,14 +83,13 @@ void SoftwareIntegrationModule::internalInitialize(const ghoul::Dictionary&) { fRenderable->registerClass("RenderablePointsCloud"); - // Open port - _server.start(4700); + _server->start(); - global::callback::preSync->emplace_back([this]() { _server.update(); }); + global::callback::preSync->emplace_back([this]() { _server->update(); }); } void SoftwareIntegrationModule::internalDeinitialize() { - _server.stop(); + _server->stop(); } std::vector diff --git a/modules/softwareintegration/softwareintegrationmodule.h b/modules/softwareintegration/softwareintegrationmodule.h index d5792619dc..4f77fe71e2 100644 --- a/modules/softwareintegration/softwareintegrationmodule.h +++ b/modules/softwareintegration/softwareintegrationmodule.h @@ -27,7 +27,7 @@ #include -#include +#include #include namespace openspace { @@ -37,6 +37,7 @@ public: constexpr static const char* Name = "SoftwareIntegration"; SoftwareIntegrationModule(); + ~SoftwareIntegrationModule(); void storeData(const std::string& key, const std::vector data); std::vector fetchData(const std::string& key); @@ -47,7 +48,7 @@ private: void internalInitialize(const ghoul::Dictionary&) override; void internalDeinitialize() override; - SoftwareIntegrationServer _server; + BaseNetworkEngine* _server; // Centralized storage for large datasets std::map> _temporaryDataStorage; diff --git a/openspace.cfg b/openspace.cfg index e60f20c895..27cd13e5fb 100644 --- a/openspace.cfg +++ b/openspace.cfg @@ -8,11 +8,12 @@ -- or a dome cluster system -- A regular 1280x720 window -SGCTConfig = sgct.config.single{vsync=false} +--SGCTConfig = sgct.config.single{vsync=false} +SGCTConfig = "${CONFIG}/sci_powerwall_7nodes.json" -- A regular 1920x1080 window -- SGCTConfig = sgct.config.single{1920, 1080} -- A windowed 1920x1080 fullscreen --- SGCTConfig = sgct.config.single{1920, 1080, border=false, windowPos={0,0}} +--SGCTConfig = sgct.config.single{1920, 1080, border=false, windowPos={0,0}} -- A 1k fisheye rendering -- SGCTConfig = sgct.config.fisheye{1024, 1024} -- A 4k fisheye rendering in a 1024x1024 window @@ -239,12 +240,13 @@ PrintEvents = false ShutdownCountdown = 3 ScreenshotUseDate = true -BypassLauncher = false +BypassLauncher = true -- OnScreenTextScaling = "framebuffer" -- PerProfileCache = true -- DisableRenderingOnMaster = true -- DisableInGameConsole = true +DisableInGameConsole = false GlobalRotation = { 0.0, 0.0, 0.0 } MasterRotation = { 0.0, 0.0, 0.0 } @@ -270,4 +272,4 @@ OpenGLDebugContext = { { Type = "Pop group", Source = "Application", Identifier = 0 }, }, -- FilterSeverity = { } -} \ No newline at end of file +} diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 78c3043f99..52bc636019 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -390,6 +390,8 @@ set(OPENSPACE_HEADER ${OPENSPACE_BASE_DIR}/include/openspace/util/transformationmanager.h ${OPENSPACE_BASE_DIR}/include/openspace/util/threadpool.h ${OPENSPACE_BASE_DIR}/include/openspace/util/histogram.h + # ${OPENSPACE_BASE_DIR}/include/openspace/util/syncablequeue.h + # ${OPENSPACE_BASE_DIR}/include/openspace/util/syncablequeue.inl ) if (APPLE) diff --git a/src/engine/openspaceengine.cpp b/src/engine/openspaceengine.cpp index 1918a1bc68..72e601dcf1 100644 --- a/src/engine/openspaceengine.cpp +++ b/src/engine/openspaceengine.cpp @@ -1572,6 +1572,8 @@ std::vector OpenSpaceEngine::encode() { ZoneScoped std::vector buffer = global::syncEngine->encodeSyncables(); + + // LWARNING(fmt::format("buffer.size() (OpenSpaceEngine::encode()): {}", buffer.size())); return buffer; } diff --git a/src/engine/syncengine.cpp b/src/engine/syncengine.cpp index b9754106e4..ec77b802ad 100644 --- a/src/engine/syncengine.cpp +++ b/src/engine/syncengine.cpp @@ -28,6 +28,7 @@ #include #include #include +// #include namespace openspace { @@ -44,6 +45,10 @@ std::vector SyncEngine::encodeSyncables() { } std::vector data = _syncBuffer.data(); + + // std::string msg = "_syncBuffer.size() (encodeSyncables):" + std::to_string(_syncBuffer.data().size()); + // LWARNING(msg); + _syncBuffer.reset(); return data; } diff --git a/support/coding/codegen b/support/coding/codegen index b2c8623b81..c6178634f7 160000 --- a/support/coding/codegen +++ b/support/coding/codegen @@ -1 +1 @@ -Subproject commit b2c8623b81005283666ee319b3a972dd2b03c7a0 +Subproject commit c6178634f7e6bcdc62a67af08f315ee384d0227d