diff --git a/include/openspace/network/parallelconnection.h b/include/openspace/network/parallelconnection.h index 06f21a342a..dc0bc88c1e 100644 --- a/include/openspace/network/parallelconnection.h +++ b/include/openspace/network/parallelconnection.h @@ -105,22 +105,12 @@ namespace openspace{ void clientConnect(); - void disconnect(); - void setPort(const std::string &port); - std::string port(); - void setAddress(const std::string &address); - std::string address(); - void setName(const std::string& name); - std::string name(); - - _SOCKET clientSocket(); - bool isHost(); void requestHostship(); @@ -129,12 +119,10 @@ namespace openspace{ void sendScript(const std::string script); - void queMessage(std::vector message); - - void update(double dt); - void initDone(); + void signalDisconnect(); + enum MessageTypes{ Authentication=0, Initialization, @@ -176,13 +164,17 @@ namespace openspace{ return hashVal; }; + void queMessage(std::vector message); + + void disconnect(); + void writeHeader(std::vector &buffer, uint32_t messageType); void closeSocket(); bool initNetworkAPI(); - void tryConnect(addrinfo *info); + void establishConnection(addrinfo *info); void sendAuthentication(); @@ -208,6 +200,10 @@ namespace openspace{ void sendLoop(); + bool parseHints(addrinfo &info); + + void threadManagement(); + uint32_t _passCode; std::string _port; std::string _address; @@ -216,11 +212,13 @@ namespace openspace{ std::thread *_connectionThread; std::thread *_broadcastThread; std::thread *_sendThread; - std::thread *_receiveThread; + std::thread *_listenThread; + std::thread *_handlerThread; std::atomic _isHost; std::atomic _isConnected; - std::atomic _isListening; std::atomic _performDisconnect; + std::atomic _isRunning; + std::atomic _tryConnect; std::vector> _sendBuffer; std::mutex _sendBufferMutex; }; diff --git a/src/engine/openspaceengine.cpp b/src/engine/openspaceengine.cpp index 3051f573e1..8c101ce271 100644 --- a/src/engine/openspaceengine.cpp +++ b/src/engine/openspaceengine.cpp @@ -638,7 +638,6 @@ void OpenSpaceEngine::preSynchronization() { Time::ref().preSynchronization(); _interactionHandler->update(dt); - _parallelConnection->update(dt); //_interactionHandler.lockControls(); _scriptEngine->preSynchronization(); diff --git a/src/network/parallelconnection.cpp b/src/network/parallelconnection.cpp index 404dda51c6..49d33185ef 100644 --- a/src/network/parallelconnection.cpp +++ b/src/network/parallelconnection.cpp @@ -81,31 +81,132 @@ namespace openspace { _connectionThread(nullptr), _broadcastThread(nullptr), _sendThread(nullptr), - _receiveThread(nullptr), + _listenThread(nullptr), + _handlerThread(nullptr), + _isRunning(true), _isHost(false), _isConnected(false), - _isListening(false), + _tryConnect(false), _performDisconnect(false) { - + //create handler thread + _handlerThread = new (std::nothrow) std::thread(&ParallelConnection::threadManagement, this); } ParallelConnection::~ParallelConnection(){ - disconnect(); -#if defined(__WIN32__) - WSACleanup(); + + //signal that a disconnect should occur + signalDisconnect(); + + //signal that execution has stopped + _isRunning.store(false); + + //join handler + _handlerThread->join(); + + //and delete handler + delete _handlerThread; + } + + void ParallelConnection::threadManagement(){ + //while program is running + while(_isRunning.load()){ + //if we need to disconnect and clean up + if(_performDisconnect.load()){ + disconnect(); + } + } + } + + void ParallelConnection::signalDisconnect(){ + _performDisconnect.store(true); + } + + void ParallelConnection::closeSocket(){ + + /* + Windows shutdown options + * SD_RECIEVE + * SD_SEND + * SD_BOTH + + Linux & Mac shutdown options + * SHUT_RD (Disables further receive operations) + * SHUT_WR (Disables further send operations) + * SHUT_RDWR (Disables further send and receive operations) + */ + +#ifdef __WIN32__ + shutdown(_clientSocket, SD_BOTH); + closesocket(_clientSocket); +#else + shutdown(_clientSocket, SHUT_RDWR); + close(_clientSocket); #endif + + _clientSocket = INVALID_SOCKET; + } + + void ParallelConnection::disconnect(){ + //we're disconnecting + + if (_clientSocket != INVALID_SOCKET){ + + //must be run before trying to join communication threads, else the threads are stuck trying to receive data + closeSocket(); + + //tell connection thread to stop trying to connect + _tryConnect.store(false); + + //tell send thread to stop sending and listen thread to stop listenin + _isConnected.store(false); + + //tell broadcast thread to stop broadcasting (we're no longer host) + _isHost.store(false); + + //join connection thread and delete it + if(_connectionThread != nullptr){ + _connectionThread->join(); + delete _connectionThread; + _connectionThread = nullptr; + } + + //join send thread and delete it + if (_sendThread != nullptr){ + _sendThread->join(); + delete _sendThread; + _sendThread = nullptr; + } + + //join listen thread and delete it + if( _listenThread != nullptr){ + _listenThread->join(); + delete _listenThread; + _listenThread = nullptr; + } + + //join broadcast thread and delete it + if(_broadcastThread != nullptr){ + _broadcastThread->join(); + delete _broadcastThread; + _broadcastThread = nullptr; + } + + // disconnect and cleanup completed + _performDisconnect.store(false); + } } void ParallelConnection::clientConnect(){ - //we're already connected, do nothing (dummy check) - if(_isConnected.load()){ + //we're already connected (or already trying to connect), do nothing (dummy check) + if(_isConnected.load() || _tryConnect.load()){ return; } if (!initNetworkAPI()){ LERROR("Failed to initialize network API for Parallel Connection"); + return; } struct addrinfo *addresult = NULL, *ptr = NULL, hints; @@ -123,32 +224,31 @@ namespace openspace { int result = getaddrinfo(_address.c_str(), _port.c_str(), &hints, &addresult); if (result != 0) { - #if defined(__WIN32__) - //WSACleanup(); - #endif LERROR("Failed to parse hints for Parallel Connection"); return; } - + //we're not connected - _isConnected.store(false); + _isConnected.store(false); + + //we want to try and establish a connection + _tryConnect.store(true); //start connection thread - _connectionThread = new (std::nothrow) std::thread(&ParallelConnection::tryConnect, this, addresult); + _connectionThread = new (std::nothrow) std::thread(&ParallelConnection::establishConnection, this, addresult); } - void ParallelConnection::tryConnect(addrinfo *info){ + void ParallelConnection::establishConnection(addrinfo *info){ _clientSocket = socket(info->ai_family, info->ai_socktype, info->ai_protocol); if (_clientSocket == INVALID_SOCKET){ freeaddrinfo(info); -#if defined(__WIN32__) - //WSACleanup(); -#endif - LERROR("Failed to create client socket, shutting down connection thread"); - return; + LERROR("Failed to create client socket, disconnecting."); + + //signal a disconnect + signalDisconnect(); } int flag = 1; @@ -188,7 +288,7 @@ namespace openspace { //while the connection thread is still running - while (!_isConnected.load()){ + while (_tryConnect.load()){ LINFO("Attempting to connect to server "<< _address << " on port " << _port); @@ -203,23 +303,36 @@ namespace openspace { //we're connected _isConnected.store(true); - - //and ready to start receiving messages - _isListening.store(true); + + //we no longer need to try to establish connection + _tryConnect.store(false); //start listening for communication - _receiveThread = new (std::nothrow) std::thread(&ParallelConnection::listenCommunication, this); + _listenThread = new (std::nothrow) std::thread(&ParallelConnection::listenCommunication, this); //start sending messages _sendThread = new (std::nothrow) std::thread(&ParallelConnection::sendLoop, this); - //and send authentication + //send authentication sendAuthentication(); } - //try to connect once per second +#ifdef __WIN32__ + //on windows: try to connect once per second std::this_thread::sleep_for(std::chrono::seconds(1)); +#else + if(!_isConnected.load()){ + //on unix disconnect and display error message if we're not connected + LERROR("Failed to establish a connection with server "<< _address << " on port " << _port<<", terminating connection."); + + //signal disconnect + signalDisconnect(); + + //stop loop + break; + } +#endif } //cleanup @@ -465,10 +578,7 @@ namespace openspace { LERROR("Failed to send message.\nError: " << _ERRNO << " detected in connection, disconnecting."); //stop all threads and signal that a disconnect should be performed - _performDisconnect.store(true); - _isConnected.store(false); - _isHost.store(false); - _isListening.store(false); + signalDisconnect(); } } @@ -539,12 +649,8 @@ namespace openspace { } } else{ - LERROR("Error " << _ERRNO << " detected in connection, disconnecting."); - //stop all threads and signal that a disconnect should be performed - _performDisconnect.store(true); - _isConnected.store(false); - _isHost.store(false); - _isListening.store(false); + LERROR("Error " << _ERRNO << " detected in connection, disconnecting."); + signalDisconnect(); } } @@ -627,8 +733,9 @@ namespace openspace { buffer.resize(headerSize()); int result; - //while we're still connected and listening - while (_isListening.load()){ + + //while we're still connected + while (_isConnected.load()){ //receive the first parts of a message result = receiveData(_clientSocket, buffer, headerSize(), 0); @@ -658,22 +765,13 @@ namespace openspace { LERROR("Error " << _ERRNO << " detected in connection, disconnecting!"); } - //stop all threads and signal that a disconnect should be performed - _performDisconnect.store(true); - _isConnected.store(false); - _isHost.store(false); - _isListening.store(false); + //signal that a disconnect should be performed + signalDisconnect(); break; } } } - - void ParallelConnection::update(double dt){ - if(_performDisconnect.load()){ - disconnect(); - } - } int ParallelConnection::receiveData(_SOCKET & socket, std::vector &buffer, int length, int flags){ int result = 0; @@ -698,31 +796,15 @@ namespace openspace { void ParallelConnection::setPort(const std::string &port){ _port = port; } - - std::string ParallelConnection::port(){ - return _port; - } - + void ParallelConnection::setAddress(const std::string &address){ _address = address; } - std::string ParallelConnection::address(){ - return _address; - } - void ParallelConnection::setName(const std::string& name){ _name = name; } - std::string ParallelConnection::name(){ - return _name; - } - - _SOCKET ParallelConnection::clientSocket(){ - return _clientSocket; - } - bool ParallelConnection::isHost(){ return _isHost.load(); } @@ -760,89 +842,6 @@ namespace openspace { queMessage(buffer); } - - void ParallelConnection::disconnect(){ - //we're disconnecting - _performDisconnect.store(false); - - if (_clientSocket != INVALID_SOCKET){ - - //must be run before trying to join communication threads, else the threads are stuck trying to receive data - closeSocket(); - - //tell broadcast thread to stop broadcasting - _isHost.store(false); - - //tell connection thread to stop listening - _isListening.store(false); - - //join receive thread and delete it - if (_receiveThread != nullptr){ - _receiveThread->join(); - delete _receiveThread; - _receiveThread = nullptr; - } - - //join broadcast thread and delete it - if (_broadcastThread != nullptr){ - _broadcastThread->join(); - delete _broadcastThread; - _broadcastThread = nullptr; - } - - //join send thread and delete it - if (_sendThread != nullptr){ - _sendThread->join(); - delete _sendThread; - _sendThread = nullptr; - } - - //tell connection thread that we are connected (to be able to join and delete thread) - _isConnected.store(true); - - //join connection thread and delete it - if (_connectionThread != nullptr){ - _connectionThread->join(); - delete _connectionThread; - _connectionThread = nullptr; - } - - //make sure to set us as NOT connected again, connection thread is now deleted - _isConnected.store(false); - -#if defined(__WIN32__) - //this line causes issues with SGCT since winsock dll file is unloaded upon call - //@TODO should this be here? -// WSACleanup(); -#endif - } - } - - void ParallelConnection::closeSocket(){ - - /* - Windows shutdown options - * SD_RECIEVE - * SD_SEND - * SD_BOTH - - Linux & Mac shutdown options - * SHUT_RD (Disables further receive operations) - * SHUT_WR (Disables further send operations) - * SHUT_RDWR (Disables further send and receive operations) - */ - -#ifdef __WIN32__ - shutdown(_clientSocket, SD_BOTH); - closesocket(_clientSocket); -#else - shutdown(_clientSocket, SHUT_RDWR); - close(_clientSocket); -#endif - - _clientSocket = INVALID_SOCKET; - } - bool ParallelConnection::initNetworkAPI(){ #if defined(__WIN32__) WSADATA wsaData; @@ -871,8 +870,8 @@ namespace openspace { void ParallelConnection::broadcast(){ - //while we're still the host - while (_isHost.load()){ + //while we're still connected and we're the host + while (_isConnected.load() && _isHost.load()){ //create a keyframe with current position and orientation of camera network::StreamDataKeyframe kf; diff --git a/src/network/parallelconnection_lua.inl b/src/network/parallelconnection_lua.inl index 0e135d9239..edcc4c1868 100644 --- a/src/network/parallelconnection_lua.inl +++ b/src/network/parallelconnection_lua.inl @@ -32,9 +32,7 @@ namespace luascriptfunctions { * Set the port for parallel connection */ int setPort(lua_State* L) { - if(!OsEng.isMaster()){ - return; - } + const bool isFunction = (lua_isfunction(L, -1) != 0); if (isFunction) { // If the top of the stack is a function, it is ourself @@ -46,7 +44,9 @@ int setPort(lua_State* L) { if (isNumber) { int value = lua_tonumber(L, -1); std::string port = std::to_string(value); - OsEng.parallelConnection()->setPort(port); + if(OsEng.isMaster()){ + OsEng.parallelConnection()->setPort(port); + } return 0; } else { @@ -59,9 +59,7 @@ int setPort(lua_State* L) { } int setAddress(lua_State* L) { - if(!OsEng.isMaster()){ - return; - } + const bool isFunction = (lua_isfunction(L, -1) != 0); if (isFunction) { // If the top of the stack is a function, it is ourself @@ -72,7 +70,9 @@ int setAddress(lua_State* L) { const int type = lua_type(L, -1); if (type == LUA_TSTRING) { std::string address = luaL_checkstring(L, -1); - OsEng.parallelConnection()->setAddress(address); + if(OsEng.isMaster()){ + OsEng.parallelConnection()->setAddress(address); + } return 0; } else { @@ -85,9 +85,7 @@ int setAddress(lua_State* L) { } int setPassword(lua_State* L) { - if(!OsEng.isMaster()){ - return; - } + const bool isFunction = (lua_isfunction(L, -1) != 0); if (isFunction) { // If the top of the stack is a function, it is ourself @@ -98,7 +96,9 @@ int setPassword(lua_State* L) { const int type = lua_type(L, -1); if (type == LUA_TSTRING) { std::string pwd = luaL_checkstring(L, -1); - OsEng.parallelConnection()->setPassword(pwd); + if(OsEng.isMaster()){ + OsEng.parallelConnection()->setPassword(pwd); + } return 0; } else { @@ -111,9 +111,7 @@ int setPassword(lua_State* L) { } int setDisplayName(lua_State* L) { - if(!OsEng.isMaster()){ - return; - } + const bool isFunction = (lua_isfunction(L, -1) != 0); if (isFunction) { // If the top of the stack is a function, it is ourself @@ -124,7 +122,9 @@ int setDisplayName(lua_State* L) { const int type = lua_type(L, -1); if (type == LUA_TSTRING) { std::string name = luaL_checkstring(L, -1); - OsEng.parallelConnection()->setName(name); + if(OsEng.isMaster()){ + OsEng.parallelConnection()->setName(name); + } return 0; } else { @@ -137,46 +137,46 @@ int setDisplayName(lua_State* L) { } int connect(lua_State* L) { - if(!OsEng.isMaster()){ - return; - } + int nArguments = lua_gettop(L); if (nArguments != 0) return luaL_error(L, "Expected %i arguments, got %i", 0, nArguments); - OsEng.parallelConnection()->clientConnect(); + if(OsEng.isMaster()){ + OsEng.parallelConnection()->clientConnect(); + } return 0; } int disconnect(lua_State* L) { - if(!OsEng.isMaster()){ - return; - } + int nArguments = lua_gettop(L); if (nArguments != 0) return luaL_error(L, "Expected %i arguments, got %i", 0, nArguments); - OsEng.parallelConnection()->disconnect(); + if(OsEng.isMaster()){ + OsEng.parallelConnection()->signalDisconnect(); + } return 0; } int requestHostship(lua_State* L) { - if(!OsEng.isMaster()){ - return; - } + int nArguments = lua_gettop(L); if (nArguments != 0) return luaL_error(L, "Expected %i arguments, got %i", 0, nArguments); - OsEng.parallelConnection()->requestHostship(); + if(OsEng.isMaster()){ + OsEng.parallelConnection()->requestHostship(); + } return 0; } int initialized(lua_State* L) { - if(!OsEng.isMaster()){ - return; - } + int nArguments = lua_gettop(L); if (nArguments != 0) return luaL_error(L, "Expected %i arguments, got %i", 0, nArguments); - OsEng.parallelConnection()->initDone(); + if(OsEng.isMaster()){ + OsEng.parallelConnection()->initDone(); + } return 0; }