diff --git a/include/openspace/network/parallelconnection.h b/include/openspace/network/parallelconnection.h index f4639400cd..558b78208b 100644 --- a/include/openspace/network/parallelconnection.h +++ b/include/openspace/network/parallelconnection.h @@ -119,14 +119,8 @@ namespace openspace{ std::string name(); - void setSocket(_SOCKET socket); - _SOCKET clientSocket(); - void setHost(bool host); - - bool isHost(); - bool isRunning(); void requestHostship(); @@ -175,27 +169,25 @@ namespace openspace{ return hashVal; }; - void writeHeader(std::vector &buffer); + void writeHeader(std::vector &buffer, uint32_t messageType); void closeSocket(); bool initNetworkAPI(); - void connection(addrinfo *info); + void tryConnect(addrinfo *info); - void authenticate(); + void sendAuthentication(); - void communicate(); + void listenCommunication(); - void delegateDecoding(int type); - - void decodeAuthenticationMessage(); + void delegateDecoding(uint32_t type); void decodeInitializationMessage(); - void decodeDataMessage(); + void decodeStreamDataMessage(); - void decodeScript(); + void decodeScriptMessage(); void decodeHostInfoMessage(); @@ -207,7 +199,6 @@ namespace openspace{ int receiveData(_SOCKET & socket, std::vector &buffer, int length, int flags); - int _headerSize; uint32_t _passCode; std::string _port; std::string _address; @@ -215,8 +206,9 @@ namespace openspace{ _SOCKET _clientSocket; std::thread *_connectionThread; std::thread *_broadcastThread; - std::atomic _isRunning; std::atomic _isHost; + std::atomic _isConnected; + std::atomic _isListening; }; } // namespace network diff --git a/src/network/parallelconnection.cpp b/src/network/parallelconnection.cpp index f0bb6e8d4a..4232cc79b8 100644 --- a/src/network/parallelconnection.cpp +++ b/src/network/parallelconnection.cpp @@ -80,11 +80,11 @@ namespace openspace { _clientSocket(INVALID_SOCKET), _connectionThread(nullptr), _broadcastThread(nullptr), - _isRunning(false), _isHost(false), - _headerSize(headerSize()) + _isConnected(false), + _isListening(false) { - + } ParallelConnection::~ParallelConnection(){ @@ -92,6 +92,11 @@ namespace openspace { } void ParallelConnection::clientConnect(){ + //we're already connected, do nothing + if(_isConnected.load()){ + return; + } + if (!initNetworkAPI()){ LERROR("Failed to initialize network API for Parallel Connection"); } @@ -107,10 +112,8 @@ namespace openspace { hints.ai_protocol = IPPROTO_TCP; hints.ai_flags = AI_PASSIVE; - int result; - // Resolve the local address and port to be used by the server - result = getaddrinfo(_address.c_str(), _port.c_str(), &hints, &addresult); + int result = getaddrinfo(_address.c_str(), _port.c_str(), &hints, &addresult); if (result != 0) { #if defined(__WIN32__) @@ -121,13 +124,15 @@ namespace openspace { LINFO("Attempting to connect to address "<< _address << " on port " << _port); - //start connection thread - _isRunning.store(true); - _connectionThread = new (std::nothrow) std::thread(&ParallelConnection::connection, this, addresult); + //we're not connected + _isConnected.store(false); + + //start connection thread + _connectionThread = new (std::nothrow) std::thread(&ParallelConnection::tryConnect, this, addresult); } - void ParallelConnection::connection(addrinfo *info){ + void ParallelConnection::tryConnect(addrinfo *info){ _clientSocket = socket(info->ai_family, info->ai_socktype, info->ai_protocol); @@ -144,14 +149,14 @@ namespace openspace { int result; //set no delay - result = setsockopt(_clientSocket, /* socket affected */ - IPPROTO_TCP, /* set option at TCP level */ - TCP_NODELAY, /* name of option */ - (char *)&flag, /* the cast is historical cruft */ - sizeof(int)); /* length of option value */ + result = setsockopt(_clientSocket, /* socket affected */ + IPPROTO_TCP, /* set option at TCP level */ + TCP_NODELAY, /* name of option */ + (char *)&flag, /* the cast is historical cruft */ + sizeof(int)); /* length of option value */ //set send timeout - int timeout = 0; //infinite + int timeout = 0; result = setsockopt( _clientSocket, SOL_SOCKET, @@ -160,14 +165,13 @@ namespace openspace { sizeof(timeout)); //set receive timeout - timeout = 0; //infinite result = setsockopt( _clientSocket, SOL_SOCKET, SO_RCVTIMEO, (char *)&timeout, sizeof(timeout)); - + result = setsockopt(_clientSocket, SOL_SOCKET, SO_REUSEADDR, (char*)&flag, sizeof(int)); if (result == SOCKET_ERROR) LERROR("Failed to set socket option 'reuse address'. Error code: " << _ERRNO); @@ -175,83 +179,84 @@ namespace openspace { result = setsockopt(_clientSocket, SOL_SOCKET, SO_KEEPALIVE, (char*)&flag, sizeof(int)); if (result == SOCKET_ERROR) LERROR("Failed to set socket option 'keep alive'. Error code: " << _ERRNO); - + + //while the connection thread is still running - while (_isRunning.load()){ - + while (!_isConnected.load()){ + //try to connect - result = connect(_clientSocket, info->ai_addr, (int)info->ai_addrlen); + result = connect(_clientSocket, info->ai_addr, (int)info->ai_addrlen); //if the connection was successfull - if (result != SOCKET_ERROR) - { - //send authentication - authenticate(); - - //start listening for communication - //communicate(); - } - - //try to connect once per second - std::this_thread::sleep_for(std::chrono::seconds(1)); - } - - //make sure to join the broadcast thread if started - //dont delete it, that will be done in disconnect() function -// if(_broadcastThread != nullptr && _isHost.load()){ -// _isHost.store(false); -// _broadcastThread->join(); -// } + if (result != SOCKET_ERROR) + { + //we're connected + _isConnected.store(true); + + //and ready to start receiving messages + _isListening.store(true); + + //send authentication + sendAuthentication(); + + //start listening for communication + listenCommunication(); + } + + //try to connect once per second + std::this_thread::sleep_for(std::chrono::seconds(1)); + } //cleanup freeaddrinfo(info); } - void ParallelConnection::authenticate(){ + void ParallelConnection::sendAuthentication(){ + //length of this nodes name uint16_t namelen = static_cast(_name.length()); - int size = headerSize() + sizeof(uint32_t) + sizeof(uint16_t) + static_cast(namelen); - std::vector buffer; + + //total size of the buffer, header + size of passcodde + namelength + size of namelength + int size = headerSize() + sizeof(uint32_t) + sizeof(namelen) + static_cast(namelen); + + //create and reserve buffer + std::vector buffer; buffer.reserve(size); - //version - buffer.insert(buffer.end(), 'O'); - buffer.insert(buffer.end(), 'S'); - buffer.insert(buffer.end(), 0); - buffer.insert(buffer.end(), 0); + //write header to buffer + writeHeader(buffer, MessageTypes::Authentication); - //msg type, 0 = auth - int type = MessageTypes::Authentication; - buffer.insert(buffer.end(), reinterpret_cast(&type), reinterpret_cast(&type) + sizeof(int)); - - //passcode + //write passcode to buffer buffer.insert(buffer.end(), reinterpret_cast(&_passCode), reinterpret_cast(&_passCode) + sizeof(uint32_t)); - //name length + //write the length of the nodes name to buffer buffer.insert(buffer.end(), reinterpret_cast(&namelen), reinterpret_cast(&namelen) + sizeof(uint16_t)); - //name + //write this nodes name to buffer buffer.insert(buffer.end(), _name.begin(), _name.end()); - + + //send buffer int result = send(_clientSocket, buffer.data(), size, 0); + //if send failed if (result == SOCKET_ERROR){ //failed to send auth msg. - std::cerr << "Failed to send authentication message!" << std::endl; + LERROR("Failed to send authentication message."); } } - void ParallelConnection::delegateDecoding(int type){ + void ParallelConnection::delegateDecoding(uint32_t type){ switch (type){ case MessageTypes::Authentication: - decodeAuthenticationMessage(); + //do nothing for now break; case MessageTypes::Initialization: decodeInitializationMessage(); break; case MessageTypes::StreamData: - decodeDataMessage(); + decodeStreamDataMessage(); break; case MessageTypes::Script: + decodeScriptMessage(); break; case MessageTypes::HostInfo: decodeHostInfoMessage(); @@ -265,47 +270,58 @@ namespace openspace { } } - void ParallelConnection::decodeAuthenticationMessage(){ - printf("Auth OK!\n"); //more stuff here later - } - void ParallelConnection::decodeInitializationMessage(){ printf("Init message received!\n"); } - void ParallelConnection::decodeDataMessage(){ + void ParallelConnection::decodeStreamDataMessage(){ int result; uint16_t msglen; - std::vector buffer; + + //create a buffer to hold the size of streamdata message + std::vector buffer; buffer.resize(sizeof(msglen)); + + //read size of streamdata message result = receiveData(_clientSocket, buffer, sizeof(msglen), 0); if (result <= 0){ //error return; } - + + //the size in bytes of the streamdata message msglen = (*(reinterpret_cast(buffer.data()))); + //resize the buffer to be able to read the streamdata buffer.clear(); buffer.resize(msglen); + //read the data into buffer result = receiveData(_clientSocket, buffer, msglen, 0); + if (result <= 0){ //error return; } + //construct a keyframe ffrom the streamdata network::StreamDataKeyframe kf; kf.deserialize(buffer); + + //and add the keyframe to the interaction handler OsEng.interactionHandler()->addKeyframe(kf); } - void ParallelConnection::decodeScript(){ + void ParallelConnection::decodeScriptMessage(){ int result; uint16_t msglen; + + //create buffer to decode size of script std::vector buffer; buffer.resize(sizeof(msglen)); + + //read size of received script result = receiveData(_clientSocket, buffer, sizeof(msglen), 0); if (result <= 0){ @@ -313,74 +329,89 @@ namespace openspace { return; } + //size of recived script msglen = (*(reinterpret_cast(buffer.data()))); + //clear and resize buffer to decode actual script buffer.clear(); buffer.resize(msglen); + //decode script result = receiveData(_clientSocket, buffer, msglen, 0); + if (result <= 0){ //error return; } + //construct a script (string) from the data contained in the buffer std::string script(buffer.data()); + + //tell the script engine to execute the script when appropriate OsEng.scriptEngine()->queueScript(script); } void ParallelConnection::decodeHostInfoMessage(){ + //create buffer std::vector hostflag; + //resize to hold a flag saying if we're host or not hostflag.resize(1); + + //read data into buffer int result = receiveData(_clientSocket, hostflag, 1, 0); + //enough data was read if (result > 0){ + + //we've been assigned as host if (hostflag.at(0) == 1){ + //we're already host, do nothing (dummy check) if (_isHost.load()){ return; } else{ + //start broadcasting _isHost.store(true); _broadcastThread = new (std::nothrow) std::thread(&ParallelConnection::broadcast, this); } } - else{ + else{ //we've been assigned as client + //we were broadcasting but should stop now if (_isHost.load()){ + + //stop broadcast loop _isHost.store(false); - if (_broadcastThread != nullptr){ + + //and delete broadcasting thread + if (_broadcastThread != nullptr){ _broadcastThread->join(); delete _broadcastThread; _broadcastThread = nullptr; } + } else{ - //we were not host so nothing to do + //we were not broadcasting so nothing to do } - //request init packages from the host - int size = headerSize() + sizeof(uint32_t); + //request init package from the host + int size = headerSize(); std::vector buffer; buffer.reserve(size); - //version - buffer.insert(buffer.end(), 'O'); - buffer.insert(buffer.end(), 'S'); - buffer.insert(buffer.end(), 0); - buffer.insert(buffer.end(), 0); - - //msg type, 0 = auth - int type = MessageTypes::InitializationRequest; - buffer.insert(buffer.end(), reinterpret_cast(&type), reinterpret_cast(&type) + sizeof(int)); - + //write header + writeHeader(buffer, MessageTypes::InitializationRequest); + //send message send(_clientSocket, buffer.data(), buffer.size(), 0); } } else{ - std::cerr << "Error " << _ERRNO << " detected in connection!" << std::endl; + LERROR("Error " << _ERRNO << " detected in connection."); disconnect(); } } @@ -389,34 +420,44 @@ namespace openspace { printf("InitRequest message received!\n"); } - void ParallelConnection::communicate(){ + void ParallelConnection::listenCommunication(){ + //create basic buffer for receiving first part of messages std::vector buffer; - buffer.resize(8); + //size of the header + buffer.resize(headerSize()); + int result; - - while (_isRunning.load()){ + //while we're still connected and listening + while (_isListening.load()){ + //receive the first parts of a message result = receiveData(_clientSocket, buffer, headerSize(), 0); + //if enough data was received if (result > 0){ - if (buffer[0] == 'O' && //Open - buffer[1] == 'S' && //Space - buffer[2] == 0 && //version - buffer[3] == 0 //version + + //make sure that header matches this version of OpenSpace + if (buffer[0] == 'O' && //Open + buffer[1] == 'S' && //Space + buffer[2] == OPENSPACE_VERSION_MAJOR && // major version + buffer[3] == OPENSPACE_VERSION_MINOR // minor version ) { //parse type - int type = (*(reinterpret_cast(&buffer[4]))); + uint32_t type = (*(reinterpret_cast(&buffer[4]))); + + //and delegate decoding depending on type delegateDecoding(type); } } else{ if (result == 0){ //connection rejected - _isRunning.store(false); + _isConnected.store(false); + _isListening.store(false); } else{ - std::cerr << "Error " << _ERRNO << " detected in connection!" << std::endl; + LERROR("Error " << _ERRNO << " detected in connection!"); } break; } @@ -468,38 +509,16 @@ namespace openspace { return _name; } - void ParallelConnection::setSocket(_SOCKET socket){ - _clientSocket = socket; - } - _SOCKET ParallelConnection::clientSocket(){ return _clientSocket; } - void ParallelConnection::setHost(bool host){ - _isHost.store(host); - } - - bool ParallelConnection::isHost(){ - return _isHost.load(); - } - - bool ParallelConnection::isRunning(){ - return _isRunning.load(); - } - void ParallelConnection::requestHostship(){ std::vector buffer; - buffer.reserve(headerSize() + sizeof(int)); - //header - buffer.insert(buffer.end(), 'O'); - buffer.insert(buffer.end(), 'S'); - buffer.insert(buffer.end(), 0); - buffer.insert(buffer.end(), 0); - - //type of message - int type = ParallelConnection::MessageTypes::HostshipRequest; - buffer.insert(buffer.end(), reinterpret_cast(&type), reinterpret_cast(&type) + sizeof(type)); + buffer.reserve(headerSize()); + + //write header + writeHeader(buffer, MessageTypes::HostshipRequest); //send message send(_clientSocket, buffer.data(), buffer.size(), 0); @@ -515,16 +534,9 @@ namespace openspace { std::vector buffer; buffer.reserve(headerSize() + sizeof(msglen) + msglen); - //header - buffer.insert(buffer.end(), 'O'); - buffer.insert(buffer.end(), 'S'); - buffer.insert(buffer.end(), 0); - buffer.insert(buffer.end(), 0); - - //type of message - int type = ParallelConnection::MessageTypes::Script; - buffer.insert(buffer.end(), reinterpret_cast(&type), reinterpret_cast(&type) + sizeof(type)); - + //write header + writeHeader(buffer, MessageTypes::Script); + //size of message buffer.insert(buffer.end(), reinterpret_cast(&msglen), reinterpret_cast(&msglen) + sizeof(msglen)); @@ -541,22 +553,27 @@ namespace openspace { //must be run before trying to join communication threads, else the threads are stuck trying to receive data closeSocket(); - _isRunning.store(false); + //tell broadcast thread to stop broadcasting _isHost.store(false); + + //tell connection thread to stop trying to connect and stop listening for communication + _isConnected.store(true); + _isListening.store(false); + + //join connection thread and delete it if (_connectionThread != nullptr){ _connectionThread->join(); delete _connectionThread; _connectionThread = nullptr; } + //join broadcast thread and delete it if (_broadcastThread != nullptr){ _broadcastThread->join(); delete _broadcastThread; _broadcastThread = nullptr; } - - #if defined(__WIN32__) WSACleanup(); #endif @@ -581,7 +598,8 @@ namespace openspace { shutdown(_clientSocket, SD_BOTH); closesocket(_clientSocket); #else - shutdown(_clientSocket, SHUT_RDWR); + shutdown(_clientSocket, SHUT_RDWR); + close(_clientSocket); #endif _clientSocket = INVALID_SOCKET; @@ -603,7 +621,7 @@ namespace openspace { HIBYTE(wsaData.wVersion) != 2) { /* incorrect WinSock version */ - std::cerr << "Failed to init winsock API!" << std::endl; + LERROR("Failed to init winsock API."); WSACleanup(); return false; } @@ -616,32 +634,32 @@ namespace openspace { void ParallelConnection::broadcast(){ + //while we're still the host while (_isHost.load()){ + //create a keyframe with current position and orientation of camera network::StreamDataKeyframe kf; kf._position = OsEng.interactionHandler()->camera()->position(); kf._viewRotationQuat = glm::quat_cast(OsEng.interactionHandler()->camera()->viewRotationMatrix()); - //@TODO, implement method in openspace engine for this - kf._timeStamp = sgct::Engine::getTime(); + //timestamp as current runtime of OpenSpace instance + kf._timeStamp = OsEng.runTime(); - + //create a buffer for the keyframe std::vector kfBuffer; + + //fill the keyframe buffer kf.serialize(kfBuffer); + //get the size of the keyframebuffer uint16_t msglen = static_cast(kfBuffer.size()); + + //create the full buffer std::vector buffer; buffer.reserve(headerSize() + sizeof(msglen) + msglen); - //header - buffer.insert(buffer.end(), 'O'); - buffer.insert(buffer.end(), 'S'); - buffer.insert(buffer.end(), 0); - buffer.insert(buffer.end(), 0); - - //type of message - int type = ParallelConnection::MessageTypes::StreamData; - buffer.insert(buffer.end(), reinterpret_cast(&type), reinterpret_cast(&type) + sizeof(type)); + //write header + writeHeader(buffer, MessageTypes::StreamData); //size of message buffer.insert(buffer.end(), reinterpret_cast(&msglen), reinterpret_cast(&msglen) + sizeof(msglen)); @@ -652,12 +670,12 @@ namespace openspace { //send message send(_clientSocket, buffer.data(), buffer.size(), 0); - //100 ms sleep + //100 ms sleep - send keyframes 10 times per second std::this_thread::sleep_for(std::chrono::milliseconds(100)); } } - void ParallelConnection::writeHeader(std::vector &buffer){ + void ParallelConnection::writeHeader(std::vector &buffer, uint32_t messageType){ //make sure the buffer is large enough to hold at least the header if(buffer.size() < headerSize()){ buffer.reserve(headerSize()); @@ -668,15 +686,16 @@ namespace openspace { uint8_t versionMinor = static_cast(OPENSPACE_VERSION_MINOR); //insert header into buffer - buffer.insert(buffer.begin(), 'O'); - buffer.insert(buffer.begin(), 'S'); - buffer.insert(buffer.begin(), static_cast(versionMajor)); - buffer.insert(buffer.begin(), static_cast(versionMinor)); + buffer.insert(buffer.end(), 'O'); + buffer.insert(buffer.end(), 'S'); + buffer.insert(buffer.end(), versionMajor); + buffer.insert(buffer.end(), versionMinor); + buffer.insert(buffer.end(), reinterpret_cast(&messageType), reinterpret_cast(&messageType) + sizeof(messageType)); } int ParallelConnection::headerSize(){ - //minor and major version (as uint8_t) + two bytes for the chars 'O' and 'S' - return 2 * sizeof(uint8_t) + 2; + //minor and major version (as uint8_t -> 1 byte) + two bytes for the chars 'O' and 'S' + 4 bytes for type of message + return 2 * sizeof(uint8_t) + 2 + sizeof(uint32_t); } scripting::ScriptEngine::LuaLibrary ParallelConnection::luaLibrary() {