Merge branch 'master' of github.com:OpenSpace/OpenSpace into feature/data-management

This commit is contained in:
Emil Axelsson
2017-11-10 16:07:58 +01:00
482 changed files with 7286 additions and 3720 deletions
+4 -4
View File
@@ -49,7 +49,7 @@ namespace {
namespace openspace {
NetworkEngine::NetworkEngine()
NetworkEngine::NetworkEngine()
// -1 is okay as we assign one identifier in this ctor
: _lastAssignedIdentifier(MessageIdentifier(-1))
, _shouldPublishStatusMessage(true)
@@ -104,7 +104,7 @@ void NetworkEngine::publishStatusMessage() {
Time& currentTime = OsEng.timeManager().time();
uint16_t messageSize = 0;
double time = currentTime.j2000Seconds();
std::string timeString = currentTime.UTC();
double delta = currentTime.deltaTime();
@@ -117,7 +117,7 @@ void NetworkEngine::publishStatusMessage() {
unsigned int currentLocation = 0;
std::vector<char> buffer(messageSize);
std::memmove(buffer.data() + currentLocation, &time, sizeof(time));
currentLocation += sizeof(time);
std::memmove(
@@ -217,7 +217,7 @@ void NetworkEngine::sendInitialInformation() {
std::vector<char> payload = m.body;
payload.insert(payload.begin(), identifier.data.begin(), identifier.data.end());
OsEng.windowWrapper().sendMessageToExternalControl(payload);
LINFO("Sent initial message: (s=" << m.body.size() << ")" <<
LINFO("Sent initial message: (s=" << m.body.size() << ")" <<
"[i=" << identifier.value << "]"
);
+107 -100
View File
@@ -35,23 +35,30 @@
#include <arpa/inet.h>
#include <netdb.h>
#include <errno.h>
#ifndef SOCKET_ERROR
#define SOCKET_ERROR (-1)
#endif
#ifndef INVALID_SOCKET
#define INVALID_SOCKET (_SOCKET)(~0)
#define INVALID_SOCKET static_cast<_SOCKET>(~0)
#endif
#ifndef NO_ERROR
#define NO_ERROR 0L
#endif
// #ifndef NO_ERROR
// #define NO_ERROR 0L
// #endif
#ifndef _ERRNO
#define _ERRNO errno
#endif
#endif
#ifdef WIN32
using SocketResultType = int;
#else
using SocketResultType = size_t;
#endif // WIN32
#ifdef WIN32
#ifndef WIN32_LEAN_AND_MEAN
#define WIN32_LEAN_AND_MEAN
@@ -177,11 +184,11 @@ ParallelConnection::ParallelConnection()
, _handlerThread(nullptr)
{
addProperty(_name);
addProperty(_port);
addProperty(_address);
addProperty(_bufferTime);
addProperty(_password);
addProperty(_hostPassword);
@@ -195,22 +202,22 @@ ParallelConnection::ParallelConnection()
this
);
}
ParallelConnection::~ParallelConnection() {
// signal that a disconnect should occur
signalDisconnect();
// signal that execution has stopped
_isRunning.store(false);
// join handler
_handlerThread->join();
}
void ParallelConnection::threadManagement() {
// The _disconnectCondition.wait(unqlock) stalls
// How about moving this out of the thread and into the destructor? ---abock
// while we're still running
while(_isRunning){
// lock disconnect mutex mutex
@@ -221,32 +228,32 @@ void ParallelConnection::threadManagement() {
disconnectLock,
[this]() { return _disconnect.load(); }
);
// perform actual disconnect
disconnect();
}
}
void ParallelConnection::signalDisconnect() {
//signal handler thread to disconnect
_disconnect = true;
_sendCondition.notify_all(); // Allow send function to terminate.
_disconnectCondition.notify_all(); // Unblock thread management thread.
}
void ParallelConnection::closeSocket() {
/*
Windows shutdown options
* SD_RECIEVE
* SD_SEND
* SD_BOTH
Linux & Mac shutdown options
* SHUT_RD (Disables further receive operations)
* SHUT_WR (Disables further send operations)
* SHUT_RDWR (Disables further send and receive operations)
*/
#ifdef WIN32
shutdown(_clientSocket, SD_BOTH);
closesocket(_clientSocket);
@@ -254,37 +261,37 @@ void ParallelConnection::closeSocket() {
shutdown(_clientSocket, SHUT_RDWR);
close(_clientSocket);
#endif
_clientSocket = INVALID_SOCKET;
}
void ParallelConnection::disconnect() {
// We're disconnecting
if (_clientSocket != INVALID_SOCKET) {
// Must be run before trying to join communication threads, else the threads are
// stuck trying to receive data
closeSocket();
// Ttell connection thread to stop trying to connect
_tryConnect = false;
// Tell send thread to stop sending and listen thread to stop listenin
_isConnected = false;
setStatus(Status::Disconnected);
// join connection thread and delete it
if (_connectionThread != nullptr) {
_connectionThread->join();
_connectionThread = nullptr;
}
// join send thread and delete it
if (_sendThread != nullptr) {
_sendThread->join();
_sendThread = nullptr;
}
// join listen thread and delete it
if (_listenThread != nullptr) {
_listenThread->join();
@@ -295,23 +302,23 @@ void ParallelConnection::disconnect() {
_disconnect = false;
}
}
void ParallelConnection::clientConnect() {
// We're already connected (or already trying to connect), do nothing (dummy check)
if (_isConnected.load() || _tryConnect.load()) {
return;
}
if (!initNetworkAPI()) {
LERROR("Failed to initialize network API for Parallel Connection");
return;
}
struct addrinfo* addresult = NULL;
struct addrinfo hints;
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = IPPROTO_TCP;
@@ -328,13 +335,13 @@ void ParallelConnection::clientConnect() {
LERROR("Failed to parse hints for Parallel Connection");
return;
}
// We're not connected
_isConnected = false;
// We want to try and establish a connection
_tryConnect = true;
// Start connection thread
_connectionThread = std::make_unique<std::thread>(
&ParallelConnection::establishConnection,
@@ -345,7 +352,7 @@ void ParallelConnection::clientConnect() {
void ParallelConnection::establishConnection(addrinfo *info) {
_clientSocket = socket(info->ai_family, info->ai_socktype, info->ai_protocol);
if (_clientSocket == INVALID_SOCKET) {
freeaddrinfo(info);
LERROR("Failed to create client socket, disconnecting.");
@@ -353,11 +360,11 @@ void ParallelConnection::establishConnection(addrinfo *info) {
// Signal a disconnect
signalDisconnect();
}
int trueFlag = 1;
int falseFlag = 0;
int result;
// Set no delay
result = setsockopt(
_clientSocket, // socket affected
@@ -366,7 +373,7 @@ void ParallelConnection::establishConnection(addrinfo *info) {
reinterpret_cast<char*>(&trueFlag), // the cast is historical cruft
sizeof(int) // length of option value
);
// Set send timeout
int timeout = 0;
result = setsockopt(
@@ -376,7 +383,7 @@ void ParallelConnection::establishConnection(addrinfo *info) {
reinterpret_cast<char*>(&timeout),
sizeof(timeout)
);
// Set receive timeout
result = setsockopt(
_clientSocket,
@@ -396,7 +403,7 @@ void ParallelConnection::establishConnection(addrinfo *info) {
if (result == SOCKET_ERROR) {
LERROR("Failed to set socket option 'reuse address'. Error code: " << _ERRNO);
}
result = setsockopt(
_clientSocket,
SOL_SOCKET,
@@ -407,31 +414,31 @@ void ParallelConnection::establishConnection(addrinfo *info) {
if (result == SOCKET_ERROR) {
LERROR("Failed to set socket option 'keep alive'. Error code: " << _ERRNO);
}
LINFO("Attempting to connect to server "<< _address << " on port " << _port);
// Try to connect
result = connect(_clientSocket, info->ai_addr, static_cast<int>(info->ai_addrlen));
// If the connection was successfull
if (result != SOCKET_ERROR) {
LINFO("Connection established with server at ip: "<< _address);
// We're connected
_isConnected = true;
// Start sending messages
_sendThread = std::make_unique<std::thread>(&ParallelConnection::sendFunc, this);
// Start listening for communication
_listenThread = std::make_unique<std::thread>(
&ParallelConnection::listenCommunication,
this
);
// We no longer need to try to establish connection
_tryConnect = false;
_sendBufferMutex.lock();
_sendBuffer.clear();
_sendBufferMutex.unlock();
@@ -442,7 +449,7 @@ void ParallelConnection::establishConnection(addrinfo *info) {
LINFO("Connection attempt failed.");
signalDisconnect();
}
// Cleanup
freeaddrinfo(info);
}
@@ -451,7 +458,7 @@ void ParallelConnection::sendAuthentication() {
std::string name = _name.value();
// Length of this nodes name
uint32_t nameLength = static_cast<uint32_t>(name.length());
// Total size of the buffer: (passcode + namelength + name)
size_t size = 2 * sizeof(uint32_t) + nameLength;
@@ -477,7 +484,7 @@ void ParallelConnection::sendAuthentication() {
// Write this node's name to buffer
buffer.insert(buffer.end(), name.begin(), name.end());
// Send buffer
queueOutMessage(Message(MessageType::Authentication, buffer));
}
@@ -506,12 +513,12 @@ void ParallelConnection::handleMessage(const Message& message) {
/*
void ParallelConnection::initializationMessageReceived(){
int result;
uint32_t id, datasize;
uint16_t numscripts;
std::vector<char> buffer;
buffer.resize(sizeof(id));
@@ -528,25 +535,25 @@ void ParallelConnection::initializationMessageReceived(){
//error
}
datasize = *(reinterpret_cast<uint32_t*>(buffer.data()));
buffer.clear();
buffer.resize(sizeof(uint16_t));
//read number of scripts
result = receiveData(_clientSocket, buffer, sizeof(numscripts), 0);
if(result < 0){
//error
}
}
numscripts = *(reinterpret_cast<uint16_t*>(buffer.data()));
//length of current script
uint16_t scriptlen;
buffer.clear();
buffer.resize(sizeof(scriptlen));
//holder for current script
std::string script;
for(int n = 0; n < numscripts; ++n){
//read length of script
result = receiveData(_clientSocket, buffer, sizeof(numscripts), 0);
@@ -554,33 +561,33 @@ void ParallelConnection::initializationMessageReceived(){
//error
}
scriptlen = *(reinterpret_cast<uint16_t*>(buffer.data()));
//resize buffer
buffer.clear();
buffer.resize(scriptlen);
//read script
result = receiveData(_clientSocket, buffer, scriptlen, 0);
//assign current script
script.clear();
script.assign(buffer.begin(), buffer.end());
//queue received script
OsEng.scriptEngine().queueScript(script);
}
//we've gone through all scripts, initialization is done
buffer.clear();
writeHeader(buffer, MessageTypes::InitializationCompleted);
//let the server know
std::cout << "initialization message recieved queue" << std::endl;
queueMessage(InitializationCompleted, buffer);
//we also need to force a time jump just to ensure that the server and client are synced
_initializationTimejumpRequired.store(true);
}
*/
@@ -621,12 +628,12 @@ double ParallelConnection::timeTolerance() const {
void ParallelConnection::dataMessageReceived(const std::vector<char>& messageContent) {
// The type of data message received
uint32_t type = *(reinterpret_cast<const uint32_t*>(messageContent.data()));
uint32_t type = *(reinterpret_cast<const uint32_t*>(messageContent.data()));
std::vector<char> buffer(
messageContent.begin() + sizeof(uint32_t),
messageContent.end()
);
switch (static_cast<datamessagestructures::Type>(type)) {
case datamessagestructures::Type::CameraData: {
datamessagestructures::CameraKeyframe kf(buffer);
@@ -664,7 +671,7 @@ void ParallelConnection::dataMessageReceived(const std::vector<char>& messageCon
OsEng.scriptEngine().queueScript(
sm._script,
scripting::ScriptEngine::RemoteScripting::No
);
);
break;
}
default: {
@@ -696,9 +703,8 @@ void ParallelConnection::queueOutMessage(const Message& message) {
_sendBuffer.push_back(message);
_sendCondition.notify_all();
}
void ParallelConnection::sendFunc(){
int result;
void ParallelConnection::sendFunc() {
// While we're connected
while (_isConnected && !_disconnect) {
// Wait for signal then lock mutex and send first queued message
@@ -737,7 +743,9 @@ void ParallelConnection::sendFunc(){
reinterpret_cast<const char*>(&messageSizeOut) + sizeof(uint32_t)
);
result = send(
// result is most likely size_t, but send might return a different type on
// different platforms
auto result = send(
_clientSocket,
header.data(),
static_cast<int>(header.size()),
@@ -766,7 +774,7 @@ void ParallelConnection::sendFunc(){
_sendBuffer.clear();
}
void ParallelConnection::connectionStatusMessageReceived(const std::vector<char>& message)
{
if (message.size() < 2 * sizeof(uint32_t)) {
@@ -791,7 +799,7 @@ void ParallelConnection::connectionStatusMessageReceived(const std::vector<char>
hostName = std::string(&message[pointer], hostNameSize);
}
pointer += hostNameSize;
if (status > Status::Host) {
LERROR("Invalid status.");
return;
@@ -827,34 +835,34 @@ void ParallelConnection::nConnectionsMessageReceived(const std::vector<char>& me
//void ParallelConnection::initializationRequestMessageReceived(const std::vector<char>& message){
/*
/*
//get current state as scripts
std::vector<std::string> scripts;
std::map<std::string, std::string>::iterator state_it;
{
//mutex protect
std::lock_guard<std::mutex> lock(_currentStateMutex);
for(state_it = _currentState.begin();
state_it != _currentState.end();
++state_it){
scripts.push_back(scriptFromPropertyAndValue(state_it->first, state_it->second));
}
}
//get requester ID
std::vector<char> buffer;
buffer.resize(sizeof(uint32_t));
receiveData(_clientSocket, buffer, sizeof(uint32_t), 0);
uint32_t requesterID = *reinterpret_cast<uint32_t*>(buffer.data());
//total number of scripts sent
uint16_t numscripts = 0;
//temporary buffers
std::vector<char> scriptbuffer;
std::vector<char> tmpbuffer;
//serialize and encode all scripts into scriptbuffer
std::vector<std::string>::iterator script_it;
datamessagestructures::ScriptMessage sm;
@@ -867,31 +875,31 @@ void ParallelConnection::nConnectionsMessageReceived(const std::vector<char>& me
//serialize current script
tmpbuffer.clear();
sm.serialize(tmpbuffer);
//and insert into full buffer
scriptbuffer.insert(scriptbuffer.end(), tmpbuffer.begin(), tmpbuffer.end());
//increment number of scripts
numscripts++;
}
//write header
buffer.clear();
writeHeader(buffer, MessageTypes::Initialization);
//write client ID to receive init message
buffer.insert(buffer.end(), reinterpret_cast<char*>(&requesterID), reinterpret_cast<char*>(&requesterID) + sizeof(uint32_t));
//write total size of data chunk
uint32_t totlen = static_cast<uint32_t>(scriptbuffer.size());
buffer.insert(buffer.end(), reinterpret_cast<char*>(&totlen), reinterpret_cast<char*>(&totlen) + sizeof(uint32_t));
//write number of scripts
buffer.insert(buffer.end(), reinterpret_cast<char*>(&numscripts), reinterpret_cast<char*>(&numscripts) + sizeof(uint16_t));
//write all scripts
buffer.insert(buffer.end(), scriptbuffer.begin(), scriptbuffer.end());
//queue message
std::cout << "initializationRequest queue" << std::endl;
queueMessage(MessageType::Initialization, buffer);
@@ -906,7 +914,7 @@ void ParallelConnection::listenCommunication() {
std::vector<char> messageBuffer;
int nBytesRead = 0;
// While we're still connected
while (_isConnected.load()) {
// Receive the header data
@@ -976,8 +984,8 @@ void ParallelConnection::listenCommunication() {
int ParallelConnection::receiveData(_SOCKET& socket, std::vector<char>& buffer,
int length, int flags)
{
int result = 0;
int received = 0;
SocketResultType result = 0;
SocketResultType received = 0;
while (result < length) {
received = recv(socket, buffer.data() + result, length - result, flags);
@@ -992,9 +1000,9 @@ int ParallelConnection::receiveData(_SOCKET& socket, std::vector<char>& buffer,
}
}
return result;
return static_cast<int>(result);
}
void ParallelConnection::setPort(std::string port){
_port = std::move(port);
}
@@ -1002,12 +1010,11 @@ void ParallelConnection::setPort(std::string port){
void ParallelConnection::setAddress(std::string address){
_address = std::move(address);
}
void ParallelConnection::setName(std::string name){
_name = std::move(name);
}
void ParallelConnection::requestHostship(){
std::vector<char> buffer;
uint32_t passcode = hash(_hostPassword);
@@ -1058,7 +1065,7 @@ void ParallelConnection::sendScript(std::string script) {
datamessagestructures::ScriptMessage sm;
sm._script = std::move(script);
std::vector<char> buffer;
sm.serialize(buffer);
@@ -1079,7 +1086,7 @@ void ParallelConnection::preSynchronization() {
handleMessage(message);
_receiveBuffer.pop_front();
}
if (status() == Status::Host) {
if (OsEng.timeManager().time().timeJumped()) {
_timeJumped = true;
@@ -1096,7 +1103,7 @@ void ParallelConnection::preSynchronization() {
}
}
}
void ParallelConnection::setStatus(Status status) {
if (_status != status) {
_status = status;
@@ -1171,7 +1178,7 @@ void ParallelConnection::sendCameraKeyframe() {
void ParallelConnection::sendTimeKeyframe() {
// Create a keyframe with current position and orientation of camera
datamessagestructures::TimeKeyframe kf;
Time& time = OsEng.timeManager().time();
kf._dt = time.deltaTime();
@@ -1214,7 +1221,7 @@ uint32_t ParallelConnection::hash(const std::string& val) {
std::shared_ptr<ghoul::Event<>> ParallelConnection::connectionEvent() {
return _connectionEvent;
}
scripting::LuaLibrary ParallelConnection::luaLibrary() {
return {
"parallel",