From b723751c1bccc2c2eedc5bf4ad41eebafbdc73e7 Mon Sep 17 00:00:00 2001 From: Joakim Kilby Date: Thu, 18 Jun 2015 09:10:15 +0200 Subject: [PATCH] added functionality to disconnect and request hostship via LUA. fixed bug in threading --- .../openspace/network/osparallelconnection.h | 7 +- src/network/osparallelconnection.cpp | 179 ++++++++++-------- src/network/osparallelconnection_lua.inl | 15 ++ 3 files changed, 123 insertions(+), 78 deletions(-) diff --git a/include/openspace/network/osparallelconnection.h b/include/openspace/network/osparallelconnection.h index 8a84120fbc..d17c5bd80d 100644 --- a/include/openspace/network/osparallelconnection.h +++ b/include/openspace/network/osparallelconnection.h @@ -105,6 +105,8 @@ namespace openspace{ void clientConnect(); + void disconnect(); + void setPort(const std::string &port); std::string port(); @@ -136,7 +138,8 @@ namespace openspace{ Initialization, Data, HostInfo, - InitializationRequest + InitializationRequest, + HostshipRequest }; /** @@ -169,8 +172,6 @@ namespace openspace{ return hashVal; }; - void disconnect(); - void closeSocket(); bool initNetworkAPI(); diff --git a/src/network/osparallelconnection.cpp b/src/network/osparallelconnection.cpp index 3db55c5456..58d3e46112 100644 --- a/src/network/osparallelconnection.cpp +++ b/src/network/osparallelconnection.cpp @@ -86,13 +86,7 @@ namespace openspace { } OSParallelConnection::~OSParallelConnection(){ - _isRunning.store(false); - - disconnect(); - - #if defined(__WIN32__) - WSACleanup(); - #endif + disconnect(); } void OSParallelConnection::clientConnect(){ @@ -136,55 +130,56 @@ namespace openspace { } void OSParallelConnection::connection(addrinfo *info){ - int result; + _clientSocket = socket(info->ai_family, info->ai_socktype, info->ai_protocol); + + if (_clientSocket == INVALID_SOCKET){ + freeaddrinfo(info); +#if defined(__WIN32__) + WSACleanup(); +#endif + std::cerr << "Failed to InitializationRequest client socket!" << std::endl; + return; + } + + int flag = 1; + int result; + + //set no delay + result = setsockopt(_clientSocket, /* socket affected */ + IPPROTO_TCP, /* set option at TCP level */ + TCP_NODELAY, /* name of option */ + (char *)&flag, /* the cast is historical cruft */ + sizeof(int)); /* length of option value */ + + //set send timeout + int timeout = 0; //infinite + result = setsockopt( + _clientSocket, + SOL_SOCKET, + SO_SNDTIMEO, + (char *)&timeout, + sizeof(timeout)); + + //set receive timeout + result = setsockopt( + _clientSocket, + SOL_SOCKET, + SO_RCVTIMEO, + (char *)&timeout, + sizeof(timeout)); + + result = setsockopt(_clientSocket, SOL_SOCKET, SO_REUSEADDR, (char*)&flag, sizeof(int)); + if (result == SOCKET_ERROR) + std::cout << "Failed to set reuse address with error:" << _ERRNO << std::endl; + + result = setsockopt(_clientSocket, SOL_SOCKET, SO_KEEPALIVE, (char*)&flag, sizeof(int)); + if (result == SOCKET_ERROR) + std::cout << "Failed to set keep alive with error: " << _ERRNO << std::endl; + + //try to connect to server while (_isRunning.load()){ - _clientSocket = socket(info->ai_family, info->ai_socktype, info->ai_protocol); - - if (_clientSocket == INVALID_SOCKET){ - freeaddrinfo(info); - #if defined(__WIN32__) - WSACleanup(); - #endif - std::cerr << "Failed to init client socket!" << std::endl; - return; - } - - int flag = 1; - int result; - - //set no delay - result = setsockopt(_clientSocket, /* socket affected */ - IPPROTO_TCP, /* set option at TCP level */ - TCP_NODELAY, /* name of option */ - (char *)&flag, /* the cast is historical cruft */ - sizeof(int)); /* length of option value */ - - //set send timeout - int timeout = 0; //infinite - result = setsockopt( - _clientSocket, - SOL_SOCKET, - SO_SNDTIMEO, - (char *)&timeout, - sizeof(timeout)); - - //set receive timeout - result = setsockopt( - _clientSocket, - SOL_SOCKET, - SO_RCVTIMEO, - (char *)&timeout, - sizeof(timeout)); - - result = setsockopt(_clientSocket, SOL_SOCKET, SO_REUSEADDR, (char*)&flag, sizeof(int)); - if (result == SOCKET_ERROR) - std::cout << "Failed to set reuse address with error:" << _ERRNO << std::endl; - - result = setsockopt(_clientSocket, SOL_SOCKET, SO_KEEPALIVE, (char*)&flag, sizeof(int)); - if (result == SOCKET_ERROR) - std::cout << "Failed to set keep alive with error: " << _ERRNO << std::endl; - + result = connect(_clientSocket, info->ai_addr, (int)info->ai_addrlen); if (result != SOCKET_ERROR) { @@ -198,7 +193,12 @@ namespace openspace { //one sec sleep std::this_thread::sleep_for(std::chrono::seconds(1)); } - + //make sure to join the broadcast thread if started + //dont delete it, that will be done in disconnect() function + if(_broadcastThread != nullptr && _isHost.load()){ + _isHost.store(false); + _broadcastThread->join(); + } //cleanup freeaddrinfo(info); } @@ -269,7 +269,6 @@ namespace openspace { } void OSParallelConnection::decodeDataMessage(){ - int result; uint16_t msglen; std::vector buffer; @@ -312,7 +311,7 @@ namespace openspace { //start broadcasting _isHost.store(true); _broadcastThread = new (std::nothrow) std::thread(&OSParallelConnection::broadcast, this); - } + } } else{ //we were broadcasting but should stop now @@ -439,7 +438,20 @@ namespace openspace { } void OSParallelConnection::requestHostship(){ + std::vector buffer; + buffer.reserve(headerSize + sizeof(int)); + //header + buffer.insert(buffer.end(), 'O'); + buffer.insert(buffer.end(), 'S'); + buffer.insert(buffer.end(), 0); + buffer.insert(buffer.end(), 0); + //type of message + int type = OSParallelConnection::MessageTypes::HostshipRequest; + buffer.insert(buffer.end(), reinterpret_cast(&type), reinterpret_cast(&type) + sizeof(type)); + + //send message + send(_clientSocket, buffer.data(), buffer.size(), 0); } void OSParallelConnection::setPassword(const std::string& pwd){ @@ -447,23 +459,28 @@ namespace openspace { } void OSParallelConnection::disconnect(){ - _isHost.store(false); + //must be run before trying to join communication threads, else the threads are stuck trying to receive data + closeSocket(); + + _isRunning.store(false); + _isHost.store(false); + if (_connectionThread != nullptr){ + _connectionThread->join(); + delete _connectionThread; + _connectionThread = nullptr; + } + + if (_broadcastThread != nullptr){ + _broadcastThread->join(); + delete _broadcastThread; + _broadcastThread = nullptr; + } + - if (_broadcastThread != nullptr){ - _broadcastThread->join(); - delete _broadcastThread; - _broadcastThread = nullptr; - } - - _isRunning.store(false); - - if (_connectionThread != nullptr){ - _connectionThread->join(); - delete _connectionThread; - _connectionThread = nullptr; - } - - closeSocket(); + +#if defined(__WIN32__) + WSACleanup(); +#endif } void OSParallelConnection::closeSocket(){ @@ -556,8 +573,8 @@ namespace openspace { //send message send(_clientSocket, buffer.data(), buffer.size(), 0); - //100 ms sleep - std::this_thread::sleep_for(std::chrono::milliseconds(100)); + //200 ms sleep + std::this_thread::sleep_for(std::chrono::milliseconds(200)); } } @@ -595,6 +612,18 @@ namespace openspace { "", "Connect to parallel" }, + { + "disconnect", + &luascriptfunctions::disconnect, + "", + "Disconnect from parallel" + }, + { + "requestHostship", + &luascriptfunctions::requestHostship, + "", + "Request to be the host for this session" + }, } }; } diff --git a/src/network/osparallelconnection_lua.inl b/src/network/osparallelconnection_lua.inl index a247ffa536..3237675c1a 100644 --- a/src/network/osparallelconnection_lua.inl +++ b/src/network/osparallelconnection_lua.inl @@ -132,6 +132,21 @@ int connect(lua_State* L) { return 0; } +int disconnect(lua_State* L) { + int nArguments = lua_gettop(L); + if (nArguments != 0) + return luaL_error(L, "Expected %i arguments, got %i", 0, nArguments); + OsEng.parallelConnection()->disconnect(); + return 0; +} + +int requestHostship(lua_State* L) { + int nArguments = lua_gettop(L); + if (nArguments != 0) + return luaL_error(L, "Expected %i arguments, got %i", 0, nArguments); + OsEng.parallelConnection()->requestHostship(); + return 0; +} } // namespace luascriptfunctions