Feature/documentation topic (#822)

- Implement documentation topic that can be used to query documentation using the network API.
- Implement a way to pass arguments to lua scripts using json (rather than formatting entire lua string clientside)
- Implement ability to attach callback to lua script executions
- Implement abillity to transport return values from lua scripts back to network API clients.
- Do not initialize server interface on slave nodes.
- Implement Dictionary -> json converter using nlohmann json library
This commit is contained in:
Emil Axelsson
2019-04-03 10:30:28 +02:00
committed by GitHub
parent 4ef0bdc0a5
commit 53e07d90e1
30 changed files with 414 additions and 550 deletions
-2
View File
@@ -63,7 +63,6 @@ set(OPENSPACE_SOURCE
${OPENSPACE_BASE_DIR}/src/mission/mission.cpp
${OPENSPACE_BASE_DIR}/src/mission/missionmanager.cpp
${OPENSPACE_BASE_DIR}/src/mission/missionmanager_lua.inl
${OPENSPACE_BASE_DIR}/src/network/networkengine.cpp
${OPENSPACE_BASE_DIR}/src/network/parallelconnection.cpp
${OPENSPACE_BASE_DIR}/src/network/parallelpeer.cpp
${OPENSPACE_BASE_DIR}/src/network/parallelpeer_lua.inl
@@ -245,7 +244,6 @@ set(OPENSPACE_HEADER
${OPENSPACE_BASE_DIR}/include/openspace/interaction/shortcutmanager.h
${OPENSPACE_BASE_DIR}/include/openspace/mission/mission.h
${OPENSPACE_BASE_DIR}/include/openspace/mission/missionmanager.h
${OPENSPACE_BASE_DIR}/include/openspace/network/networkengine.h
${OPENSPACE_BASE_DIR}/include/openspace/network/parallelconnection.h
${OPENSPACE_BASE_DIR}/include/openspace/network/parallelpeer.h
${OPENSPACE_BASE_DIR}/include/openspace/network/parallelserver.h
-6
View File
@@ -37,7 +37,6 @@
#include <openspace/interaction/sessionrecording.h>
#include <openspace/interaction/shortcutmanager.h>
#include <openspace/mission/missionmanager.h>
#include <openspace/network/networkengine.h>
#include <openspace/network/parallelpeer.h>
#include <openspace/performance/performancemanager.h>
#include <openspace/properties/propertyowner.h>
@@ -94,11 +93,6 @@ ModuleEngine& gModuleEngine() {
return g;
}
NetworkEngine& gNetworkEngine() {
static NetworkEngine g;
return g;
}
OpenSpaceEngine& gOpenSpaceEngine() {
static OpenSpaceEngine g;
return g;
-4
View File
@@ -40,7 +40,6 @@
#include <openspace/interaction/sessionrecording.h>
#include <openspace/interaction/navigationhandler.h>
#include <openspace/interaction/orbitalnavigator.h>
#include <openspace/network/networkengine.h>
#include <openspace/network/parallelpeer.h>
#include <openspace/performance/performancemeasurement.h>
#include <openspace/performance/performancemanager.h>
@@ -1246,9 +1245,6 @@ void OpenSpaceEngine::mouseScrollWheelCallback(double posX, double posY) {
std::vector<char> OpenSpaceEngine::encode() {
std::vector<char> buffer = global::syncEngine.encodeSyncables();
global::networkEngine.publishStatusMessage();
global::networkEngine.sendMessages();
return buffer;
}
+4 -12
View File
@@ -36,7 +36,7 @@ SyncEngine::SyncEngine(unsigned int syncBufferSize)
ghoul_assert(syncBufferSize > 0, "syncBufferSize must be bigger than 0");
}
// should be called on sgct master
// Should be called on sgct master
std::vector<char> SyncEngine::encodeSyncables() {
for (Syncable* syncable : _syncables) {
syncable->encode(&_syncBuffer);
@@ -45,17 +45,9 @@ std::vector<char> SyncEngine::encodeSyncables() {
std::vector<char> data = _syncBuffer.data();
_syncBuffer.reset();
return data;
//_dataStream.resize(_encodeOffset);
//_synchronizationBuffer->setVal(_dataStream);
//sgct::SharedData::instance()->writeVector(_synchronizationBuffer.get());
//_dataStream.resize(_n);
//_encodeOffset = 0;
//_decodeOffset = 0;
//_syncBuffer.write();
}
//should be called on sgct slaves
// Should be called on sgct slaves
void SyncEngine::decodeSyncables(std::vector<char> data) {
_syncBuffer.setData(std::move(data));
for (Syncable* syncable : _syncables) {
@@ -78,14 +70,14 @@ void SyncEngine::postSynchronization(IsMaster isMaster) {
}
void SyncEngine::addSyncable(Syncable* syncable) {
ghoul_assert(syncable, "synable must not be nullptr");
ghoul_assert(syncable, "Syncable must not be nullptr");
_syncables.push_back(syncable);
}
void SyncEngine::addSyncables(const std::vector<Syncable*>& syncables) {
for (Syncable* syncable : syncables) {
ghoul_assert(syncable, "syncables must not contain any nullptr");
ghoul_assert(syncable, "Syncables must not contain any nullptr");
addSyncable(syncable);
}
}
-238
View File
@@ -1,238 +0,0 @@
/*****************************************************************************************
* *
* OpenSpace *
* *
* Copyright (c) 2014-2019 *
* *
* Permission is hereby granted, free of charge, to any person obtaining a copy of this *
* software and associated documentation files (the "Software"), to deal in the Software *
* without restriction, including without limitation the rights to use, copy, modify, *
* merge, publish, distribute, sublicense, and/or sell copies of the Software, and to *
* permit persons to whom the Software is furnished to do so, subject to the following *
* conditions: *
* *
* The above copyright notice and this permission notice shall be included in all copies *
* or substantial portions of the Software. *
* *
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, *
* INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A *
* PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT *
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF *
* CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE *
* OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. *
****************************************************************************************/
#include <openspace/network/networkengine.h>
#include <openspace/engine/globals.h>
#include <openspace/engine/windowdelegate.h>
#include <openspace/scripting/scriptengine.h>
#include <openspace/util/timemanager.h>
#include <ghoul/logging/logmanager.h>
#include <array>
#include <chrono>
#include <thread>
namespace {
constexpr const char* _loggerCat = "NetworkEngine";
constexpr const char* StatusMessageIdentifierName = "StatusMessage";
constexpr const char* MappingIdentifierIdentifierName = "IdentifierMapping";
constexpr const char* InitialMessageFinishedIdentifierName = "InitialMessageFinished";
constexpr const char MessageTypeLuaScript = '0';
constexpr const char MessageTypeExternalControlConnected = '1';
} // namespace
namespace openspace {
NetworkEngine::NetworkEngine() {
static_assert(
sizeof(MessageIdentifier) == 2,
"MessageIdentifier has to be 2 bytes or dependent applications will break"
);
_statusMessageIdentifier = identifier(StatusMessageIdentifierName);
_identifierMappingIdentifier = identifier(MappingIdentifierIdentifierName);
_initialMessageFinishedIdentifier = identifier(InitialMessageFinishedIdentifierName);
}
bool NetworkEngine::handleMessage(const std::string& message) {
// The first byte determines the type of message
const char type = message[0];
switch (type) {
case MessageTypeLuaScript: // LuaScript
global::scriptEngine.queueScript(
message.substr(1),
scripting::ScriptEngine::RemoteScripting::No
);
return true;
case MessageTypeExternalControlConnected:
publishIdentifierMappingMessage();
std::this_thread::sleep_for(std::chrono::milliseconds(250));
sendInitialInformation();
return true;
default:
LERROR(fmt::format("Unknown type '{}'", type));
return false;
}
}
void NetworkEngine::publishStatusMessage() {
if (!_shouldPublishStatusMessage ||
!global::windowDelegate.isExternalControlConnected())
{
return;
}
// Protocol:
// 8 bytes: time as a ET double
// 24 bytes: time as a UTC string
// 8 bytes: delta time as double
// Total: 40
const Time& currentTime = global::timeManager.time();
uint16_t messageSize = 0;
const double time = currentTime.j2000Seconds();
const std::string timeString = currentTime.UTC();
double delta = global::timeManager.deltaTime();
messageSize += sizeof(time);
messageSize += static_cast<uint16_t>(timeString.length());
messageSize += sizeof(delta);
ghoul_assert(messageSize == 40, "Message size is not correct");
unsigned int currentLocation = 0;
std::vector<char> buffer(messageSize);
std::memmove(buffer.data() + currentLocation, &time, sizeof(time));
currentLocation += sizeof(time);
std::memmove(
buffer.data() + currentLocation,
timeString.c_str(), timeString.length()
);
currentLocation += static_cast<unsigned int>(timeString.length());
std::memmove(buffer.data() + currentLocation, &delta, sizeof(delta));
publishMessage(_statusMessageIdentifier, std::move(buffer));
}
void NetworkEngine::publishIdentifierMappingMessage() {
size_t bufferSize = sizeof(uint16_t);
for (const std::pair<std::string, MessageIdentifier>& i : _identifiers) {
bufferSize += sizeof(MessageIdentifier);
bufferSize += i.first.size() + 1; // +1 for \0 terminating character
}
std::vector<char> buffer(bufferSize);
size_t currentWritingPosition = 0;
uint16_t size = static_cast<uint16_t>(_identifiers.size());
std::memcpy(buffer.data(), &size, sizeof(uint16_t));
currentWritingPosition += sizeof(uint16_t);
for (const std::pair<std::string, MessageIdentifier>& i : _identifiers) {
std::memcpy(
buffer.data() + currentWritingPosition,
&(i.second), sizeof(MessageIdentifier)
);
currentWritingPosition += sizeof(MessageIdentifier);
uint8_t stringSize = static_cast<uint8_t>(i.first.size());
std::memcpy(buffer.data() + currentWritingPosition, &stringSize, sizeof(uint8_t));
currentWritingPosition += sizeof(uint8_t);
std::memcpy(buffer.data() + currentWritingPosition, i.first.data(), stringSize);
currentWritingPosition += i.first.size();
}
publishMessage(_identifierMappingIdentifier, std::move(buffer));
}
NetworkEngine::MessageIdentifier NetworkEngine::identifier(std::string name) {
auto i = _identifiers.find(name);
if (i != _identifiers.end()) {
return i->second;
}
else {
_lastAssignedIdentifier++;
MessageIdentifier result = _lastAssignedIdentifier;
_identifiers[std::move(name)] = result;
return result;
}
}
void NetworkEngine::publishMessage(MessageIdentifier identifier,
std::vector<char> message)
{
_messagesToSend.push_back({ std::move(identifier), std::move(message) });
}
void NetworkEngine::sendMessages() {
if (!global::windowDelegate.isExternalControlConnected()) {
return;
}
for (Message& m : _messagesToSend) {
// Protocol:
// 2 bytes: type of message as uint16_t
// Rest of payload depending on the message type
union {
MessageIdentifier value;
std::array<char, 2> data;
} identifier = {};
identifier.value = m.identifer;
// Prepending the message identifier to the front
m.body.insert(m.body.begin(), identifier.data.begin(), identifier.data.end());
global::windowDelegate.sendMessageToExternalControl(m.body);
}
_messagesToSend.clear();
}
void NetworkEngine::sendInitialInformation() {
constexpr const int SleepTime = 250;
_shouldPublishStatusMessage = false;
for (const Message& m : _initialConnectionMessages) {
union {
MessageIdentifier value;
std::array<char, 2> data;
} identifier = {};
identifier.value = m.identifer;
std::vector<char> payload = m.body;
payload.insert(payload.begin(), identifier.data.begin(), identifier.data.end());
global::windowDelegate.sendMessageToExternalControl(payload);
LINFO(fmt::format(
"Sent initial message: (s={}) [i={}]", m.body.size(), identifier.value
));
std::this_thread::sleep_for(std::chrono::milliseconds(SleepTime));
}
std::this_thread::sleep_for(std::chrono::milliseconds(4 * SleepTime));
// Send finished message
union {
MessageIdentifier value;
std::array<char, 2> data;
} identifier = {};
identifier.value = _initialMessageFinishedIdentifier;
std::vector<char> d;
d.insert(d.begin(), identifier.data.begin(), identifier.data.end());
global::windowDelegate.sendMessageToExternalControl(d);
_shouldPublishStatusMessage = true;
}
void NetworkEngine::setInitialConnectionMessage(MessageIdentifier identifier,
std::vector<char> message)
{
// Add check if a MessageIdentifier already exists ---abock
_initialConnectionMessages.push_back({std::move(identifier), std::move(message)});
}
} // namespace openspace
+62 -42
View File
@@ -161,7 +161,7 @@ bool ScriptEngine::hasLibrary(const std::string& name) {
return (it != _registeredLibraries.end());
}
bool ScriptEngine::runScript(const std::string& script) {
bool ScriptEngine::runScript(const std::string& script, ScriptCallback callback) {
if (script.empty()) {
LWARNING("Script was empty");
return false;
@@ -173,7 +173,13 @@ bool ScriptEngine::runScript(const std::string& script) {
}
try {
ghoul::lua::runScript(_state, script);
if (callback) {
ghoul::Dictionary returnValue =
ghoul::lua::loadArrayDictionaryFromString(script, _state);
callback.value()(returnValue);
} else {
ghoul::lua::runScript(_state, script);
}
}
catch (const ghoul::lua::LuaLoadingException& e) {
LERRORC(e.component, e.message);
@@ -183,6 +189,10 @@ bool ScriptEngine::runScript(const std::string& script) {
LERRORC(e.component, e.message);
return false;
}
catch (const ghoul::RuntimeError& e) {
LERRORC(e.component, e.message);
return false;
}
return true;
}
@@ -639,72 +649,82 @@ void ScriptEngine::preSync(bool isMaster) {
return;
}
_mutex.lock();
if (!_queuedScripts.empty()) {
_currentSyncedScript = _queuedScripts.back().first;
const bool remoteScripting = _queuedScripts.back().second;
std::lock_guard<std::mutex> guard(_slaveScriptsMutex);
while (!_incomingScripts.empty()) {
QueueItem item = std::move(_incomingScripts.front());
_incomingScripts.pop();
_scriptsToSync.push_back(item.script);
const bool remoteScripting = item.remoteScripting;
// Not really a received script but the master also needs to run the script...
_receivedScripts.push_back(_currentSyncedScript);
_queuedScripts.pop_back();
_masterScriptQueue.push(item);
if (global::parallelPeer.isHost() && remoteScripting) {
global::parallelPeer.sendScript(_currentSyncedScript);
global::parallelPeer.sendScript(item.script);
}
if (global::sessionRecording.isRecording()) {
global::sessionRecording.saveScriptKeyframe(_currentSyncedScript);
global::sessionRecording.saveScriptKeyframe(item.script);
}
}
_mutex.unlock();
}
void ScriptEngine::encode(SyncBuffer* syncBuffer) {
syncBuffer->encode(_currentSyncedScript);
_currentSyncedScript.clear();
size_t nScripts = _scriptsToSync.size();
syncBuffer->encode(nScripts);
for (const std::string& s : _scriptsToSync) {
syncBuffer->encode(s);
}
_scriptsToSync.clear();
}
void ScriptEngine::decode(SyncBuffer* syncBuffer) {
syncBuffer->decode(_currentSyncedScript);
std::lock_guard<std::mutex> guard(_slaveScriptsMutex);
size_t nScripts;
syncBuffer->decode(nScripts);
if (!_currentSyncedScript.empty()) {
_mutex.lock();
_receivedScripts.push_back(_currentSyncedScript);
_mutex.unlock();
for (size_t i = 0; i < nScripts; ++i) {
std::string script;
syncBuffer->decode(script);
_slaveScriptQueue.push(std::move(script));
}
}
void ScriptEngine::postSync(bool) {
std::vector<std::string> scripts;
_mutex.lock();
scripts.assign(_receivedScripts.begin(), _receivedScripts.end());
_receivedScripts.clear();
_mutex.unlock();
while (!scripts.empty()) {
try {
runScript(scripts.back());
void ScriptEngine::postSync(bool isMaster) {
if (isMaster) {
while (!_masterScriptQueue.empty()) {
std::string script = std::move(_masterScriptQueue.front().script);
ScriptCallback callback = std::move(_masterScriptQueue.front().callback);
_masterScriptQueue.pop();
try {
runScript(script, callback);
}
catch (const ghoul::RuntimeError& e) {
LERRORC(e.component, e.message);
continue;
}
}
catch (const ghoul::RuntimeError& e) {
LERRORC(e.component, e.message);
} else {
std::lock_guard<std::mutex> guard(_slaveScriptsMutex);
while (!_slaveScriptQueue.empty()) {
try {
runScript(_slaveScriptQueue.front());
_slaveScriptQueue.pop();
}
catch (const ghoul::RuntimeError& e) {
LERRORC(e.component, e.message);
}
}
scripts.pop_back();
}
}
void ScriptEngine::queueScript(const std::string& script,
ScriptEngine::RemoteScripting remoteScripting)
ScriptEngine::RemoteScripting remoteScripting,
ScriptCallback callback)
{
if (script.empty()) {
return;
if (!script.empty()) {
_incomingScripts.push({ script, remoteScripting, callback });
}
_mutex.lock();
_queuedScripts.insert(
_queuedScripts.begin(),
std::make_pair(script, remoteScripting)
);
_mutex.unlock();
}
} // namespace openspace::scripting
-16
View File
@@ -84,20 +84,4 @@ void SyncBuffer::reset() {
_decodeOffset = 0;
}
//void SyncBuffer::write() {
// _dataStream.resize(_encodeOffset);
// _synchronizationBuffer->setVal(_dataStream);
// sgct::SharedData::instance()->writeVector(_synchronizationBuffer.get());
// _dataStream.resize(_n);
// _encodeOffset = 0;
// _decodeOffset = 0;
//}
//
//void SyncBuffer::read() {
// sgct::SharedData::instance()->readVector(_synchronizationBuffer.get());
// _dataStream = _synchronizationBuffer->getVal();
// _encodeOffset = 0;
// _decodeOffset = 0;
//}
} // namespace openspace