adding initialization messages functionality

This commit is contained in:
Joakim Kilby
2015-07-06 14:06:54 +02:00
parent 446aa544b8
commit 51bab14dcc
2 changed files with 248 additions and 130 deletions
+246 -129
View File
@@ -365,7 +365,7 @@ namespace openspace {
buffer.insert(buffer.end(), _name.begin(), _name.end());
//send buffer
queMessage(buffer);
queueMessage(buffer);
}
void ParallelConnection::delegateDecoding(uint32_t type){
@@ -373,9 +373,9 @@ namespace openspace {
case MessageTypes::Authentication:
//do nothing for now
break;
// case MessageTypes::Initialization:
// initializationMessageReceived();
// break;
case MessageTypes::Initialization:
initializationMessageReceived();
break;
case MessageTypes::Data:
dataMessageReceived();
// break;
@@ -386,9 +386,9 @@ namespace openspace {
case MessageTypes::HostInfo:
hostInfoMessageReceived();
break;
// case MessageTypes::InitializationRequest:
// initializationRequestMessageReceived();
// break;
case MessageTypes::InitializationRequest:
initializationRequestMessageReceived();
break;
default:
//unknown message type
break;
@@ -398,85 +398,140 @@ namespace openspace {
void ParallelConnection::initializationMessageReceived(){
int result;
uint16_t numScripts;
uint32_t datalen;
uint32_t id;
//create a buffer to hold the number of scripts in the initialization message
//ID and size of data chunk are removed by the server
uint16_t numscripts;
std::vector<char> buffer;
buffer.resize(sizeof(id));
buffer.resize(sizeof(numscripts));
//read recepient ID (mine)
result = receiveData(_clientSocket, buffer, sizeof(id), 0);
if (result <= 0){
//error
return;
}
id = *(reinterpret_cast<uint32_t*>(buffer.data()));
//read data length
result = receiveData(_clientSocket, buffer, sizeof(datalen), 0);
if (result <= 0){
//error
return;
}
datalen = *(reinterpret_cast<uint32_t*>(buffer.data()));
buffer.clear();
buffer.resize(sizeof(numScripts));
//read number of scripts
result = receiveData(_clientSocket, buffer, sizeof(numScripts), 0);
if (result <= 0){
//error
return;
}
numScripts = *(reinterpret_cast<uint16_t*>(buffer.data()));
//declare placeholder for all received scripts
std::vector<std::string> initScripts;
initScripts.reserve(numScripts);
//read number of scripts
result = receiveData(_clientSocket, buffer, sizeof(numscripts), 0);
if(result < 0){
//error
}
//length of each script and resize receiveing buffer
numscripts = *(reinterpret_cast<uint16_t*>(buffer.data()));
//length of current script
uint16_t scriptlen;
buffer.clear();
buffer.resize(sizeof(scriptlen));
buffer.resize(scriptlen);
//buffer for holding received scripts
std::vector<char> scriptbuffer;
//holder for current script
std::string script;
for(int n = 0; n < numScripts; ++n){
//read size in chars of next script
result = receiveData(_clientSocket, buffer, sizeof(scriptlen), 0);
if (result <= 0){
for(int n = 0; n < numscripts; ++n){
//read length of script
result = receiveData(_clientSocket, buffer, sizeof(numscripts), 0);
if(result < 0){
//error
return;
}
//assign size of next script
scriptlen = *reinterpret_cast<uint16_t*>(buffer.data());
scriptlen = *(reinterpret_cast<uint16_t*>(buffer.data()));
//resize buffer
scriptbuffer.clear();
scriptbuffer.resize(scriptlen);
buffer.clear();
buffer.resize(scriptlen);
//read next script
result = receiveData(_clientSocket, scriptbuffer, scriptlen, 0);
//read script
result = receiveData(_clientSocket, buffer, scriptlen, 0);
//assign current script
script.clear();
script.assign(buffer.begin(), buffer.end());
if (result <= 0){
//error
return;
}
//create a string from received chars in buffer
std::string script;
script.assign(scriptbuffer.begin(), scriptbuffer.end());
//que script with the script engine
//queue received script
OsEng.scriptEngine()->queueScript(script);
}
//we've gone through all scripts, initialization is done
buffer.clear();
writeHeader(buffer, MessageTypes::InitializationCompleted);
//let the server know
queueMessage(buffer);
// int result;
// uint16_t numScripts;
// uint32_t datalen;
// uint32_t id;
//
// //create a buffer to hold the number of scripts in the initialization message
// std::vector<char> buffer;
// buffer.resize(sizeof(id));
//
// //read recepient ID (mine)
// result = receiveData(_clientSocket, buffer, sizeof(id), 0);
// if (result <= 0){
// //error
// return;
// }
// id = *(reinterpret_cast<uint32_t*>(buffer.data()));
//
// //read data length
// result = receiveData(_clientSocket, buffer, sizeof(datalen), 0);
// if (result <= 0){
// //error
// return;
// }
// datalen = *(reinterpret_cast<uint32_t*>(buffer.data()));
//
// buffer.clear();
// buffer.resize(sizeof(numScripts));
// //read number of scripts
// result = receiveData(_clientSocket, buffer, sizeof(numScripts), 0);
// if (result <= 0){
// //error
// return;
// }
// numScripts = *(reinterpret_cast<uint16_t*>(buffer.data()));
//
// //declare placeholder for all received scripts
// std::vector<std::string> initScripts;
// initScripts.reserve(numScripts);
//
// //length of each script and resize receiveing buffer
// uint16_t scriptlen;
// buffer.clear();
// buffer.resize(sizeof(scriptlen));
//
// //buffer for holding received scripts
// std::vector<char> scriptbuffer;
//
// for(int n = 0; n < numScripts; ++n){
//
// //read size in chars of next script
// result = receiveData(_clientSocket, buffer, sizeof(scriptlen), 0);
//
// if (result <= 0){
// //error
// return;
// }
//
// //assign size of next script
// scriptlen = *reinterpret_cast<uint16_t*>(buffer.data());
//
// //resize buffer
// scriptbuffer.clear();
// scriptbuffer.resize(scriptlen);
//
// //read next script
// result = receiveData(_clientSocket, scriptbuffer, scriptlen, 0);
//
// if (result <= 0){
// //error
// return;
// }
//
// //create a string from received chars in buffer
// std::string script;
// script.assign(scriptbuffer.begin(), scriptbuffer.end());
//
// //que script with the script engine
// OsEng.scriptEngine()->queueScript(script);
// }
}
void ParallelConnection::dataMessageReceived(){
@@ -624,7 +679,7 @@ namespace openspace {
OsEng.scriptEngine()->queueScript(script);
}
void ParallelConnection::queMessage(std::vector<char> message){
void ParallelConnection::queueMessage(std::vector<char> message){
_sendBufferMutex.lock();
_sendBuffer.push_back(message);
_sendBufferMutex.unlock();
@@ -715,7 +770,7 @@ namespace openspace {
writeHeader(buffer, MessageTypes::InitializationRequest);
//send message
queMessage(buffer);
queueMessage(buffer);
}
}
else{
@@ -725,74 +780,133 @@ namespace openspace {
}
void ParallelConnection::initializationRequestMessageReceived(){
//get current state as scripts
std::vector<std::string> scripts;
std::map<std::string, std::string>::iterator state_it;
{
//mutex protect
std::lock_guard<std::mutex> lock(_currentStateMutex);
for(state_it = _currentState.begin();
state_it != _currentState.end();
++state_it){
scripts.push_back(scriptFromPropertyAndValue(state_it->first, state_it->second));
}
}
//get requester ID
std::vector<char> buffer;
buffer.resize(sizeof(uint32_t));
receiveData(_clientSocket, buffer, sizeof(uint32_t), 0);
uint32_t requesterID = *reinterpret_cast<uint32_t*>(buffer.data());
printf("InitRequest message received from client %d!\n", requesterID);
//construct init msg
std::vector<std::string> scripts = OsEng.scriptEngine()->cachedScripts();
//total number of scripts sent
uint16_t numscripts = 0;
//temporary buffers
std::vector<char> scriptbuffer;
std::vector<char> tmpbuffer;
uint16_t scriptlen;
uint32_t totlen = 0;
std::vector<std::string>::const_iterator it;
std::vector<char> scriptMsg;
//add a script of the current time to ensure all nodes are on the same page
std::string timescript = "openspace.time.setTime(" + std::to_string(Time::ref().currentTime()) + ");";
scripts.push_back(timescript);
//add a script of the current delta time to ensure all nodes are on the same page
std::string dtscript = "openspace.time.setDeltaTime(" + std::to_string(Time::ref().deltaTime()) + ");";
scripts.push_back(dtscript);
//add a terminating script letting the server know the client is fully initialized
std::string donescript = "openspace.parallel.initialized();";
scripts.push_back(donescript);
//total number of scripts
uint16_t numScrips = static_cast<uint16_t>(scripts.size());
//write all scripts
for(it = scripts.cbegin();
it != scripts.cend();
++it){
//write size of script in chars
scriptlen = (*it).size();
scriptMsg.insert(scriptMsg.end(), reinterpret_cast<char*>(&scriptlen), reinterpret_cast<char*>(&scriptlen) + sizeof(scriptlen));
//serialize and encode all scripts into scriptbuffer
std::vector<std::string>::iterator script_it;
network::datamessagestructures::ScriptMessage sm;
for(script_it = scripts.begin();
script_it != scripts.end();
++script_it){
sm._script = *script_it;
sm._scriptlen = script_it->length();
//serialize current script
tmpbuffer.clear();
sm.serialize(tmpbuffer);
//write actual scripts
scriptMsg.insert(scriptMsg.end(), (*it).begin(), (*it).end());
//and insert into full buffer
scriptbuffer.insert(scriptbuffer.end(), tmpbuffer.begin(), tmpbuffer.end());
//add script length to total data length
totlen += static_cast<uint32_t>(scriptlen) + sizeof(scriptlen);
//increment number of scripts
numscripts++;
}
//clear buffer
buffer.clear();
//write header
buffer.clear();
writeHeader(buffer, MessageTypes::Initialization);
//write requester ID
buffer.insert(buffer.end(), reinterpret_cast<char*>(&requesterID), reinterpret_cast<char*>(&requesterID) + sizeof(requesterID));
//write size of data chunk
buffer.insert(buffer.end(), reinterpret_cast<char*>(&totlen), reinterpret_cast<char*>(&totlen) + sizeof(totlen));
//write total size of data chunk
uint32_t totlen = static_cast<uint32_t>(scriptbuffer.size());
buffer.insert(buffer.end(), reinterpret_cast<char*>(&totlen), reinterpret_cast<char*>(&totlen) + sizeof(uint32_t));
//write number of scripts
buffer.insert(buffer.end(), reinterpret_cast<char*>(&numScrips), reinterpret_cast<char*>(&numScrips) + sizeof(numScrips));
buffer.insert(buffer.end(), reinterpret_cast<char*>(&numscripts), reinterpret_cast<char*>(&numscripts) + sizeof(uint16_t));
//write all scripts and their lengths
buffer.insert(buffer.end(), scriptMsg.begin(), scriptMsg.end());
//write all scripts
buffer.insert(buffer.end(), scriptbuffer.begin(), scriptbuffer.end());
//send initialization message
queMessage(buffer);
//queue message
queueMessage(buffer);
//
// //construct init msg
// std::vector<std::string> scripts = OsEng.scriptEngine()->cachedScripts();
//
//
// uint16_t scriptlen;
// uint32_t totlen = 0;
// std::vector<std::string>::const_iterator it;
//
// std::vector<char> scriptMsg;
//
// //add a script of the current time to ensure all nodes are on the same page
// std::string timescript = "openspace.time.setTime(" + std::to_string(Time::ref().currentTime()) + ");";
// scripts.push_back(timescript);
//
// //add a script of the current delta time to ensure all nodes are on the same page
//
// std::string dtscript = "openspace.time.setDeltaTime(" + std::to_string(Time::ref().deltaTime()) + ");";
// scripts.push_back(dtscript);
//
// //add a terminating script letting the server know the client is fully initialized
// std::string donescript = "openspace.parallel.initialized();";
// scripts.push_back(donescript);
//
// //total number of scripts
// uint16_t numScrips = static_cast<uint16_t>(scripts.size());
//
// //write all scripts
// for(it = scripts.cbegin();
// it != scripts.cend();
// ++it){
// //write size of script in chars
// scriptlen = (*it).size();
// scriptMsg.insert(scriptMsg.end(), reinterpret_cast<char*>(&scriptlen), reinterpret_cast<char*>(&scriptlen) + sizeof(scriptlen));
//
// //write actual scripts
// scriptMsg.insert(scriptMsg.end(), (*it).begin(), (*it).end());
//
// //add script length to total data length
// totlen += static_cast<uint32_t>(scriptlen) + sizeof(scriptlen);
// }
//
// //clear buffer
// buffer.clear();
//
// //write header
// writeHeader(buffer, MessageTypes::Initialization);
//
// //write requester ID
// buffer.insert(buffer.end(), reinterpret_cast<char*>(&requesterID), reinterpret_cast<char*>(&requesterID) + sizeof(requesterID));
//
//
// //write size of data chunk
// buffer.insert(buffer.end(), reinterpret_cast<char*>(&totlen), reinterpret_cast<char*>(&totlen) + sizeof(totlen));
//
// //write number of scripts
// buffer.insert(buffer.end(), reinterpret_cast<char*>(&numScrips), reinterpret_cast<char*>(&numScrips) + sizeof(numScrips));
//
// //write all scripts and their lengths
// buffer.insert(buffer.end(), scriptMsg.begin(), scriptMsg.end());
//
// //send initialization message
// queueMessage(buffer);
}
void ParallelConnection::listenCommunication(){
@@ -890,7 +1004,7 @@ namespace openspace {
writeHeader(buffer, MessageTypes::HostshipRequest);
//send message
queMessage(buffer);
queueMessage(buffer);
}
void ParallelConnection::setPassword(const std::string& pwd){
@@ -912,7 +1026,7 @@ namespace openspace {
buffer.insert(buffer.end(), script.begin(), script.end());
//send message
queMessage(buffer);
queueMessage(buffer);
}
bool ParallelConnection::initNetworkAPI(){
@@ -979,7 +1093,7 @@ namespace openspace {
buffer.insert(buffer.end(), tbuffer.begin(), tbuffer.end());
//send message
queMessage(buffer);
queueMessage(buffer);
}
else{
//if we're not the host and we have a valid keyframe (one that hasnt been used before)
@@ -1010,10 +1124,13 @@ namespace openspace {
void ParallelConnection::scriptMessage(const std::string propIdentifier, const std::string propValue){
//save script as current state
_currentState[propIdentifier] = propValue;
{
//mutex protect
std::lock_guard<std::mutex> lock(_currentStateMutex);
_currentState[propIdentifier] = propValue;
}
//if we're connected and we're the host, also send the script
if(_isConnected.load() && _isHost.load()){
//construct script
std::string script = scriptFromPropertyAndValue(propIdentifier, propValue);
@@ -1052,7 +1169,7 @@ namespace openspace {
buffer.insert(buffer.end(), sbuffer.begin(), sbuffer.end());
//send message
queMessage(buffer);
queueMessage(buffer);
}
}
@@ -1105,7 +1222,7 @@ namespace openspace {
buffer.insert(buffer.end(), kfBuffer.begin(), kfBuffer.end());
//send message
queMessage(buffer);
queueMessage(buffer);
//100 ms sleep - send keyframes 10 times per second
std::this_thread::sleep_for(std::chrono::milliseconds(100));
@@ -1144,7 +1261,7 @@ namespace openspace {
writeHeader(buffer, MessageTypes::InitializationCompleted);
//queue script
queMessage(buffer);
queueMessage(buffer);
}
scripting::ScriptEngine::LuaLibrary ParallelConnection::luaLibrary() {