added functionality to disconnect and request hostship via LUA.

fixed bug in threading
This commit is contained in:
Joakim Kilby
2015-06-18 09:10:15 +02:00
parent be21b8e43d
commit b723751c1b
3 changed files with 123 additions and 78 deletions
@@ -105,6 +105,8 @@ namespace openspace{
void clientConnect();
void disconnect();
void setPort(const std::string &port);
std::string port();
@@ -136,7 +138,8 @@ namespace openspace{
Initialization,
Data,
HostInfo,
InitializationRequest
InitializationRequest,
HostshipRequest
};
/**
@@ -169,8 +172,6 @@ namespace openspace{
return hashVal;
};
void disconnect();
void closeSocket();
bool initNetworkAPI();
+104 -75
View File
@@ -86,13 +86,7 @@ namespace openspace {
}
OSParallelConnection::~OSParallelConnection(){
_isRunning.store(false);
disconnect();
#if defined(__WIN32__)
WSACleanup();
#endif
disconnect();
}
void OSParallelConnection::clientConnect(){
@@ -136,55 +130,56 @@ namespace openspace {
}
void OSParallelConnection::connection(addrinfo *info){
int result;
_clientSocket = socket(info->ai_family, info->ai_socktype, info->ai_protocol);
if (_clientSocket == INVALID_SOCKET){
freeaddrinfo(info);
#if defined(__WIN32__)
WSACleanup();
#endif
std::cerr << "Failed to InitializationRequest client socket!" << std::endl;
return;
}
int flag = 1;
int result;
//set no delay
result = setsockopt(_clientSocket, /* socket affected */
IPPROTO_TCP, /* set option at TCP level */
TCP_NODELAY, /* name of option */
(char *)&flag, /* the cast is historical cruft */
sizeof(int)); /* length of option value */
//set send timeout
int timeout = 0; //infinite
result = setsockopt(
_clientSocket,
SOL_SOCKET,
SO_SNDTIMEO,
(char *)&timeout,
sizeof(timeout));
//set receive timeout
result = setsockopt(
_clientSocket,
SOL_SOCKET,
SO_RCVTIMEO,
(char *)&timeout,
sizeof(timeout));
result = setsockopt(_clientSocket, SOL_SOCKET, SO_REUSEADDR, (char*)&flag, sizeof(int));
if (result == SOCKET_ERROR)
std::cout << "Failed to set reuse address with error:" << _ERRNO << std::endl;
result = setsockopt(_clientSocket, SOL_SOCKET, SO_KEEPALIVE, (char*)&flag, sizeof(int));
if (result == SOCKET_ERROR)
std::cout << "Failed to set keep alive with error: " << _ERRNO << std::endl;
//try to connect to server
while (_isRunning.load()){
_clientSocket = socket(info->ai_family, info->ai_socktype, info->ai_protocol);
if (_clientSocket == INVALID_SOCKET){
freeaddrinfo(info);
#if defined(__WIN32__)
WSACleanup();
#endif
std::cerr << "Failed to init client socket!" << std::endl;
return;
}
int flag = 1;
int result;
//set no delay
result = setsockopt(_clientSocket, /* socket affected */
IPPROTO_TCP, /* set option at TCP level */
TCP_NODELAY, /* name of option */
(char *)&flag, /* the cast is historical cruft */
sizeof(int)); /* length of option value */
//set send timeout
int timeout = 0; //infinite
result = setsockopt(
_clientSocket,
SOL_SOCKET,
SO_SNDTIMEO,
(char *)&timeout,
sizeof(timeout));
//set receive timeout
result = setsockopt(
_clientSocket,
SOL_SOCKET,
SO_RCVTIMEO,
(char *)&timeout,
sizeof(timeout));
result = setsockopt(_clientSocket, SOL_SOCKET, SO_REUSEADDR, (char*)&flag, sizeof(int));
if (result == SOCKET_ERROR)
std::cout << "Failed to set reuse address with error:" << _ERRNO << std::endl;
result = setsockopt(_clientSocket, SOL_SOCKET, SO_KEEPALIVE, (char*)&flag, sizeof(int));
if (result == SOCKET_ERROR)
std::cout << "Failed to set keep alive with error: " << _ERRNO << std::endl;
result = connect(_clientSocket, info->ai_addr, (int)info->ai_addrlen);
if (result != SOCKET_ERROR)
{
@@ -198,7 +193,12 @@ namespace openspace {
//one sec sleep
std::this_thread::sleep_for(std::chrono::seconds(1));
}
//make sure to join the broadcast thread if started
//dont delete it, that will be done in disconnect() function
if(_broadcastThread != nullptr && _isHost.load()){
_isHost.store(false);
_broadcastThread->join();
}
//cleanup
freeaddrinfo(info);
}
@@ -269,7 +269,6 @@ namespace openspace {
}
void OSParallelConnection::decodeDataMessage(){
int result;
uint16_t msglen;
std::vector<char> buffer;
@@ -312,7 +311,7 @@ namespace openspace {
//start broadcasting
_isHost.store(true);
_broadcastThread = new (std::nothrow) std::thread(&OSParallelConnection::broadcast, this);
}
}
}
else{
//we were broadcasting but should stop now
@@ -439,7 +438,20 @@ namespace openspace {
}
void OSParallelConnection::requestHostship(){
std::vector<char> buffer;
buffer.reserve(headerSize + sizeof(int));
//header
buffer.insert(buffer.end(), 'O');
buffer.insert(buffer.end(), 'S');
buffer.insert(buffer.end(), 0);
buffer.insert(buffer.end(), 0);
//type of message
int type = OSParallelConnection::MessageTypes::HostshipRequest;
buffer.insert(buffer.end(), reinterpret_cast<char*>(&type), reinterpret_cast<char*>(&type) + sizeof(type));
//send message
send(_clientSocket, buffer.data(), buffer.size(), 0);
}
void OSParallelConnection::setPassword(const std::string& pwd){
@@ -447,23 +459,28 @@ namespace openspace {
}
void OSParallelConnection::disconnect(){
_isHost.store(false);
//must be run before trying to join communication threads, else the threads are stuck trying to receive data
closeSocket();
_isRunning.store(false);
_isHost.store(false);
if (_connectionThread != nullptr){
_connectionThread->join();
delete _connectionThread;
_connectionThread = nullptr;
}
if (_broadcastThread != nullptr){
_broadcastThread->join();
delete _broadcastThread;
_broadcastThread = nullptr;
}
if (_broadcastThread != nullptr){
_broadcastThread->join();
delete _broadcastThread;
_broadcastThread = nullptr;
}
_isRunning.store(false);
if (_connectionThread != nullptr){
_connectionThread->join();
delete _connectionThread;
_connectionThread = nullptr;
}
closeSocket();
#if defined(__WIN32__)
WSACleanup();
#endif
}
void OSParallelConnection::closeSocket(){
@@ -556,8 +573,8 @@ namespace openspace {
//send message
send(_clientSocket, buffer.data(), buffer.size(), 0);
//100 ms sleep
std::this_thread::sleep_for(std::chrono::milliseconds(100));
//200 ms sleep
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
}
@@ -595,6 +612,18 @@ namespace openspace {
"",
"Connect to parallel"
},
{
"disconnect",
&luascriptfunctions::disconnect,
"",
"Disconnect from parallel"
},
{
"requestHostship",
&luascriptfunctions::requestHostship,
"",
"Request to be the host for this session"
},
}
};
}
+15
View File
@@ -132,6 +132,21 @@ int connect(lua_State* L) {
return 0;
}
int disconnect(lua_State* L) {
int nArguments = lua_gettop(L);
if (nArguments != 0)
return luaL_error(L, "Expected %i arguments, got %i", 0, nArguments);
OsEng.parallelConnection()->disconnect();
return 0;
}
int requestHostship(lua_State* L) {
int nArguments = lua_gettop(L);
if (nArguments != 0)
return luaL_error(L, "Expected %i arguments, got %i", 0, nArguments);
OsEng.parallelConnection()->requestHostship();
return 0;
}
} // namespace luascriptfunctions