diff --git a/include/openspace/network/parallelconnection.h b/include/openspace/network/parallelconnection.h index d5c8f0f05a..8aeb93d01b 100644 --- a/include/openspace/network/parallelconnection.h +++ b/include/openspace/network/parallelconnection.h @@ -129,6 +129,8 @@ namespace openspace{ void sendScript(const std::string script); + void queMessage(std::vector message); + enum MessageTypes{ Authentication=0, Initialization, @@ -199,6 +201,8 @@ namespace openspace{ int receiveData(_SOCKET & socket, std::vector &buffer, int length, int flags); + void sendLoop(); + uint32_t _passCode; std::string _port; std::string _address; @@ -206,11 +210,12 @@ namespace openspace{ _SOCKET _clientSocket; std::thread *_connectionThread; std::thread *_broadcastThread; + std::thread *_sendThread; std::atomic _isHost; std::atomic _isConnected; std::atomic _isListening; - std::vector _executedScripts; - std::mutex _executedScriptsMutex; + std::vector> _sendBuffer; + std::mutex _sendBufferMutex; }; } // namespace network diff --git a/src/network/parallelconnection.cpp b/src/network/parallelconnection.cpp index d0f266d241..b7407af03f 100644 --- a/src/network/parallelconnection.cpp +++ b/src/network/parallelconnection.cpp @@ -80,6 +80,7 @@ namespace openspace { _clientSocket(INVALID_SOCKET), _connectionThread(nullptr), _broadcastThread(nullptr), + _sendThread(nullptr), _isHost(false), _isConnected(false), _isListening(false) @@ -201,6 +202,7 @@ namespace openspace { //start listening for communication listenCommunication(); + } //try to connect once per second @@ -331,8 +333,8 @@ namespace openspace { std::string script; script.assign(scriptbuffer.begin(), scriptbuffer.end()); - //and add that script to all received init scripts - initScripts.push_back(script); + //que script with the script engine + OsEng.scriptEngine()->queueScript(script); } @@ -414,11 +416,24 @@ namespace openspace { //tell the script engine to execute the script when appropriate OsEng.scriptEngine()->queueScript(script); - - //add script to all executed scripts - _executedScriptsMutex.lock(); - _executedScripts.push_back(script); - _executedScriptsMutex.unlock(); + } + + void ParallelConnection::queMessage(std::vector message){ + _sendBufferMutex.lock(); + _sendBuffer.push_back(message); + _sendBufferMutex.unlock(); + } + + void ParallelConnection::sendLoop(){ + while(_isHost.load()){ + _sendBufferMutex.lock(); + if(_sendBuffer.size() > 0){ + //send first queued message + send(_clientSocket, _sendBuffer.front().data(), _sendBuffer.front().size(), 0); + _sendBuffer.erase(_sendBuffer.begin()); + } + _sendBufferMutex.unlock(); + } } void ParallelConnection::decodeHostInfoMessage(){ @@ -445,6 +460,9 @@ namespace openspace { //start broadcasting _isHost.store(true); _broadcastThread = new (std::nothrow) std::thread(&ParallelConnection::broadcast, this); + + //and sending messages + _sendThread = new (std::nothrow) std::thread(&ParallelConnection::sendLoop, this); } } else{ //we've been assigned as client @@ -462,6 +480,18 @@ namespace openspace { _broadcastThread = nullptr; } + //delete send thread + if(_sendThread != nullptr){ + _sendThread->join(); + delete _sendThread; + _sendThread = nullptr; + + //and clear all queued messages + _sendBufferMutex.lock(); + _sendBuffer.clear(); + _sendBufferMutex.unlock(); + } + } else{ //we were not broadcasting so nothing to do @@ -637,10 +667,6 @@ namespace openspace { } void ParallelConnection::sendScript(const std::string script){ - _executedScriptsMutex.lock(); - _executedScripts.push_back(script); - _executedScriptsMutex.unlock(); - uint16_t msglen = static_cast(script.length()); std::vector buffer; buffer.reserve(headerSize() + sizeof(msglen) + msglen); @@ -685,6 +711,13 @@ namespace openspace { _broadcastThread = nullptr; } + //join send thread and delete it + if (_sendThread != nullptr){ + _sendThread->join(); + delete _sendThread; + _sendThread = nullptr; + } + #if defined(__WIN32__) WSACleanup(); #endif