From 8c2b5bb1999900d19ba059d2ff398ff1cdaab940 Mon Sep 17 00:00:00 2001 From: Joakim Kilby Date: Fri, 3 Jul 2015 14:43:48 +0200 Subject: [PATCH] Tons of stability improvements to threading of parallel connection. changed name of variables and functions to reflect more what they do. removed obsolete functions. parallel connection is no longer in contact with main thread unless to be deleted --- .../openspace/network/parallelconnection.h | 32 +- src/engine/openspaceengine.cpp | 1 - src/network/parallelconnection.cpp | 303 +++++++++--------- src/network/parallelconnection_lua.inl | 64 ++-- 4 files changed, 198 insertions(+), 202 deletions(-) diff --git a/include/openspace/network/parallelconnection.h b/include/openspace/network/parallelconnection.h index 06f21a342a..dc0bc88c1e 100644 --- a/include/openspace/network/parallelconnection.h +++ b/include/openspace/network/parallelconnection.h @@ -105,22 +105,12 @@ namespace openspace{ void clientConnect(); - void disconnect(); - void setPort(const std::string &port); - std::string port(); - void setAddress(const std::string &address); - std::string address(); - void setName(const std::string& name); - std::string name(); - - _SOCKET clientSocket(); - bool isHost(); void requestHostship(); @@ -129,12 +119,10 @@ namespace openspace{ void sendScript(const std::string script); - void queMessage(std::vector message); - - void update(double dt); - void initDone(); + void signalDisconnect(); + enum MessageTypes{ Authentication=0, Initialization, @@ -176,13 +164,17 @@ namespace openspace{ return hashVal; }; + void queMessage(std::vector message); + + void disconnect(); + void writeHeader(std::vector &buffer, uint32_t messageType); void closeSocket(); bool initNetworkAPI(); - void tryConnect(addrinfo *info); + void establishConnection(addrinfo *info); void sendAuthentication(); @@ -208,6 +200,10 @@ namespace openspace{ void sendLoop(); + bool parseHints(addrinfo &info); + + void threadManagement(); + uint32_t _passCode; std::string _port; std::string _address; @@ -216,11 +212,13 @@ namespace openspace{ std::thread *_connectionThread; std::thread *_broadcastThread; std::thread *_sendThread; - std::thread *_receiveThread; + std::thread *_listenThread; + std::thread *_handlerThread; std::atomic _isHost; std::atomic _isConnected; - std::atomic _isListening; std::atomic _performDisconnect; + std::atomic _isRunning; + std::atomic _tryConnect; std::vector> _sendBuffer; std::mutex _sendBufferMutex; }; diff --git a/src/engine/openspaceengine.cpp b/src/engine/openspaceengine.cpp index 3051f573e1..8c101ce271 100644 --- a/src/engine/openspaceengine.cpp +++ b/src/engine/openspaceengine.cpp @@ -638,7 +638,6 @@ void OpenSpaceEngine::preSynchronization() { Time::ref().preSynchronization(); _interactionHandler->update(dt); - _parallelConnection->update(dt); //_interactionHandler.lockControls(); _scriptEngine->preSynchronization(); diff --git a/src/network/parallelconnection.cpp b/src/network/parallelconnection.cpp index 404dda51c6..49d33185ef 100644 --- a/src/network/parallelconnection.cpp +++ b/src/network/parallelconnection.cpp @@ -81,31 +81,132 @@ namespace openspace { _connectionThread(nullptr), _broadcastThread(nullptr), _sendThread(nullptr), - _receiveThread(nullptr), + _listenThread(nullptr), + _handlerThread(nullptr), + _isRunning(true), _isHost(false), _isConnected(false), - _isListening(false), + _tryConnect(false), _performDisconnect(false) { - + //create handler thread + _handlerThread = new (std::nothrow) std::thread(&ParallelConnection::threadManagement, this); } ParallelConnection::~ParallelConnection(){ - disconnect(); -#if defined(__WIN32__) - WSACleanup(); + + //signal that a disconnect should occur + signalDisconnect(); + + //signal that execution has stopped + _isRunning.store(false); + + //join handler + _handlerThread->join(); + + //and delete handler + delete _handlerThread; + } + + void ParallelConnection::threadManagement(){ + //while program is running + while(_isRunning.load()){ + //if we need to disconnect and clean up + if(_performDisconnect.load()){ + disconnect(); + } + } + } + + void ParallelConnection::signalDisconnect(){ + _performDisconnect.store(true); + } + + void ParallelConnection::closeSocket(){ + + /* + Windows shutdown options + * SD_RECIEVE + * SD_SEND + * SD_BOTH + + Linux & Mac shutdown options + * SHUT_RD (Disables further receive operations) + * SHUT_WR (Disables further send operations) + * SHUT_RDWR (Disables further send and receive operations) + */ + +#ifdef __WIN32__ + shutdown(_clientSocket, SD_BOTH); + closesocket(_clientSocket); +#else + shutdown(_clientSocket, SHUT_RDWR); + close(_clientSocket); #endif + + _clientSocket = INVALID_SOCKET; + } + + void ParallelConnection::disconnect(){ + //we're disconnecting + + if (_clientSocket != INVALID_SOCKET){ + + //must be run before trying to join communication threads, else the threads are stuck trying to receive data + closeSocket(); + + //tell connection thread to stop trying to connect + _tryConnect.store(false); + + //tell send thread to stop sending and listen thread to stop listenin + _isConnected.store(false); + + //tell broadcast thread to stop broadcasting (we're no longer host) + _isHost.store(false); + + //join connection thread and delete it + if(_connectionThread != nullptr){ + _connectionThread->join(); + delete _connectionThread; + _connectionThread = nullptr; + } + + //join send thread and delete it + if (_sendThread != nullptr){ + _sendThread->join(); + delete _sendThread; + _sendThread = nullptr; + } + + //join listen thread and delete it + if( _listenThread != nullptr){ + _listenThread->join(); + delete _listenThread; + _listenThread = nullptr; + } + + //join broadcast thread and delete it + if(_broadcastThread != nullptr){ + _broadcastThread->join(); + delete _broadcastThread; + _broadcastThread = nullptr; + } + + // disconnect and cleanup completed + _performDisconnect.store(false); + } } void ParallelConnection::clientConnect(){ - //we're already connected, do nothing (dummy check) - if(_isConnected.load()){ + //we're already connected (or already trying to connect), do nothing (dummy check) + if(_isConnected.load() || _tryConnect.load()){ return; } if (!initNetworkAPI()){ LERROR("Failed to initialize network API for Parallel Connection"); + return; } struct addrinfo *addresult = NULL, *ptr = NULL, hints; @@ -123,32 +224,31 @@ namespace openspace { int result = getaddrinfo(_address.c_str(), _port.c_str(), &hints, &addresult); if (result != 0) { - #if defined(__WIN32__) - //WSACleanup(); - #endif LERROR("Failed to parse hints for Parallel Connection"); return; } - + //we're not connected - _isConnected.store(false); + _isConnected.store(false); + + //we want to try and establish a connection + _tryConnect.store(true); //start connection thread - _connectionThread = new (std::nothrow) std::thread(&ParallelConnection::tryConnect, this, addresult); + _connectionThread = new (std::nothrow) std::thread(&ParallelConnection::establishConnection, this, addresult); } - void ParallelConnection::tryConnect(addrinfo *info){ + void ParallelConnection::establishConnection(addrinfo *info){ _clientSocket = socket(info->ai_family, info->ai_socktype, info->ai_protocol); if (_clientSocket == INVALID_SOCKET){ freeaddrinfo(info); -#if defined(__WIN32__) - //WSACleanup(); -#endif - LERROR("Failed to create client socket, shutting down connection thread"); - return; + LERROR("Failed to create client socket, disconnecting."); + + //signal a disconnect + signalDisconnect(); } int flag = 1; @@ -188,7 +288,7 @@ namespace openspace { //while the connection thread is still running - while (!_isConnected.load()){ + while (_tryConnect.load()){ LINFO("Attempting to connect to server "<< _address << " on port " << _port); @@ -203,23 +303,36 @@ namespace openspace { //we're connected _isConnected.store(true); - - //and ready to start receiving messages - _isListening.store(true); + + //we no longer need to try to establish connection + _tryConnect.store(false); //start listening for communication - _receiveThread = new (std::nothrow) std::thread(&ParallelConnection::listenCommunication, this); + _listenThread = new (std::nothrow) std::thread(&ParallelConnection::listenCommunication, this); //start sending messages _sendThread = new (std::nothrow) std::thread(&ParallelConnection::sendLoop, this); - //and send authentication + //send authentication sendAuthentication(); } - //try to connect once per second +#ifdef __WIN32__ + //on windows: try to connect once per second std::this_thread::sleep_for(std::chrono::seconds(1)); +#else + if(!_isConnected.load()){ + //on unix disconnect and display error message if we're not connected + LERROR("Failed to establish a connection with server "<< _address << " on port " << _port<<", terminating connection."); + + //signal disconnect + signalDisconnect(); + + //stop loop + break; + } +#endif } //cleanup @@ -465,10 +578,7 @@ namespace openspace { LERROR("Failed to send message.\nError: " << _ERRNO << " detected in connection, disconnecting."); //stop all threads and signal that a disconnect should be performed - _performDisconnect.store(true); - _isConnected.store(false); - _isHost.store(false); - _isListening.store(false); + signalDisconnect(); } } @@ -539,12 +649,8 @@ namespace openspace { } } else{ - LERROR("Error " << _ERRNO << " detected in connection, disconnecting."); - //stop all threads and signal that a disconnect should be performed - _performDisconnect.store(true); - _isConnected.store(false); - _isHost.store(false); - _isListening.store(false); + LERROR("Error " << _ERRNO << " detected in connection, disconnecting."); + signalDisconnect(); } } @@ -627,8 +733,9 @@ namespace openspace { buffer.resize(headerSize()); int result; - //while we're still connected and listening - while (_isListening.load()){ + + //while we're still connected + while (_isConnected.load()){ //receive the first parts of a message result = receiveData(_clientSocket, buffer, headerSize(), 0); @@ -658,22 +765,13 @@ namespace openspace { LERROR("Error " << _ERRNO << " detected in connection, disconnecting!"); } - //stop all threads and signal that a disconnect should be performed - _performDisconnect.store(true); - _isConnected.store(false); - _isHost.store(false); - _isListening.store(false); + //signal that a disconnect should be performed + signalDisconnect(); break; } } } - - void ParallelConnection::update(double dt){ - if(_performDisconnect.load()){ - disconnect(); - } - } int ParallelConnection::receiveData(_SOCKET & socket, std::vector &buffer, int length, int flags){ int result = 0; @@ -698,31 +796,15 @@ namespace openspace { void ParallelConnection::setPort(const std::string &port){ _port = port; } - - std::string ParallelConnection::port(){ - return _port; - } - + void ParallelConnection::setAddress(const std::string &address){ _address = address; } - std::string ParallelConnection::address(){ - return _address; - } - void ParallelConnection::setName(const std::string& name){ _name = name; } - std::string ParallelConnection::name(){ - return _name; - } - - _SOCKET ParallelConnection::clientSocket(){ - return _clientSocket; - } - bool ParallelConnection::isHost(){ return _isHost.load(); } @@ -760,89 +842,6 @@ namespace openspace { queMessage(buffer); } - - void ParallelConnection::disconnect(){ - //we're disconnecting - _performDisconnect.store(false); - - if (_clientSocket != INVALID_SOCKET){ - - //must be run before trying to join communication threads, else the threads are stuck trying to receive data - closeSocket(); - - //tell broadcast thread to stop broadcasting - _isHost.store(false); - - //tell connection thread to stop listening - _isListening.store(false); - - //join receive thread and delete it - if (_receiveThread != nullptr){ - _receiveThread->join(); - delete _receiveThread; - _receiveThread = nullptr; - } - - //join broadcast thread and delete it - if (_broadcastThread != nullptr){ - _broadcastThread->join(); - delete _broadcastThread; - _broadcastThread = nullptr; - } - - //join send thread and delete it - if (_sendThread != nullptr){ - _sendThread->join(); - delete _sendThread; - _sendThread = nullptr; - } - - //tell connection thread that we are connected (to be able to join and delete thread) - _isConnected.store(true); - - //join connection thread and delete it - if (_connectionThread != nullptr){ - _connectionThread->join(); - delete _connectionThread; - _connectionThread = nullptr; - } - - //make sure to set us as NOT connected again, connection thread is now deleted - _isConnected.store(false); - -#if defined(__WIN32__) - //this line causes issues with SGCT since winsock dll file is unloaded upon call - //@TODO should this be here? -// WSACleanup(); -#endif - } - } - - void ParallelConnection::closeSocket(){ - - /* - Windows shutdown options - * SD_RECIEVE - * SD_SEND - * SD_BOTH - - Linux & Mac shutdown options - * SHUT_RD (Disables further receive operations) - * SHUT_WR (Disables further send operations) - * SHUT_RDWR (Disables further send and receive operations) - */ - -#ifdef __WIN32__ - shutdown(_clientSocket, SD_BOTH); - closesocket(_clientSocket); -#else - shutdown(_clientSocket, SHUT_RDWR); - close(_clientSocket); -#endif - - _clientSocket = INVALID_SOCKET; - } - bool ParallelConnection::initNetworkAPI(){ #if defined(__WIN32__) WSADATA wsaData; @@ -871,8 +870,8 @@ namespace openspace { void ParallelConnection::broadcast(){ - //while we're still the host - while (_isHost.load()){ + //while we're still connected and we're the host + while (_isConnected.load() && _isHost.load()){ //create a keyframe with current position and orientation of camera network::StreamDataKeyframe kf; diff --git a/src/network/parallelconnection_lua.inl b/src/network/parallelconnection_lua.inl index 0e135d9239..edcc4c1868 100644 --- a/src/network/parallelconnection_lua.inl +++ b/src/network/parallelconnection_lua.inl @@ -32,9 +32,7 @@ namespace luascriptfunctions { * Set the port for parallel connection */ int setPort(lua_State* L) { - if(!OsEng.isMaster()){ - return; - } + const bool isFunction = (lua_isfunction(L, -1) != 0); if (isFunction) { // If the top of the stack is a function, it is ourself @@ -46,7 +44,9 @@ int setPort(lua_State* L) { if (isNumber) { int value = lua_tonumber(L, -1); std::string port = std::to_string(value); - OsEng.parallelConnection()->setPort(port); + if(OsEng.isMaster()){ + OsEng.parallelConnection()->setPort(port); + } return 0; } else { @@ -59,9 +59,7 @@ int setPort(lua_State* L) { } int setAddress(lua_State* L) { - if(!OsEng.isMaster()){ - return; - } + const bool isFunction = (lua_isfunction(L, -1) != 0); if (isFunction) { // If the top of the stack is a function, it is ourself @@ -72,7 +70,9 @@ int setAddress(lua_State* L) { const int type = lua_type(L, -1); if (type == LUA_TSTRING) { std::string address = luaL_checkstring(L, -1); - OsEng.parallelConnection()->setAddress(address); + if(OsEng.isMaster()){ + OsEng.parallelConnection()->setAddress(address); + } return 0; } else { @@ -85,9 +85,7 @@ int setAddress(lua_State* L) { } int setPassword(lua_State* L) { - if(!OsEng.isMaster()){ - return; - } + const bool isFunction = (lua_isfunction(L, -1) != 0); if (isFunction) { // If the top of the stack is a function, it is ourself @@ -98,7 +96,9 @@ int setPassword(lua_State* L) { const int type = lua_type(L, -1); if (type == LUA_TSTRING) { std::string pwd = luaL_checkstring(L, -1); - OsEng.parallelConnection()->setPassword(pwd); + if(OsEng.isMaster()){ + OsEng.parallelConnection()->setPassword(pwd); + } return 0; } else { @@ -111,9 +111,7 @@ int setPassword(lua_State* L) { } int setDisplayName(lua_State* L) { - if(!OsEng.isMaster()){ - return; - } + const bool isFunction = (lua_isfunction(L, -1) != 0); if (isFunction) { // If the top of the stack is a function, it is ourself @@ -124,7 +122,9 @@ int setDisplayName(lua_State* L) { const int type = lua_type(L, -1); if (type == LUA_TSTRING) { std::string name = luaL_checkstring(L, -1); - OsEng.parallelConnection()->setName(name); + if(OsEng.isMaster()){ + OsEng.parallelConnection()->setName(name); + } return 0; } else { @@ -137,46 +137,46 @@ int setDisplayName(lua_State* L) { } int connect(lua_State* L) { - if(!OsEng.isMaster()){ - return; - } + int nArguments = lua_gettop(L); if (nArguments != 0) return luaL_error(L, "Expected %i arguments, got %i", 0, nArguments); - OsEng.parallelConnection()->clientConnect(); + if(OsEng.isMaster()){ + OsEng.parallelConnection()->clientConnect(); + } return 0; } int disconnect(lua_State* L) { - if(!OsEng.isMaster()){ - return; - } + int nArguments = lua_gettop(L); if (nArguments != 0) return luaL_error(L, "Expected %i arguments, got %i", 0, nArguments); - OsEng.parallelConnection()->disconnect(); + if(OsEng.isMaster()){ + OsEng.parallelConnection()->signalDisconnect(); + } return 0; } int requestHostship(lua_State* L) { - if(!OsEng.isMaster()){ - return; - } + int nArguments = lua_gettop(L); if (nArguments != 0) return luaL_error(L, "Expected %i arguments, got %i", 0, nArguments); - OsEng.parallelConnection()->requestHostship(); + if(OsEng.isMaster()){ + OsEng.parallelConnection()->requestHostship(); + } return 0; } int initialized(lua_State* L) { - if(!OsEng.isMaster()){ - return; - } + int nArguments = lua_gettop(L); if (nArguments != 0) return luaL_error(L, "Expected %i arguments, got %i", 0, nArguments); - OsEng.parallelConnection()->initDone(); + if(OsEng.isMaster()){ + OsEng.parallelConnection()->initDone(); + } return 0; }