code cleanup, commenting, refactoring, and renaming.

fixed a bug where threads would not close down correctly on Unix systems
This commit is contained in:
Joakim Kilby
2015-06-24 17:52:21 +02:00
parent 13eb012a04
commit ff49a7cdd2
2 changed files with 185 additions and 174 deletions

View File

@@ -119,14 +119,8 @@ namespace openspace{
std::string name();
void setSocket(_SOCKET socket);
_SOCKET clientSocket();
void setHost(bool host);
bool isHost();
bool isRunning();
void requestHostship();
@@ -175,27 +169,25 @@ namespace openspace{
return hashVal;
};
void writeHeader(std::vector<char> &buffer);
void writeHeader(std::vector<char> &buffer, uint32_t messageType);
void closeSocket();
bool initNetworkAPI();
void connection(addrinfo *info);
void tryConnect(addrinfo *info);
void authenticate();
void sendAuthentication();
void communicate();
void listenCommunication();
void delegateDecoding(int type);
void decodeAuthenticationMessage();
void delegateDecoding(uint32_t type);
void decodeInitializationMessage();
void decodeDataMessage();
void decodeStreamDataMessage();
void decodeScript();
void decodeScriptMessage();
void decodeHostInfoMessage();
@@ -207,7 +199,6 @@ namespace openspace{
int receiveData(_SOCKET & socket, std::vector<char> &buffer, int length, int flags);
int _headerSize;
uint32_t _passCode;
std::string _port;
std::string _address;
@@ -215,8 +206,9 @@ namespace openspace{
_SOCKET _clientSocket;
std::thread *_connectionThread;
std::thread *_broadcastThread;
std::atomic<bool> _isRunning;
std::atomic<bool> _isHost;
std::atomic<bool> _isConnected;
std::atomic<bool> _isListening;
};
} // namespace network

View File

@@ -80,11 +80,11 @@ namespace openspace {
_clientSocket(INVALID_SOCKET),
_connectionThread(nullptr),
_broadcastThread(nullptr),
_isRunning(false),
_isHost(false),
_headerSize(headerSize())
_isConnected(false),
_isListening(false)
{
}
ParallelConnection::~ParallelConnection(){
@@ -92,6 +92,11 @@ namespace openspace {
}
void ParallelConnection::clientConnect(){
//we're already connected, do nothing
if(_isConnected.load()){
return;
}
if (!initNetworkAPI()){
LERROR("Failed to initialize network API for Parallel Connection");
}
@@ -107,10 +112,8 @@ namespace openspace {
hints.ai_protocol = IPPROTO_TCP;
hints.ai_flags = AI_PASSIVE;
int result;
// Resolve the local address and port to be used by the server
result = getaddrinfo(_address.c_str(), _port.c_str(), &hints, &addresult);
int result = getaddrinfo(_address.c_str(), _port.c_str(), &hints, &addresult);
if (result != 0)
{
#if defined(__WIN32__)
@@ -121,13 +124,15 @@ namespace openspace {
LINFO("Attempting to connect to address "<< _address << " on port " << _port);
//start connection thread
_isRunning.store(true);
_connectionThread = new (std::nothrow) std::thread(&ParallelConnection::connection, this, addresult);
//we're not connected
_isConnected.store(false);
//start connection thread
_connectionThread = new (std::nothrow) std::thread(&ParallelConnection::tryConnect, this, addresult);
}
void ParallelConnection::connection(addrinfo *info){
void ParallelConnection::tryConnect(addrinfo *info){
_clientSocket = socket(info->ai_family, info->ai_socktype, info->ai_protocol);
@@ -144,14 +149,14 @@ namespace openspace {
int result;
//set no delay
result = setsockopt(_clientSocket, /* socket affected */
IPPROTO_TCP, /* set option at TCP level */
TCP_NODELAY, /* name of option */
(char *)&flag, /* the cast is historical cruft */
sizeof(int)); /* length of option value */
result = setsockopt(_clientSocket, /* socket affected */
IPPROTO_TCP, /* set option at TCP level */
TCP_NODELAY, /* name of option */
(char *)&flag, /* the cast is historical cruft */
sizeof(int)); /* length of option value */
//set send timeout
int timeout = 0; //infinite
int timeout = 0;
result = setsockopt(
_clientSocket,
SOL_SOCKET,
@@ -160,14 +165,13 @@ namespace openspace {
sizeof(timeout));
//set receive timeout
timeout = 0; //infinite
result = setsockopt(
_clientSocket,
SOL_SOCKET,
SO_RCVTIMEO,
(char *)&timeout,
sizeof(timeout));
result = setsockopt(_clientSocket, SOL_SOCKET, SO_REUSEADDR, (char*)&flag, sizeof(int));
if (result == SOCKET_ERROR)
LERROR("Failed to set socket option 'reuse address'. Error code: " << _ERRNO);
@@ -175,83 +179,84 @@ namespace openspace {
result = setsockopt(_clientSocket, SOL_SOCKET, SO_KEEPALIVE, (char*)&flag, sizeof(int));
if (result == SOCKET_ERROR)
LERROR("Failed to set socket option 'keep alive'. Error code: " << _ERRNO);
//while the connection thread is still running
while (_isRunning.load()){
while (!_isConnected.load()){
//try to connect
result = connect(_clientSocket, info->ai_addr, (int)info->ai_addrlen);
result = connect(_clientSocket, info->ai_addr, (int)info->ai_addrlen);
//if the connection was successfull
if (result != SOCKET_ERROR)
{
//send authentication
authenticate();
//start listening for communication
//communicate();
}
//try to connect once per second
std::this_thread::sleep_for(std::chrono::seconds(1));
}
//make sure to join the broadcast thread if started
//dont delete it, that will be done in disconnect() function
// if(_broadcastThread != nullptr && _isHost.load()){
// _isHost.store(false);
// _broadcastThread->join();
// }
if (result != SOCKET_ERROR)
{
//we're connected
_isConnected.store(true);
//and ready to start receiving messages
_isListening.store(true);
//send authentication
sendAuthentication();
//start listening for communication
listenCommunication();
}
//try to connect once per second
std::this_thread::sleep_for(std::chrono::seconds(1));
}
//cleanup
freeaddrinfo(info);
}
void ParallelConnection::authenticate(){
void ParallelConnection::sendAuthentication(){
//length of this nodes name
uint16_t namelen = static_cast<uint16_t>(_name.length());
int size = headerSize() + sizeof(uint32_t) + sizeof(uint16_t) + static_cast<int>(namelen);
std::vector<char> buffer;
//total size of the buffer, header + size of passcodde + namelength + size of namelength
int size = headerSize() + sizeof(uint32_t) + sizeof(namelen) + static_cast<int>(namelen);
//create and reserve buffer
std::vector<char> buffer;
buffer.reserve(size);
//version
buffer.insert(buffer.end(), 'O');
buffer.insert(buffer.end(), 'S');
buffer.insert(buffer.end(), 0);
buffer.insert(buffer.end(), 0);
//write header to buffer
writeHeader(buffer, MessageTypes::Authentication);
//msg type, 0 = auth
int type = MessageTypes::Authentication;
buffer.insert(buffer.end(), reinterpret_cast<char*>(&type), reinterpret_cast<char*>(&type) + sizeof(int));
//passcode
//write passcode to buffer
buffer.insert(buffer.end(), reinterpret_cast<char*>(&_passCode), reinterpret_cast<char*>(&_passCode) + sizeof(uint32_t));
//name length
//write the length of the nodes name to buffer
buffer.insert(buffer.end(), reinterpret_cast<char*>(&namelen), reinterpret_cast<char*>(&namelen) + sizeof(uint16_t));
//name
//write this nodes name to buffer
buffer.insert(buffer.end(), _name.begin(), _name.end());
//send buffer
int result = send(_clientSocket, buffer.data(), size, 0);
//if send failed
if (result == SOCKET_ERROR){
//failed to send auth msg.
std::cerr << "Failed to send authentication message!" << std::endl;
LERROR("Failed to send authentication message.");
}
}
void ParallelConnection::delegateDecoding(int type){
void ParallelConnection::delegateDecoding(uint32_t type){
switch (type){
case MessageTypes::Authentication:
decodeAuthenticationMessage();
//do nothing for now
break;
case MessageTypes::Initialization:
decodeInitializationMessage();
break;
case MessageTypes::StreamData:
decodeDataMessage();
decodeStreamDataMessage();
break;
case MessageTypes::Script:
decodeScriptMessage();
break;
case MessageTypes::HostInfo:
decodeHostInfoMessage();
@@ -265,47 +270,58 @@ namespace openspace {
}
}
void ParallelConnection::decodeAuthenticationMessage(){
printf("Auth OK!\n"); //more stuff here later
}
void ParallelConnection::decodeInitializationMessage(){
printf("Init message received!\n");
}
void ParallelConnection::decodeDataMessage(){
void ParallelConnection::decodeStreamDataMessage(){
int result;
uint16_t msglen;
std::vector<char> buffer;
//create a buffer to hold the size of streamdata message
std::vector<char> buffer;
buffer.resize(sizeof(msglen));
//read size of streamdata message
result = receiveData(_clientSocket, buffer, sizeof(msglen), 0);
if (result <= 0){
//error
return;
}
//the size in bytes of the streamdata message
msglen = (*(reinterpret_cast<uint16_t*>(buffer.data())));
//resize the buffer to be able to read the streamdata
buffer.clear();
buffer.resize(msglen);
//read the data into buffer
result = receiveData(_clientSocket, buffer, msglen, 0);
if (result <= 0){
//error
return;
}
//construct a keyframe ffrom the streamdata
network::StreamDataKeyframe kf;
kf.deserialize(buffer);
//and add the keyframe to the interaction handler
OsEng.interactionHandler()->addKeyframe(kf);
}
void ParallelConnection::decodeScript(){
void ParallelConnection::decodeScriptMessage(){
int result;
uint16_t msglen;
//create buffer to decode size of script
std::vector<char> buffer;
buffer.resize(sizeof(msglen));
//read size of received script
result = receiveData(_clientSocket, buffer, sizeof(msglen), 0);
if (result <= 0){
@@ -313,74 +329,89 @@ namespace openspace {
return;
}
//size of recived script
msglen = (*(reinterpret_cast<uint16_t*>(buffer.data())));
//clear and resize buffer to decode actual script
buffer.clear();
buffer.resize(msglen);
//decode script
result = receiveData(_clientSocket, buffer, msglen, 0);
if (result <= 0){
//error
return;
}
//construct a script (string) from the data contained in the buffer
std::string script(buffer.data());
//tell the script engine to execute the script when appropriate
OsEng.scriptEngine()->queueScript(script);
}
void ParallelConnection::decodeHostInfoMessage(){
//create buffer
std::vector<char> hostflag;
//resize to hold a flag saying if we're host or not
hostflag.resize(1);
//read data into buffer
int result = receiveData(_clientSocket, hostflag, 1, 0);
//enough data was read
if (result > 0){
//we've been assigned as host
if (hostflag.at(0) == 1){
//we're already host, do nothing (dummy check)
if (_isHost.load()){
return;
}
else{
//start broadcasting
_isHost.store(true);
_broadcastThread = new (std::nothrow) std::thread(&ParallelConnection::broadcast, this);
}
}
else{
else{ //we've been assigned as client
//we were broadcasting but should stop now
if (_isHost.load()){
//stop broadcast loop
_isHost.store(false);
if (_broadcastThread != nullptr){
//and delete broadcasting thread
if (_broadcastThread != nullptr){
_broadcastThread->join();
delete _broadcastThread;
_broadcastThread = nullptr;
}
}
else{
//we were not host so nothing to do
//we were not broadcasting so nothing to do
}
//request init packages from the host
int size = headerSize() + sizeof(uint32_t);
//request init package from the host
int size = headerSize();
std::vector<char> buffer;
buffer.reserve(size);
//version
buffer.insert(buffer.end(), 'O');
buffer.insert(buffer.end(), 'S');
buffer.insert(buffer.end(), 0);
buffer.insert(buffer.end(), 0);
//msg type, 0 = auth
int type = MessageTypes::InitializationRequest;
buffer.insert(buffer.end(), reinterpret_cast<char*>(&type), reinterpret_cast<char*>(&type) + sizeof(int));
//write header
writeHeader(buffer, MessageTypes::InitializationRequest);
//send message
send(_clientSocket, buffer.data(), buffer.size(), 0);
}
}
else{
std::cerr << "Error " << _ERRNO << " detected in connection!" << std::endl;
LERROR("Error " << _ERRNO << " detected in connection.");
disconnect();
}
}
@@ -389,34 +420,44 @@ namespace openspace {
printf("InitRequest message received!\n");
}
void ParallelConnection::communicate(){
void ParallelConnection::listenCommunication(){
//create basic buffer for receiving first part of messages
std::vector<char> buffer;
buffer.resize(8);
//size of the header
buffer.resize(headerSize());
int result;
while (_isRunning.load()){
//while we're still connected and listening
while (_isListening.load()){
//receive the first parts of a message
result = receiveData(_clientSocket, buffer, headerSize(), 0);
//if enough data was received
if (result > 0){
if (buffer[0] == 'O' && //Open
buffer[1] == 'S' && //Space
buffer[2] == 0 && //version
buffer[3] == 0 //version
//make sure that header matches this version of OpenSpace
if (buffer[0] == 'O' && //Open
buffer[1] == 'S' && //Space
buffer[2] == OPENSPACE_VERSION_MAJOR && // major version
buffer[3] == OPENSPACE_VERSION_MINOR // minor version
)
{
//parse type
int type = (*(reinterpret_cast<int*>(&buffer[4])));
uint32_t type = (*(reinterpret_cast<uint32_t*>(&buffer[4])));
//and delegate decoding depending on type
delegateDecoding(type);
}
}
else{
if (result == 0){
//connection rejected
_isRunning.store(false);
_isConnected.store(false);
_isListening.store(false);
}
else{
std::cerr << "Error " << _ERRNO << " detected in connection!" << std::endl;
LERROR("Error " << _ERRNO << " detected in connection!");
}
break;
}
@@ -468,38 +509,16 @@ namespace openspace {
return _name;
}
void ParallelConnection::setSocket(_SOCKET socket){
_clientSocket = socket;
}
_SOCKET ParallelConnection::clientSocket(){
return _clientSocket;
}
void ParallelConnection::setHost(bool host){
_isHost.store(host);
}
bool ParallelConnection::isHost(){
return _isHost.load();
}
bool ParallelConnection::isRunning(){
return _isRunning.load();
}
void ParallelConnection::requestHostship(){
std::vector<char> buffer;
buffer.reserve(headerSize() + sizeof(int));
//header
buffer.insert(buffer.end(), 'O');
buffer.insert(buffer.end(), 'S');
buffer.insert(buffer.end(), 0);
buffer.insert(buffer.end(), 0);
//type of message
int type = ParallelConnection::MessageTypes::HostshipRequest;
buffer.insert(buffer.end(), reinterpret_cast<char*>(&type), reinterpret_cast<char*>(&type) + sizeof(type));
buffer.reserve(headerSize());
//write header
writeHeader(buffer, MessageTypes::HostshipRequest);
//send message
send(_clientSocket, buffer.data(), buffer.size(), 0);
@@ -515,16 +534,9 @@ namespace openspace {
std::vector<char> buffer;
buffer.reserve(headerSize() + sizeof(msglen) + msglen);
//header
buffer.insert(buffer.end(), 'O');
buffer.insert(buffer.end(), 'S');
buffer.insert(buffer.end(), 0);
buffer.insert(buffer.end(), 0);
//type of message
int type = ParallelConnection::MessageTypes::Script;
buffer.insert(buffer.end(), reinterpret_cast<char*>(&type), reinterpret_cast<char*>(&type) + sizeof(type));
//write header
writeHeader(buffer, MessageTypes::Script);
//size of message
buffer.insert(buffer.end(), reinterpret_cast<char*>(&msglen), reinterpret_cast<char*>(&msglen) + sizeof(msglen));
@@ -541,22 +553,27 @@ namespace openspace {
//must be run before trying to join communication threads, else the threads are stuck trying to receive data
closeSocket();
_isRunning.store(false);
//tell broadcast thread to stop broadcasting
_isHost.store(false);
//tell connection thread to stop trying to connect and stop listening for communication
_isConnected.store(true);
_isListening.store(false);
//join connection thread and delete it
if (_connectionThread != nullptr){
_connectionThread->join();
delete _connectionThread;
_connectionThread = nullptr;
}
//join broadcast thread and delete it
if (_broadcastThread != nullptr){
_broadcastThread->join();
delete _broadcastThread;
_broadcastThread = nullptr;
}
#if defined(__WIN32__)
WSACleanup();
#endif
@@ -581,7 +598,8 @@ namespace openspace {
shutdown(_clientSocket, SD_BOTH);
closesocket(_clientSocket);
#else
shutdown(_clientSocket, SHUT_RDWR);
shutdown(_clientSocket, SHUT_RDWR);
close(_clientSocket);
#endif
_clientSocket = INVALID_SOCKET;
@@ -603,7 +621,7 @@ namespace openspace {
HIBYTE(wsaData.wVersion) != 2)
{
/* incorrect WinSock version */
std::cerr << "Failed to init winsock API!" << std::endl;
LERROR("Failed to init winsock API.");
WSACleanup();
return false;
}
@@ -616,32 +634,32 @@ namespace openspace {
void ParallelConnection::broadcast(){
//while we're still the host
while (_isHost.load()){
//create a keyframe with current position and orientation of camera
network::StreamDataKeyframe kf;
kf._position = OsEng.interactionHandler()->camera()->position();
kf._viewRotationQuat = glm::quat_cast(OsEng.interactionHandler()->camera()->viewRotationMatrix());
//@TODO, implement method in openspace engine for this
kf._timeStamp = sgct::Engine::getTime();
//timestamp as current runtime of OpenSpace instance
kf._timeStamp = OsEng.runTime();
//create a buffer for the keyframe
std::vector<char> kfBuffer;
//fill the keyframe buffer
kf.serialize(kfBuffer);
//get the size of the keyframebuffer
uint16_t msglen = static_cast<uint16_t>(kfBuffer.size());
//create the full buffer
std::vector<char> buffer;
buffer.reserve(headerSize() + sizeof(msglen) + msglen);
//header
buffer.insert(buffer.end(), 'O');
buffer.insert(buffer.end(), 'S');
buffer.insert(buffer.end(), 0);
buffer.insert(buffer.end(), 0);
//type of message
int type = ParallelConnection::MessageTypes::StreamData;
buffer.insert(buffer.end(), reinterpret_cast<char*>(&type), reinterpret_cast<char*>(&type) + sizeof(type));
//write header
writeHeader(buffer, MessageTypes::StreamData);
//size of message
buffer.insert(buffer.end(), reinterpret_cast<char*>(&msglen), reinterpret_cast<char*>(&msglen) + sizeof(msglen));
@@ -652,12 +670,12 @@ namespace openspace {
//send message
send(_clientSocket, buffer.data(), buffer.size(), 0);
//100 ms sleep
//100 ms sleep - send keyframes 10 times per second
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
void ParallelConnection::writeHeader(std::vector<char> &buffer){
void ParallelConnection::writeHeader(std::vector<char> &buffer, uint32_t messageType){
//make sure the buffer is large enough to hold at least the header
if(buffer.size() < headerSize()){
buffer.reserve(headerSize());
@@ -668,15 +686,16 @@ namespace openspace {
uint8_t versionMinor = static_cast<uint8_t>(OPENSPACE_VERSION_MINOR);
//insert header into buffer
buffer.insert(buffer.begin(), 'O');
buffer.insert(buffer.begin(), 'S');
buffer.insert(buffer.begin(), static_cast<char>(versionMajor));
buffer.insert(buffer.begin(), static_cast<char>(versionMinor));
buffer.insert(buffer.end(), 'O');
buffer.insert(buffer.end(), 'S');
buffer.insert(buffer.end(), versionMajor);
buffer.insert(buffer.end(), versionMinor);
buffer.insert(buffer.end(), reinterpret_cast<char*>(&messageType), reinterpret_cast<char*>(&messageType) + sizeof(messageType));
}
int ParallelConnection::headerSize(){
//minor and major version (as uint8_t) + two bytes for the chars 'O' and 'S'
return 2 * sizeof(uint8_t) + 2;
//minor and major version (as uint8_t -> 1 byte) + two bytes for the chars 'O' and 'S' + 4 bytes for type of message
return 2 * sizeof(uint8_t) + 2 + sizeof(uint32_t);
}
scripting::ScriptEngine::LuaLibrary ParallelConnection::luaLibrary() {