diff --git a/modules/softwareintegration/CMakeLists.txt b/modules/softwareintegration/CMakeLists.txt index 300776fe63..d3ce59dea0 100644 --- a/modules/softwareintegration/CMakeLists.txt +++ b/modules/softwareintegration/CMakeLists.txt @@ -27,26 +27,20 @@ include(${OPENSPACE_CMAKE_EXT_DIR}/module_definition.cmake) set(HEADER_FILES ${CMAKE_CURRENT_SOURCE_DIR}/pointdatamessagehandler.h ${CMAKE_CURRENT_SOURCE_DIR}/softwareintegrationmodule.h - ${CMAKE_CURRENT_SOURCE_DIR}/network/common/syncablemessagequeue.h - ${CMAKE_CURRENT_SOURCE_DIR}/network/common/basenetworkengine.h + ${CMAKE_CURRENT_SOURCE_DIR}/syncabledatastorage.h ${CMAKE_CURRENT_SOURCE_DIR}/network/softwareconnection.h ${CMAKE_CURRENT_SOURCE_DIR}/network/networkengine.h - ${CMAKE_CURRENT_SOURCE_DIR}/network/clientnetworkengine.h ${CMAKE_CURRENT_SOURCE_DIR}/rendering/renderablepointscloud.h - ${CMAKE_CURRENT_SOURCE_DIR}/network/common/syncablequeue.h ) source_group("Header Files" FILES ${HEADER_FILES}) set(SOURCE_FILES ${CMAKE_CURRENT_SOURCE_DIR}/pointdatamessagehandler.cpp ${CMAKE_CURRENT_SOURCE_DIR}/softwareintegrationmodule.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/network/common/syncablemessagequeue.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/network/common/basenetworkengine.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/syncabledatastorage.cpp ${CMAKE_CURRENT_SOURCE_DIR}/network/softwareconnection.cpp ${CMAKE_CURRENT_SOURCE_DIR}/network/networkengine.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/network/clientnetworkengine.cpp ${CMAKE_CURRENT_SOURCE_DIR}/rendering/renderablepointscloud.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/network/common/syncablequeue.cpp ) source_group("Source Files" FILES ${SOURCE_FILES}) diff --git a/modules/softwareintegration/network/clientnetworkengine.cpp b/modules/softwareintegration/network/clientnetworkengine.cpp deleted file mode 100644 index 02d0abb82e..0000000000 --- a/modules/softwareintegration/network/clientnetworkengine.cpp +++ /dev/null @@ -1,80 +0,0 @@ -/***************************************************************************************** - * * - * OpenSpace * - * * - * Copyright (c) 2014-2022 * - * * - * Permission is hereby granted, free of charge, to any person obtaining a copy of this * - * software and associated documentation files (the "Software"), to deal in the Software * - * without restriction, including without limitation the rights to use, copy, modify, * - * merge, publish, distribute, sublicense, and/or sell copies of the Software, and to * - * permit persons to whom the Software is furnished to do so, subject to the following * - * conditions: * - * * - * The above copyright notice and this permission notice shall be included in all copies * - * or substantial portions of the Software. * - * * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, * - * INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A * - * PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT * - * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF * - * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE * - * OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * - ****************************************************************************************/ - -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include - -namespace { - constexpr const char* _loggerCat = "SoftwareIntegration_ClientNetworkEngine"; -} // namespace - -namespace openspace { - -void ClientNetworkEngine::start() { - BaseNetworkEngine::start(); -} - -void ClientNetworkEngine::stop() { - BaseNetworkEngine::stop(); - - if (_eventLoopThread.joinable()) { - _eventLoopThread.join(); - } -} - -void ClientNetworkEngine::update() { - BaseNetworkEngine::update(); -} - -void ClientNetworkEngine::eventLoop() { - bool shouldShowMessage = true; - bool shouldShowMessage1 = true; - - while (!_shouldStop) { - // if(shouldShowMessage1) { - // LWARNING(fmt::format("On CLIENT")); - // shouldShowMessage1 = false; - // } - - // if (shouldShowMessage && _message.data().content.size() > 0 ) { - // // .type == SoftwareConnection::Message::Type::Color - // shouldShowMessage = false; - // std::string content = { std::begin(_message.data().content), std::end(_message.data().content) }; - // LWARNING(fmt::format("Connected with {}", content)); - // } - } -} - -} // namespace openspace diff --git a/modules/softwareintegration/network/clientnetworkengine.h b/modules/softwareintegration/network/clientnetworkengine.h deleted file mode 100644 index 4f5f7c6001..0000000000 --- a/modules/softwareintegration/network/clientnetworkengine.h +++ /dev/null @@ -1,44 +0,0 @@ -/***************************************************************************************** - * * - * OpenSpace * - * * - * Copyright (c) 2014-2022 * - * * - * Permission is hereby granted, free of charge, to any person obtaining a copy of this * - * software and associated documentation files (the "Software"), to deal in the Software * - * without restriction, including without limitation the rights to use, copy, modify, * - * merge, publish, distribute, sublicense, and/or sell copies of the Software, and to * - * permit persons to whom the Software is furnished to do so, subject to the following * - * conditions: * - * * - * The above copyright notice and this permission notice shall be included in all copies * - * or substantial portions of the Software. * - * * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, * - * INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A * - * PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT * - * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF * - * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE * - * OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * - ****************************************************************************************/ - -#ifndef __OPENSPACE_MODULE_SOFTWAREINTEGRATION___CLIENTNETWORKENGINE___H__ -#define __OPENSPACE_MODULE_SOFTWAREINTEGRATION___CLIENTNETWORKENGINE___H__ - -#include -#include -#include -#include - -namespace openspace { - -class ClientNetworkEngine : public BaseNetworkEngine { -public: - void start() override; - void stop() override; - void update() override; -}; - -} // namespace openspace - -#endif // __OPENSPACE_MODULE_SOFTWAREINTEGRATION___CLIENTNETWORKENGINE___H__ diff --git a/modules/softwareintegration/network/common/basenetworkengine.cpp b/modules/softwareintegration/network/common/basenetworkengine.cpp deleted file mode 100644 index 4c4f3960f9..0000000000 --- a/modules/softwareintegration/network/common/basenetworkengine.cpp +++ /dev/null @@ -1,155 +0,0 @@ -/***************************************************************************************** - * * - * OpenSpace * - * * - * Copyright (c) 2014-2022 * - * * - * Permission is hereby granted, free of charge, to any person obtaining a copy of this * - * software and associated documentation files (the "Software"), to deal in the Software * - * without restriction, including without limitation the rights to use, copy, modify, * - * merge, publish, distribute, sublicense, and/or sell copies of the Software, and to * - * permit persons to whom the Software is furnished to do so, subject to the following * - * conditions: * - * * - * The above copyright notice and this permission notice shall be included in all copies * - * or substantial portions of the Software. * - * * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, * - * INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A * - * PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT * - * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF * - * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE * - * OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * - ****************************************************************************************/ - -#include - -#include -// #include -// #include -// #include -// #include -#include -// #include -#include -//REMOVE -// #include - -// #include - -namespace { - constexpr const char* _loggerCat = "SoftwareIntegration_BaseNetworkEngine"; -} // namespace - -namespace openspace { - -BaseNetworkEngine::~BaseNetworkEngine() {} - -void BaseNetworkEngine::start() { - global::syncEngine->addSyncables(getSyncables()); - _eventLoopThread = std::thread([this]() { eventLoop(); }); -} - -void BaseNetworkEngine::stop() { - global::syncEngine->removeSyncables(getSyncables()); - _shouldStop = true; - - if (_eventLoopThread.joinable()) { - _eventLoopThread.join(); - } -} - -std::vector BaseNetworkEngine::getSyncables() { - return { &_incomingMessages }; -} - - -// void BaseNetworkEngine::preSync(bool isMaster) { -// // ZoneScoped - -// if (!isMaster) { -// return; -// } - -// std::lock_guard guard(_clientMessageMutex); -// while (!_incomingMessages.empty()) { -// PeerMessage item = std::move(_incomingMessages.front()); -// _incomingMessages.pop(); - -// _messagesToSync.push_back(item); -// // const bool remoteScripting = item.remoteScripting; - -// // Not really a received script but the master also needs to run the script... -// // _masterScriptQueue.push(item); - -// // if (global::parallelPeer->isHost() && remoteScripting) { -// // global::parallelPeer->sendScript(item.script); -// // } -// // if (global::sessionRecording->isRecording()) { -// // global::sessionRecording->saveScriptKeyframeToTimeline(item.script); -// // } -// } -// } - -// void BaseNetworkEngine::encode(SyncBuffer* syncBuffer) { - // ZoneScoped - - // size_t nMessages = _incomingMessages.size(); - // syncBuffer->encode(nScripts); - // for (const std::string& s : _scriptsToSync) { - // syncBuffer->encode(s); - // } - // _scriptsToSync.clear(); -// } - -// void BaseNetworkEngine::decode(SyncBuffer* syncBuffer) { - // ZoneScoped - - // std::lock_guard guard(_slaveScriptsMutex); - // size_t nScripts; - // syncBuffer->decode(nScripts); - - // for (size_t i = 0; i < nScripts; ++i) { - // std::string script; - // syncBuffer->decode(script); - // _slaveScriptQueue.push(std::move(script)); - // } -// } - -// void BaseNetworkEngine::postSync(bool isMaster) { -// // ZoneScoped - -// // if (isMaster) { -// // while (!_masterScriptQueue.empty()) { -// // std::string script = std::move(_masterScriptQueue.front().script); -// // ScriptCallback callback = std::move(_masterScriptQueue.front().callback); -// // _masterScriptQueue.pop(); -// // try { -// // runScript(script, callback); -// // } -// // catch (const ghoul::RuntimeError& e) { -// // LERRORC(e.component, e.message); -// // continue; -// // } -// // } -// // } -// // else { -// // std::lock_guard guard(_slaveScriptsMutex); -// // while (!_slaveScriptQueue.empty()) { -// // try { -// // runScript(_slaveScriptQueue.front()); -// // _slaveScriptQueue.pop(); -// // } -// // catch (const ghoul::RuntimeError& e) { -// // LERRORC(e.component, e.message); -// // } -// // } -// // } -// } - - -// size_t SoftwareIntegrationNodeHandler::nConnections() const { -// return _nConnections; -// } - -} // namespace openspace diff --git a/modules/softwareintegration/network/common/basenetworkengine.h b/modules/softwareintegration/network/common/basenetworkengine.h deleted file mode 100644 index 5945357740..0000000000 --- a/modules/softwareintegration/network/common/basenetworkengine.h +++ /dev/null @@ -1,63 +0,0 @@ -/***************************************************************************************** - * * - * OpenSpace * - * * - * Copyright (c) 2014-2022 * - * * - * Permission is hereby granted, free of charge, to any person obtaining a copy of this * - * software and associated documentation files (the "Software"), to deal in the Software * - * without restriction, including without limitation the rights to use, copy, modify, * - * merge, publish, distribute, sublicense, and/or sell copies of the Software, and to * - * permit persons to whom the Software is furnished to do so, subject to the following * - * conditions: * - * * - * The above copyright notice and this permission notice shall be included in all copies * - * or substantial portions of the Software. * - * * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, * - * INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A * - * PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT * - * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF * - * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE * - * OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * - ****************************************************************************************/ - -#ifndef __OPENSPACE_MODULE_SOFTWAREINTEGRATION___BASENETWORKENGINE___H__ -#define __OPENSPACE_MODULE_SOFTWAREINTEGRATION___BASENETWORKENGINE___H__ - -#include -#include -#include - -namespace openspace { - -class BaseNetworkEngine { -public: - virtual ~BaseNetworkEngine() = 0; - - virtual void start(); - virtual void stop(); - virtual void update(); - - // size_t nConnections() const; - -protected: - void eventLoop(); - - std::vector getSyncables(); - - std::atomic_bool _shouldStop = false; - std::thread _eventLoopThread; - - // Message handlers - PointDataMessageHandler _pointDataMessageHandler; - - // ConcurrentQueue _incomingMessages; - SyncableQueue _incomingMessages; - // SyncableQueue _incomingMessages; - -}; - -} // namespace openspace - -#endif // __OPENSPACE_MODULE_SOFTWAREINTEGRATION___BASENETWORKENGINE___H__ diff --git a/modules/softwareintegration/network/common/syncablemessagequeue.cpp b/modules/softwareintegration/network/common/syncablemessagequeue.cpp deleted file mode 100644 index bddcc68dd5..0000000000 --- a/modules/softwareintegration/network/common/syncablemessagequeue.cpp +++ /dev/null @@ -1,202 +0,0 @@ -/***************************************************************************************** - * * - * OpenSpace * - * * - * Copyright (c) 2014-2022 * - * * - * Permission is hereby granted, free of charge, to any person obtaining a copy of this * - * software and associated documentation files (the "Software"), to deal in the Software * - * without restriction, including without limitation the rights to use, copy, modify, * - * merge, publish, distribute, sublicense, and/or sell copies of the Software, and to * - * permit persons to whom the Software is furnished to do so, subject to the following * - * conditions: * - * * - * The above copyright notice and this permission notice shall be included in all copies * - * or substantial portions of the Software. * - * * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, * - * INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A * - * PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT * - * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF * - * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE * - * OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * - ****************************************************************************************/ - -#include -#include - -#include - - -namespace { - constexpr const char* _loggerCat = "SoftwareIntegration_SyncableMessageQueue"; -} // namespace - -namespace openspace { - -// SyncableMessageQueue::SyncableMessageQueue() { -// _queue.clear(); -// }; - -// void SyncableMessageQueue::preSync(bool isMaster) { -// if (!isMaster) { -// return; -// } - -// std::lock_guard guard(_clientItemMutex); -// while (!_incomingMessages.empty()) { -// PeerMessage item = std::move(_incomingMessages.front()); -// _incomingMessages.pop(); - -// _queueToSync.push_back(item); -// // const bool remoteScripting = item.remoteScripting; - -// // Not really a received script but the master also needs to run the script... -// // _masterScriptQueue.push(item); - -// // if (global::parallelPeer->isHost() && remoteScripting) { -// // global::parallelPeer->sendScript(item.script); -// // } -// // if (global::sessionRecording->isRecording()) { -// // global::sessionRecording->saveScriptKeyframeToTimeline(item.script); -// // } -// } -// } - -void SyncableMessageQueue::encode(SyncBuffer* syncBuffer) { - std::lock_guard guard(_mutex); - - size_t nItems = size(); - syncBuffer->encode(nItems); - - for (auto m : _messagesToSync) { - syncBuffer->encode(static_cast(m.message.type)); - LWARNING(fmt::format("message.type: {}", static_cast(m.message.type))); - size_t nChars = m.message.content.size(); - syncBuffer->encode(nChars); - LWARNING(fmt::format("nChars: {}", nChars)); - std::string s( m.message.content.begin(), m.message.content.end() ); - syncBuffer->encode(s); - // syncBuffer->encode(m.message.content); - // LWARNING(fmt::format("m.message.content: {}", m.message.content)); - } - _messagesToSync.clear(); -} - -void SyncableMessageQueue::decode(SyncBuffer* syncBuffer) { - std::lock_guard guard(_mutex); - - size_t nItems; - syncBuffer->decode(nItems); - - for (size_t i = 0; i < nItems; ++i) { - uint32_t _type; - syncBuffer->decode(_type); - SoftwareConnection::MessageType type { _type }; - - size_t nChars; - std::string content; - // std::vector content; - syncBuffer->decode(nChars); - content.resize(nChars); - syncBuffer->decode(content); - - std::vector _content{ content.begin(), content.end() }; - - PeerMessage item{ 0, SoftwareConnection::Message { type, _content } }; - _queue.push_back(std::move(item)); - } -} - -void SyncableMessageQueue::postSync(bool isMaster) { - if (isMaster) { - if (size() > 0) { - LWARNING(fmt::format("SyncableMessageQueue.size() (MASTER): {}", size())); - } - // while (!_queue.empty()) { - // T item = _queue.front(); - // _queue.pop(); - - // std::string msg = { std::begin(item.data().content), std::end(item.data().content) }; - // LWARNING(fmt::format("Item (MASTER): {}", msg)); - - // // std::string script = std::move(_masterScriptQueue.front().script); - // // ScriptCallback callback = std::move(_masterScriptQueue.front().callback); - // // _masterScriptQueue.pop(); - // // try { - // // runScript(script, callback); - // // } - // // catch (const ghoul::RuntimeError& e) { - // // LERRORC(e.component, e.message); - // // continue; - // // } - // } - } - else { - if (size() > 0) { - LWARNING(fmt::format("SyncableMessageQueue.size() (CLIENT): {}", size())); - } - // std::lock_guard guard(_clientMessageMutex); - // while (!_queue.empty()) { - // T item = _queue.front(); - // _queue.pop(); - - // std::string msg = { std::begin(item.data().content), std::end(item.data().content) }; - // LWARNING(fmt::format("Message (CLIENT): {}", msg)); - // // try { - // // runScript(_slaveScriptQueue.front()); - // // _slaveScriptQueue.pop(); - // // } - // // catch (const ghoul::RuntimeError& e) { - // // LERRORC(e.component, e.message); - // // } - // } - } -} - - -void SyncableMessageQueue::push(PeerMessage &&item) { - _messagesToSync.push_back(item); - _queue.push_back(item); -} - -void SyncableMessageQueue::push(const PeerMessage& item) { - _messagesToSync.push_back(item); - _queue.push_back(item); -} - -PeerMessage SyncableMessageQueue::pop() { - PeerMessage item = front(); - _queue.pop_front(); - return item; -} - -// void SyncableMessageQueue::pop(PeerMessage& item) { -// _queue.pop_front(item); -// } - -size_t SyncableMessageQueue::size() const { - return _queue.size(); -} - -bool SyncableMessageQueue::empty() const { - return _queue.empty(); -} - -PeerMessage& SyncableMessageQueue::front() { - return _queue.front(); -} - -const PeerMessage& SyncableMessageQueue::front() const { - return _queue.front(); -} - -PeerMessage& SyncableMessageQueue::back() { - return _queue.back(); -} - -const PeerMessage& SyncableMessageQueue::back() const { - return _queue.back(); -} - -} // namespace openspace diff --git a/modules/softwareintegration/network/common/syncablequeue.cpp b/modules/softwareintegration/network/common/syncablequeue.cpp deleted file mode 100644 index a706fc6408..0000000000 --- a/modules/softwareintegration/network/common/syncablequeue.cpp +++ /dev/null @@ -1,171 +0,0 @@ -/***************************************************************************************** - * * - * OpenSpace * - * * - * Copyright (c) 2014-2022 * - * * - * Permission is hereby granted, free of charge, to any person obtaining a copy of this * - * software and associated documentation files (the "Software"), to deal in the Software * - * without restriction, including without limitation the rights to use, copy, modify, * - * merge, publish, distribute, sublicense, and/or sell copies of the Software, and to * - * permit persons to whom the Software is furnished to do so, subject to the following * - * conditions: * - * * - * The above copyright notice and this permission notice shall be included in all copies * - * or substantial portions of the Software. * - * * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, * - * INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A * - * PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT * - * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF * - * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE * - * OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * - ****************************************************************************************/ - -#include - -#include - -// TODO: REMOVE -#include -#include -#include - - - -namespace { - constexpr const char* _loggerCat = "SoftwareIntegration_SyncableQueue"; -} // namespace - -namespace openspace { - -// template -// SyncableQueue::SyncableQueue() { -// _messages.clear(); -// }; - -// void SyncableQueue::preSync(bool isMaster) { -// if (!isMaster) { -// return; -// } - -// std::lock_guard guard(_clientMessageMutex); -// while (!_incomingMessages.empty()) { -// PeerMessage item = std::move(_incomingMessages.front()); -// _incomingMessages.pop(); - -// _messagesToSync.push_back(item); -// // const bool remoteScripting = item.remoteScripting; - -// // Not really a received script but the master also needs to run the script... -// // _masterScriptQueue.push(item); - -// // if (global::parallelPeer->isHost() && remoteScripting) { -// // global::parallelPeer->sendScript(item.script); -// // } -// // if (global::sessionRecording->isRecording()) { -// // global::sessionRecording->saveScriptKeyframeToTimeline(item.script); -// // } -// } -// } - -template -void SyncableQueue::encode(SyncBuffer* syncBuffer) { - //size_t nMessages = _messages.size(); - //syncBuffer->encode(nMessages); - //for (auto m : _messages) { - // syncBuffer->encode(m); - //} - //// _messages.clear(); -} - -template -void SyncableQueue::decode(SyncBuffer* syncBuffer) { - //std::lock_guard guard(_clientMessageMutex); - //size_t nMessages; - //syncBuffer->decode(nMessages); - - //for (size_t i = 0; i < nMessages; ++i) { - // T message; - // syncBuffer->decode(message); - // _messages.push(std::move(message)); - //} -} - -template -void SyncableQueue::push(T &&item) { - _messages.push(item); -} - -template -void SyncableQueue::push(const T& item) { - _messages.push(item); -} - -template -T SyncableQueue::pop() { - return _messages.pop(); -} - -template -void SyncableQueue::pop(T& item) { - _messages.pop(item); -} - -template -size_t SyncableQueue::size() const { - return _messages.size(); -} - -template -bool SyncableQueue::empty() const { - return _messages.empty(); -} - -// template -// void SyncableQueue::postSync(bool isMaster) { -// if (isMaster) { -// while (!_messages.empty()) { -// T message = _messages.front(); -// _messages.pop(); - -// std::string msg = { std::begin(_message.data().content), std::end(_message.data().content) }; -// LWARNING(fmt::format("Message (MASTER) with {}", msg)); - -// // std::string script = std::move(_masterScriptQueue.front().script); -// // ScriptCallback callback = std::move(_masterScriptQueue.front().callback); -// // _masterScriptQueue.pop(); -// // try { -// // runScript(script, callback); -// // } -// // catch (const ghoul::RuntimeError& e) { -// // LERRORC(e.component, e.message); -// // continue; -// // } -// } -// } -// else { -// std::lock_guard guard(_clientMessageMutex); -// while (!_messages.empty()) { -// T message = _messages.front(); -// _messages.pop(); - -// std::string msg = { std::begin(_message.data().content), std::end(_message.data().content) }; -// LWARNING(fmt::format("Message (CLIENT) with {}", msg)); -// // try { -// // runScript(_slaveScriptQueue.front()); -// // _slaveScriptQueue.pop(); -// // } -// // catch (const ghoul::RuntimeError& e) { -// // LERRORC(e.component, e.message); -// // } -// } -// } -// } - - - // size_t SoftwareIntegrationNodeHandler::nConnections() const { - // return _nConnections; - // } - -} // namespace openspace diff --git a/modules/softwareintegration/network/common/syncablequeue.h b/modules/softwareintegration/network/common/syncablequeue.h deleted file mode 100644 index c592d7f8df..0000000000 --- a/modules/softwareintegration/network/common/syncablequeue.h +++ /dev/null @@ -1,63 +0,0 @@ -/***************************************************************************************** - * * - * OpenSpace * - * * - * Copyright (c) 2014-2022 * - * * - * Permission is hereby granted, free of charge, to any person obtaining a copy of this * - * software and associated documentation files (the "Software"), to deal in the Software * - * without restriction, including without limitation the rights to use, copy, modify, * - * merge, publish, distribute, sublicense, and/or sell copies of the Software, and to * - * permit persons to whom the Software is furnished to do so, subject to the following * - * conditions: * - * * - * The above copyright notice and this permission notice shall be included in all copies * - * or substantial portions of the Software. * - * * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, * - * INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A * - * PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT * - * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF * - * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE * - * OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * - ****************************************************************************************/ - -#ifndef __OPENSPACE_MODULE_SOFTWAREINTEGRATION___SYNCABLEQUEUE___H__ -#define __OPENSPACE_MODULE_SOFTWAREINTEGRATION___SYNCABLEQUEUE___H__ - -#include -#include - -#include - -namespace openspace { - -class SyncBuffer; - -template -class SyncableQueue : public Syncable { -public: - // virtual void preSync(bool isMaster) override; - virtual void encode(SyncBuffer* syncBuffer) override; - virtual void decode(SyncBuffer* syncBuffer) override; - // virtual void postSync(bool isMaster) override; - - void push(T &&item); - void push(const T& item); - - T pop(); - void pop(T& item); - - size_t size() const; - - bool empty() const; - -private: - ConcurrentQueue _messages; - - std::mutex _clientMessageMutex; -}; - -} // namespace openspace - -#endif // __OPENSPACE_MODULE_SOFTWAREINTEGRATION___SYNCABLEQUEUE___H__ diff --git a/modules/softwareintegration/network/networkengine.cpp b/modules/softwareintegration/network/networkengine.cpp index 08355b8b68..ec349c32a0 100644 --- a/modules/softwareintegration/network/networkengine.cpp +++ b/modules/softwareintegration/network/networkengine.cpp @@ -48,38 +48,26 @@ NetworkEngine::NetworkEngine(const int port) {} void NetworkEngine::start() { - BaseNetworkEngine::start(); _socketServer.listen(_port); _serverThread = std::thread([this]() { handleNewPeers(); }); - _eventLoopThread = std::thread([this]() { eventLoop(); }); // TODO: Move this to BaseNetworkEngine::start(); + _eventLoopThread = std::thread([this]() { eventLoop(); }); } void NetworkEngine::stop() { - BaseNetworkEngine::stop(); - + _shouldStop = true; _socketServer.close(); if (_serverThread.joinable()) { _serverThread.join(); } - if (_eventLoopThread.joinable()) { // TODO: Move this to BaseNetworkEngine::stop(); + if (_eventLoopThread.joinable()) { _eventLoopThread.join(); } } void NetworkEngine::update() { - BaseNetworkEngine::update(); -} - -// TODO: Move to BaseNetworkEngine -void NetworkEngine::eventLoop() { - while (!_shouldStop) { - if (!_incomingMessages.empty()) { - PeerMessage pm = _incomingMessages.pop(); - handlePeerMessage(std::move(pm)); - } - } + _pointDataMessageHandler.preSyncUpdate(); } bool NetworkEngine::isConnected(const Peer& peer) const { @@ -106,6 +94,15 @@ void NetworkEngine::disconnect(Peer& peer) { _peers.erase(peer.id); } +void NetworkEngine::eventLoop() { + while (!_shouldStop) { + if (!_incomingMessages.empty()) { + auto pm = _incomingMessages.pop(); + handlePeerMessage(std::move(pm)); + } + } +} + void NetworkEngine::handleNewPeers() { while (!_shouldStop) { std::unique_ptr socket = @@ -164,21 +161,21 @@ void NetworkEngine::handlePeerMessage(PeerMessage peerMessage) { const size_t peerId = peerMessage.peerId; std::shared_ptr peerPtr = peer(peerId); - const SoftwareConnection::Message::Type messageType = peerMessage.message.type; + const SoftwareConnection::MessageType messageType = peerMessage.message.type; std::vector& message = peerMessage.message.content; switch (messageType) { - case SoftwareConnection::Message::Type::Connection: { + case SoftwareConnection::MessageType::Connection: { const std::string software(message.begin(), message.end()); LINFO(fmt::format("OpenSpace has connected with {} through socket", software)); break; } - case SoftwareConnection::Message::Type::ReadPointData: { + case SoftwareConnection::MessageType::ReadPointData: { LDEBUG("Message recieved.. Point Data"); _pointDataMessageHandler.handlePointDataMessage(message, peerPtr->connection); break; } - case SoftwareConnection::Message::Type::RemoveSceneGraphNode: { + case SoftwareConnection::MessageType::RemoveSceneGraphNode: { const std::string identifier(message.begin(), message.end()); LDEBUG(fmt::format("Message recieved.. Delete SGN: {}", identifier)); @@ -200,35 +197,35 @@ void NetworkEngine::handlePeerMessage(PeerMessage peerMessage) { LDEBUG(fmt::format("Scene graph node '{}' removed.", identifier)); break; } - case SoftwareConnection::Message::Type::Color: { + case SoftwareConnection::MessageType::Color: { const std::string colorMessage(message.begin(), message.end()); LDEBUG(fmt::format("Message recieved.. New Color: {}", colorMessage)); _pointDataMessageHandler.handleColorMessage(message); break; } - case SoftwareConnection::Message::Type::Opacity: { + case SoftwareConnection::MessageType::Opacity: { const std::string opacityMessage(message.begin(), message.end()); LDEBUG(fmt::format("Message recieved.. New Opacity: {}", opacityMessage)); _pointDataMessageHandler.handleOpacityMessage(message); break; } - case SoftwareConnection::Message::Type::Size: { + case SoftwareConnection::MessageType::Size: { const std::string sizeMessage(message.begin(), message.end()); LDEBUG(fmt::format("Message recieved.. New Size: {}", sizeMessage)); _pointDataMessageHandler.handlePointSizeMessage(message); break; } - case SoftwareConnection::Message::Type::Visibility: { + case SoftwareConnection::MessageType::Visibility: { const std::string visibilityMessage(message.begin(), message.end()); LDEBUG(fmt::format("Message recieved.. New Visibility: {}", visibilityMessage)); _pointDataMessageHandler.handleVisiblityMessage(message); break; } - case SoftwareConnection::Message::Type::Disconnection: { + case SoftwareConnection::MessageType::Disconnection: { disconnect(*peerPtr); break; } @@ -240,14 +237,4 @@ void NetworkEngine::handlePeerMessage(PeerMessage peerMessage) { } } -void NetworkEngine::addIncomingMessage(PeerMessage peerMessage) { - _incomingMessages.push(peerMessage); - // std::lock_guard lockMessage(_messageMutex); - // _message = peerMessage.message; - // std::vector content = { 'h', 'e', 'l', 'l', 'o' }; - // _message = SoftwareConnection::Message(SoftwareConnection::Message::mapSIMPTypeToMessageType["UPCO"], content); - // std::string msg = { std::begin(_message.data().content), std::end(_message.data().content) }; - // LWARNING(fmt::format("Connected with {}", msg)); -} - } // namespace openspace diff --git a/modules/softwareintegration/network/networkengine.h b/modules/softwareintegration/network/networkengine.h index ed2d016ee1..c1de466c62 100644 --- a/modules/softwareintegration/network/networkengine.h +++ b/modules/softwareintegration/network/networkengine.h @@ -25,16 +25,14 @@ #ifndef __OPENSPACE_MODULE_SOFTWAREINTEGRATION___NETWORKENGINE___H__ #define __OPENSPACE_MODULE_SOFTWAREINTEGRATION___NETWORKENGINE___H__ -#include #include #include #include #include -#include namespace openspace { -class NetworkEngine : public BaseNetworkEngine { +class NetworkEngine { public: NetworkEngine(const int port = 4700); @@ -47,9 +45,14 @@ public: SoftwareConnection::Status status; }; - void start() override; - void stop() override; - void update() override; + struct PeerMessage { + size_t peerId; + SoftwareConnection::Message message; + }; + + void start(); + void stop(); + void update(); protected: void disconnect(Peer& peer); @@ -58,11 +61,12 @@ protected: void handlePeerMessage(PeerMessage peerMessage); private: + void eventLoop(); + bool isConnected(const Peer& peer) const; std::shared_ptr peer(size_t id); - std::unordered_map> _peers; mutable std::mutex _peerListMutex; @@ -71,8 +75,15 @@ private: std::atomic_size_t _nConnections = 0; std::thread _serverThread; std::thread _eventLoopThread; + + std::atomic_bool _shouldStop = false; const int _port; + + // Message handlers + PointDataMessageHandler _pointDataMessageHandler; + + ConcurrentQueue _incomingMessages; }; } // namespace openspace diff --git a/modules/softwareintegration/pointdatamessagehandler.cpp b/modules/softwareintegration/pointdatamessagehandler.cpp index 9c1f4c9400..d91d677cef 100644 --- a/modules/softwareintegration/pointdatamessagehandler.cpp +++ b/modules/softwareintegration/pointdatamessagehandler.cpp @@ -39,8 +39,6 @@ namespace { constexpr const char* _loggerCat = "SoftwareIntegration"; - - constexpr const int LargeDatasetThreshold = 5000; } // namespace namespace openspace { @@ -93,47 +91,26 @@ void PointDataMessageHandler::handlePointDataMessage(const std::vector& me renderable.setValue("Opacity", static_cast(opacity)); renderable.setValue("Size", static_cast(size)); - if (nPoints > LargeDatasetThreshold) { - // If huge number of points, use the module's temporary data storage - const int nValuesPerPoint = 3; - const int nValues = nPoints * nValuesPerPoint; + const int nValues = nPoints * 3; - std::vector data; - data.reserve(nValues); - for (int i = 0; i < nPoints; i++) { - float x = xCoordinates[i]; - float y = yCoordinates[i]; - float z = zCoordinates[i]; - data.insert(data.end(), { x, y, z }); - } - - // Use the renderable identifier as the data key - const std::string key = identifier; - auto module = global::moduleEngine->module(); - module->storeData(key, std::move(data)); - - renderable.setValue("DataStorageKey", key); + std::vector dataSet; + dataSet.reserve(nValues); + for (int i = 0; i < nPoints; i++) + { + float x = xCoordinates[i]; + float y = yCoordinates[i]; + float z = zCoordinates[i]; + dataSet.insert(dataSet.end(), { x, y, z }); } - else { - ghoul::Dictionary pointDataDictonary; - for (int i = 0; i < nPoints; i++) { - float x = xCoordinates[i]; - float y = yCoordinates[i]; - float z = zCoordinates[i]; - glm::dvec3 point{ x, y, z }; - const std::string key = fmt::format("[{}]", i + 1); + // Use the renderable identifier as the data key + const std::string key = identifier; + auto module = global::moduleEngine->module(); + module->storeData(key, std::move(dataSet)); + + renderable.setValue("DataStorageKey", key); - // Avoid passing nan values through dictionary - if (glm::any(glm::isnan(point))) { - point = glm::dvec3(0.0); - // @TODO Keep track of invalid indices? - } - pointDataDictonary.setValue(key, std::move(point)); - } - renderable.setValue("Data", pointDataDictonary); - } ghoul::Dictionary gui; gui.setValue("Name", guiName); diff --git a/modules/softwareintegration/softwareintegrationmodule.cpp b/modules/softwareintegration/softwareintegrationmodule.cpp index 0a7112d16e..eda6afc5f9 100644 --- a/modules/softwareintegration/softwareintegrationmodule.cpp +++ b/modules/softwareintegration/softwareintegrationmodule.cpp @@ -24,8 +24,8 @@ #include -#include -#include +#include +#include #include #include #include @@ -42,29 +42,28 @@ namespace openspace { SoftwareIntegrationModule::SoftwareIntegrationModule() : OpenSpaceModule(Name) { if (global::windowDelegate->isMaster()) { - // The main node will handle all communication with the external software - // and forward it to the client nodes + // The Master node will handle all communication with the external software + // and forward it to the Client nodes // 4700, is the defualt port where the tcp socket will be opened to the ext. software _server = new NetworkEngine(); - } else { - // The client nodes will only communicate with the main node - _server = new ClientNetworkEngine(); } } SoftwareIntegrationModule::~SoftwareIntegrationModule() { - delete _server; + if (global::windowDelegate->isMaster()) { + delete _server; + } } void SoftwareIntegrationModule::storeData(const std::string& key, const std::vector data) { - _temporaryDataStorage.emplace(key, std::move(data)); + _syncableDataStorage.emplace(key, std::move(data)); } std::vector SoftwareIntegrationModule::fetchData(const std::string& key) { - auto it = _temporaryDataStorage.find(key); - if (it == _temporaryDataStorage.end()) { + auto it = _syncableDataStorage.find(key); + if (it == _syncableDataStorage.end()) { LERROR(fmt::format( "Could not find data with key '{}' in the temporary data storage", key )); @@ -72,24 +71,30 @@ std::vector SoftwareIntegrationModule::fetchData(const std::string& key) } std::vector data = it->second; - _temporaryDataStorage.erase(it); + _syncableDataStorage.erase(it); return std::move(data); } void SoftwareIntegrationModule::internalInitialize(const ghoul::Dictionary&) { + global::syncEngine->addSyncables(getSyncables()); auto fRenderable = FactoryManager::ref().factory(); ghoul_assert(fRenderable, "No renderable factory existed"); fRenderable->registerClass("RenderablePointsCloud"); - _server->start(); + if (global::windowDelegate->isMaster()) { + _server->start(); - global::callback::preSync->emplace_back([this]() { _server->update(); }); + global::callback::preSync->emplace_back([this]() { _server->update(); }); + } } void SoftwareIntegrationModule::internalDeinitialize() { - _server->stop(); + global::syncEngine->removeSyncables(getSyncables()); + if (global::windowDelegate->isMaster()) { + _server->stop(); + } } std::vector @@ -100,4 +105,8 @@ SoftwareIntegrationModule::documentations() const }; } +std::vector SoftwareIntegrationModule::getSyncables() { + return { &_syncableDataStorage }; +} + } // namespace openspace diff --git a/modules/softwareintegration/softwareintegrationmodule.h b/modules/softwareintegration/softwareintegrationmodule.h index 4f77fe71e2..64f723b896 100644 --- a/modules/softwareintegration/softwareintegrationmodule.h +++ b/modules/softwareintegration/softwareintegrationmodule.h @@ -27,7 +27,8 @@ #include -#include +#include +#include #include namespace openspace { @@ -47,11 +48,13 @@ public: private: void internalInitialize(const ghoul::Dictionary&) override; void internalDeinitialize() override; + + std::vector getSyncables(); - BaseNetworkEngine* _server; + NetworkEngine* _server; - // Centralized storage for large datasets - std::map> _temporaryDataStorage; + // Centralized storage for datasets + SyncableDataStorage _syncableDataStorage; }; } // namespace openspace diff --git a/modules/softwareintegration/syncabledatastorage.cpp b/modules/softwareintegration/syncabledatastorage.cpp new file mode 100644 index 0000000000..d425d1ddef --- /dev/null +++ b/modules/softwareintegration/syncabledatastorage.cpp @@ -0,0 +1,140 @@ +/***************************************************************************************** + * * + * OpenSpace * + * * + * Copyright (c) 2014-2022 * + * * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this * + * software and associated documentation files (the "Software"), to deal in the Software * + * without restriction, including without limitation the rights to use, copy, modify, * + * merge, publish, distribute, sublicense, and/or sell copies of the Software, and to * + * permit persons to whom the Software is furnished to do so, subject to the following * + * conditions: * + * * + * The above copyright notice and this permission notice shall be included in all copies * + * or substantial portions of the Software. * + * * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, * + * INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A * + * PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT * + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF * + * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE * + * OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * + ****************************************************************************************/ + +#include +#include + +#include + + +namespace { + constexpr const char* _loggerCat = "SoftwareIntegration_SyncableDataStorage"; +} // namespace + +namespace openspace { + +/* ============== SyncEngine functions ============== */ +void SyncableDataStorage::encode(SyncBuffer* syncBuffer) { + std::lock_guard guard(_mutex); + + size_t nDatasets = _storage.size(); + syncBuffer->encode(nDatasets); + + for (const auto& [key, points] : _storage) { + syncBuffer->encode(key); + + // Go trough all points. Structured as "x, y, z, x, y, z, x, y, ..." + size_t nPoints = points.size(); + syncBuffer->encode(nPoints); + for (auto value : points) { + syncBuffer->encode(value); + } + } +} + +void SyncableDataStorage::decode(SyncBuffer* syncBuffer) { + std::lock_guard guard(_mutex); + + size_t nDatasets; + syncBuffer->decode(nDatasets); + + for (size_t i = 0; i < nDatasets; ++i) { + std::string key; + syncBuffer->decode(key); + + size_t nPoints; + syncBuffer->decode(nPoints); + + // TODO: Change to a glm::fvec3 so we can use an overload + // of decode(glm::fvec3) instead of using a for-loop over floats? + std::vector points; + points.reserve(nPoints); + for (size_t j = 0; j < nPoints; ++j) { + float value; + syncBuffer->decode(value); + points.push_back(value); + } + + _storage[key] = points; + } +} + +void SyncableDataStorage::postSync(bool isMaster) { + if (isMaster) { + if (_storage.size() > 0) { + LWARNING(fmt::format("SyncableDataStorage.size() (MASTER): {}", _storage.size())); + } + } + else { + if (_storage.size() > 0) { + LWARNING(fmt::format("SyncableDataStorage.size() (CLIENT): {}", _storage.size())); + } + } +} +/* ================================================== */ + +/* =============== Utility functions ================ */ +SyncableDataStorage::Iterator SyncableDataStorage::erase(SyncableDataStorage::Iterator pos) { + return _storage.erase(pos); +} +SyncableDataStorage::Iterator SyncableDataStorage::erase(const SyncableDataStorage::Iterator first, const SyncableDataStorage::Iterator last) { + return _storage.erase(first, last); +} +size_t SyncableDataStorage::erase(const SyncableDataStorage::Key& key) { + return _storage.erase(key); +} + +std::pair SyncableDataStorage::emplace(SyncableDataStorage::Key key, SyncableDataStorage::Value value) { + return _storage.emplace(key, value); +} + +SyncableDataStorage::Value& SyncableDataStorage::at(const SyncableDataStorage::Key& key) { + return _storage.at(key); +} +const SyncableDataStorage::Value& SyncableDataStorage::at(const SyncableDataStorage::Key& key) const { + return _storage.at(key); +} + +SyncableDataStorage::Iterator SyncableDataStorage::find(const SyncableDataStorage::Key& key) { + return _storage.find(key); +} +/* ================================================== */ + +/* =================== Iterators ==================== */ +SyncableDataStorage::Iterator SyncableDataStorage::end() noexcept { + return _storage.end(); +} + +SyncableDataStorage::Iterator SyncableDataStorage::begin() noexcept { + return _storage.begin(); +} +/* ================================================== */ + +/* =============== Operator overloads =============== */ +SyncableDataStorage::Value& SyncableDataStorage::operator[](SyncableDataStorage::Key&& key) { + return _storage[key]; +} +/* ================================================== */ + +} // namespace openspace diff --git a/modules/softwareintegration/network/common/syncablemessagequeue.h b/modules/softwareintegration/syncabledatastorage.h similarity index 72% rename from modules/softwareintegration/network/common/syncablemessagequeue.h rename to modules/softwareintegration/syncabledatastorage.h index d161890b28..24b02c54ac 100644 --- a/modules/softwareintegration/network/common/syncablemessagequeue.h +++ b/modules/softwareintegration/syncabledatastorage.h @@ -22,21 +22,14 @@ * OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * ****************************************************************************************/ -#ifndef __OPENSPACE_MODULE_SOFTWAREINTEGRATION___SYNCABLEMESSAGEQUEUE___H__ -#define __OPENSPACE_MODULE_SOFTWAREINTEGRATION___SYNCABLEMESSAGEQUEUE___H__ +#ifndef __OPENSPACE_MODULE_SOFTWAREINTEGRATION___SYNCABLEDATASTORAGE___H__ +#define __OPENSPACE_MODULE_SOFTWAREINTEGRATION___SYNCABLEDATASTORAGE___H__ #include -#include +#include namespace openspace { -struct PeerMessage { - size_t peerId; - SoftwareConnection::Message message; - - -}; - /** * A double buffered implementation of the Syncable interface. * Users are encouraged to used this class as a default way to synchronize different @@ -49,8 +42,15 @@ struct PeerMessage { * ((T&) t).method(); * */ -class SyncableMessageQueue : public Syncable { +class SyncableDataStorage : public Syncable { public: + /* ====================== Types ===================== */ + typedef std::string Key; + typedef std::vector Value; // a dataset stored like x1, y1, z1, x2, y2 .... + typedef std::map Storage; + typedef Storage::iterator Iterator; + /* ================================================== */ + /* ============== SyncEngine functions ============== */ // virtual void preSync(bool isMaster) override; virtual void encode(SyncBuffer* syncBuffer) override; @@ -59,27 +59,31 @@ public: /* ================================================== */ /* =============== Utility functions ================ */ - void push(PeerMessage &&item); - void push(const PeerMessage& item); + Iterator erase(Iterator pos); + Iterator erase(const Iterator first, const Iterator last); + size_t erase(const Key& key); - PeerMessage pop(); + std::pair emplace(Key key, Value value); - size_t size() const; + Value& at(const Key& key); + const Value& at(const Key& key) const; - [[nodiscard]] bool empty() const; + Iterator find(const Key& key); + /* ================================================== */ - PeerMessage& front(); - const PeerMessage& front() const; - - PeerMessage& back(); - const PeerMessage& back() const; + /* =================== Iterators ==================== */ + Iterator end() noexcept; + + Iterator begin() noexcept; + /* ================================================== */ + /* =============== Operator overloads =============== */ + Value& operator[](Key&& key); /* ================================================== */ private: std::mutex _mutex; - std::list _queue; - std::vector _messagesToSync; + Storage _storage; bool showMessageEncode = true; bool showMessageDecode = true; @@ -87,4 +91,4 @@ private: } // namespace openspace -#endif // __OPENSPACE_MODULE_SOFTWAREINTEGRATION___SYNCABLEMESSAGEQUEUE___H__ +#endif // __OPENSPACE_MODULE_SOFTWAREINTEGRATION___SYNCABLEDATASTORAGE___H__