distinguish between local and remote scripting

This commit is contained in:
Emil Axelsson
2016-09-16 14:53:20 +02:00
parent 41fafcb5df
commit 13610b390d
24 changed files with 419 additions and 273 deletions
+133 -60
View File
@@ -84,14 +84,16 @@ ParallelConnection::ParallelConnection()
, _listenThread(nullptr)
, _handlerThread(nullptr)
, _isRunning(true)
, _isHost(false)
, _nConnections(0)
, _status(Status::Disconnected)
, _hostName("")
, _isConnected(false)
, _tryConnect(false)
, _disconnect(false)
, _latestTimeKeyframeValid(false)
, _initializationTimejumpRequired(false)
{
//create handler thread
_connectionEvent = std::make_shared<ghoul::Event<>>();
_handlerThread = std::make_unique<std::thread>(&ParallelConnection::threadManagement, this);
}
@@ -174,7 +176,7 @@ void ParallelConnection::disconnect(){
_isConnected.store(false);
//tell broadcast thread to stop broadcasting (we're no longer host)
_isHost.store(false);
setStatus(Status::Disconnected);
//join connection thread and delete it
if(_connectionThread != nullptr){
@@ -320,8 +322,14 @@ void ParallelConnection::establishConnection(addrinfo *info){
//we no longer need to try to establish connection
_tryConnect.store(false);
_sendBufferMutex.lock();
_sendBuffer.clear();
_sendBufferMutex.unlock();
//send authentication
sendAuthentication();
} else {
LINFO("Connection attempt failed.");
}
#ifdef WIN32
@@ -380,8 +388,11 @@ void ParallelConnection::handleMessage(const Message& message) {
case MessageType::Data:
dataMessageReceived(message.content);
break;
case MessageType::HostInformation:
hostInfoMessageReceived(message.content);
case MessageType::ConnectionStatus:
connectionStatusMessageReceived(message.content);
break;
case MessageType::NConnections:
nConnectionsMessageReceived(message.content);
break;
default:
//unknown message type
@@ -475,7 +486,7 @@ void ParallelConnection::dataMessageReceived(const std::vector<char>& messageCon
uint32_t type = *(reinterpret_cast<const uint32_t*>(messageContent.data()));
std::vector<char> buffer(messageContent.begin() + sizeof(uint32_t), messageContent.end());
switch(type){
switch(static_cast<network::datamessagestructures::Type>(type)) {
case network::datamessagestructures::Type::CameraData: {
network::datamessagestructures::CameraKeyframe kf(buffer);
OsEng.interactionHandler().addKeyframe(kf);
@@ -522,7 +533,7 @@ void ParallelConnection::dataMessageReceived(const std::vector<char>& messageCon
case network::datamessagestructures::Type::ScriptData: {
network::datamessagestructures::ScriptMessage sm;
sm.deserialize(buffer);
OsEng.scriptEngine().queueScript(sm._script);
OsEng.scriptEngine().queueScript(sm._script, scripting::ScriptEngine::RemoteScripting::No);
break;
}
@@ -562,8 +573,13 @@ void ParallelConnection::sendFunc(){
std::unique_lock<std::mutex> unqlock(_sendBufferMutex);
_sendCondition.wait(unqlock);
if (_disconnect) {
break;
}
while (!_sendBuffer.empty()) {
const Message& message = _sendBuffer.front();
Message message = _sendBuffer.front();
unqlock.unlock();
std::vector<char> header;
//insert header into buffer
@@ -594,65 +610,79 @@ void ParallelConnection::sendFunc(){
signalDisconnect();
}
unqlock.lock();
_sendBuffer.erase(_sendBuffer.begin());
}
}
}
std::lock_guard<std::mutex> sendLock(_sendBufferMutex);
_sendBuffer.clear();
}
void ParallelConnection::hostInfoMessageReceived(const std::vector<char>& message){
if (message.size() < 1) {
LERROR("Malformed host info message.");
void ParallelConnection::connectionStatusMessageReceived(const std::vector<char>& message) {
if (message.size() < 2 * sizeof(uint32_t)) {
LERROR("Malformed connection status message.");
return;
}
char hostInfo = message[0];
size_t pointer = 0;
uint32_t statusIn = *(reinterpret_cast<const uint32_t*>(&message[pointer]));
network::Status status = static_cast<network::Status>(statusIn);
pointer += sizeof(uint32_t);
size_t hostNameSize = *(reinterpret_cast<const uint32_t*>(&message[pointer]));
pointer += sizeof(uint32_t);
if (hostNameSize > message.size() - pointer) {
LERROR("Malformed connection status message.");
return;
}
std::string hostName = "";
if (hostNameSize > 0) {
hostName = std::string(&message[pointer], hostNameSize);
}
pointer += hostNameSize;
if (hostInfo == 1) { // assigned as host
if (_isHost.load()) {
//we're already host, do nothing (dummy check)
return;
}
_isHost.store(true);
if (status > Status::Host) {
LERROR("Invalid status.");
return;
}
//start broadcasting
if (status == _status) {
//status remains unchanged.
return;
}
setStatus(status);
setHostName(hostName);
if (status == Status::Host) { // assigned as host
_broadcastThread = std::make_unique<std::thread>(&ParallelConnection::broadcast, this);
} else { // assigned as client
if (!_isHost.load()) {
//we're already a client, do nothing (dummy check)
return;
}
//stop broadcast loop
_isHost.store(false);
// delete broadcasting thread
// (the thread is allowed to terminate once the status is set to non-host.)
if (_broadcastThread != nullptr) {
_broadcastThread->join();
_broadcastThread = nullptr;
}
OsEng.interactionHandler().clearKeyframes();
//clear buffered any keyframes
//request init package from the host
//int size = headerSize();
//std::vector<char> buffer;
//buffer.reserve(size);
//write header
//writeHeader(buffer, MessageTypes::InitializationRequest);
//send message
//std::cout << "host info queue" << std::endl;
//queueMessage(MessageTypes::InitializationRequest, buffer);
}
}
void ParallelConnection::nConnectionsMessageReceived(const std::vector<char>& message) {
if (message.size() < sizeof(uint32_t)) {
LERROR("Malformed host info message.");
return;
}
uint32_t nConnections = *(reinterpret_cast<const uint32_t*>(&message[0]));
setNConnections(nConnections);
}
//void ParallelConnection::initializationRequestMessageReceived(const std::vector<char>& message){
/*
//get current state as scripts
@@ -742,8 +772,10 @@ void ParallelConnection::listenCommunication() {
//if enough data was received
if (nBytesRead <= 0) {
LERROR("Error " << _ERRNO << " detected in connection when reading header, disconnecting!");
signalDisconnect();
if (!_disconnect) {
LERROR("Error " << _ERRNO << " detected in connection when reading header, disconnecting!");
signalDisconnect();
}
break;
}
@@ -773,8 +805,10 @@ void ParallelConnection::listenCommunication() {
nBytesRead = receiveData(_clientSocket, messageBuffer, messageSize, 0);
if (nBytesRead <= 0) {
LERROR("Error " << _ERRNO << " detected in connection when reading message, disconnecting!");
signalDisconnect();
if (!_disconnect) {
LERROR("Error " << _ERRNO << " detected in connection when reading message, disconnecting!");
signalDisconnect();
}
break;
}
@@ -816,10 +850,7 @@ void ParallelConnection::setName(const std::string& name){
_name = name;
}
bool ParallelConnection::isHost(){
return _isHost.load();
}
void ParallelConnection::requestHostship(const std::string &password){
std::vector<char> buffer;
uint32_t passcode = hash(password);
@@ -863,7 +894,7 @@ bool ParallelConnection::initNetworkAPI(){
}
void ParallelConnection::sendScript(const std::string& script) {
if (!_isHost) return;
if (!isHost()) return;
network::datamessagestructures::ScriptMessage sm;
sm._script = script;
@@ -886,7 +917,7 @@ void ParallelConnection::preSynchronization(){
//std::cout << "start." << std::endl;
//if we're the host
if(_isHost){
//if(_isHost){
/*
//get current time parameters and create a keyframe
network::datamessagestructures::TimeKeyframe tf;
@@ -925,8 +956,8 @@ void ParallelConnection::preSynchronization(){
std::cout << "host 7." << std::endl;
*/
}
else{
//}
//else{
/*
//if we're not the host and we have a valid keyframe (one that hasnt been used before)
if(_latestTimeKeyframeValid.load()){
@@ -951,7 +982,7 @@ void ParallelConnection::preSynchronization(){
}
*/
}
//}
//std::cout << "stop." << std::endl;
}
@@ -1012,10 +1043,47 @@ void ParallelConnection::preSynchronization(){
//}
void ParallelConnection::setStatus(Status status) {
if (_status != status) {
_status = status;
_connectionEvent->publish("statusChanged");
}
}
Status ParallelConnection::status() {
return _status;
}
void ParallelConnection::setNConnections(size_t nConnections) {
if (_nConnections != nConnections) {
_nConnections = nConnections;
_connectionEvent->publish("nConnectionsChanged");
}
}
size_t ParallelConnection::nConnections() {
return _nConnections;
}
bool ParallelConnection::isHost() {
return _status == Status::Host;
}
void ParallelConnection::setHostName(const std::string& hostName) {
if (_hostName != hostName) {
_hostName = hostName;
_connectionEvent->publish("hostNameChanged");
}
}
const std::string& ParallelConnection::hostName() {
return _hostName;
}
void ParallelConnection::broadcast(){
//while we're still connected and we're the host
while (_isConnected && _isHost) {
while (_isConnected && isHost()) {
//create a keyframe with current position and orientation of camera
network::datamessagestructures::CameraKeyframe kf;
kf._position = OsEng.interactionHandler().camera()->positionVec3();
@@ -1054,6 +1122,11 @@ uint32_t ParallelConnection::hash(const std::string &val) {
return hashVal;
};
std::shared_ptr<ghoul::Event<>> ParallelConnection::connectionEvent() {
return _connectionEvent;
}
scripting::LuaLibrary ParallelConnection::luaLibrary() {
return {