/***************************************************************************************** * * * OpenSpace * * * * Copyright (c) 2014-2018 * * * * Permission is hereby granted, free of charge, to any person obtaining a copy of this * * software and associated documentation files (the "Software"), to deal in the Software * * without restriction, including without limitation the rights to use, copy, modify, * * merge, publish, distribute, sublicense, and/or sell copies of the Software, and to * * permit persons to whom the Software is furnished to do so, subject to the following * * conditions: * * * * The above copyright notice and this permission notice shall be included in all copies * * or substantial portions of the Software. * * * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, * * INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A * * PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT * * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF * * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE * * OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * ****************************************************************************************/ #include #include #include #include #include #include #include #include #include #include #include namespace { constexpr const char* _loggerCat = "ServerModule"; } // namespace namespace openspace { ServerModule::ServerModule() : OpenSpaceModule(ServerModule::Name) {} ServerModule::~ServerModule() { disconnectAll(); cleanUpFinishedThreads(); } void ServerModule::internalInitialize(const ghoul::Dictionary&) { using namespace ghoul::io; std::unique_ptr tcpServer = std::make_unique(); std::unique_ptr wsServer = std::make_unique(); // Temporary hard coded addresses and ports. tcpServer->listen("localhost", 8000); wsServer->listen("localhost", 8001); LDEBUG(fmt::format( "TCP Server listening on {}:{}",tcpServer->address(), tcpServer->port() )); LDEBUG(fmt::format( "WS Server listening on {}:{}", wsServer->address(), wsServer->port() )); _servers.push_back(std::move(tcpServer)); _servers.push_back(std::move(wsServer)); OsEng.registerModuleCallback( OpenSpaceEngine::CallbackOption::PreSync, [this]() { preSync(); } ); } void ServerModule::preSync() { // Set up new connections. for (std::unique_ptr& server : _servers) { std::unique_ptr socket; while ((socket = server->nextPendingSocket())) { socket->startStreams(); std::shared_ptr connection = std::make_shared( std::move(socket), server->address() ); connection->setThread(std::thread( [this, connection] () { handleConnection(connection); } )); _connections.push_back({ std::move(connection), false }); } } // Consume all messages put into the message queue by the socket threads. consumeMessages(); // Join threads for sockets that disconnected. cleanUpFinishedThreads(); } void ServerModule::cleanUpFinishedThreads() { for (ConnectionData& connectionData : _connections) { Connection& connection = *connectionData.connection; if (!connection.socket() || !connection.socket()->isConnected()) { if (connection.thread().joinable()) { connection.thread().join(); connectionData.isMarkedForRemoval = true; } } } _connections.erase(std::remove_if( _connections.begin(), _connections.end(), [](const ConnectionData& connectionData) { return connectionData.isMarkedForRemoval; } ), _connections.end()); } void ServerModule::disconnectAll() { for (ConnectionData& connectionData : _connections) { Connection& connection = *connectionData.connection; if (connection.socket() && connection.socket()->isConnected()) { connection.socket()->disconnect( static_cast(ghoul::io::WebSocket::ClosingReason::ClosingAll) ); } } } void ServerModule::handleConnection(std::shared_ptr connection) { std::string messageString; while (connection->socket()->getMessage(messageString)) { std::lock_guard lock(_messageQueueMutex); _messageQueue.push_back({ connection, std::move(messageString) }); } } void ServerModule::consumeMessages() { std::lock_guard lock(_messageQueueMutex); while (!_messageQueue.empty()) { const Message& m = _messageQueue.front(); _messageQueue.pop_front(); if (std::shared_ptr c = m.connection.lock()) { c->handleMessage(m.messageString); } } } } // namespace openspace