From aea4a1ac0c2ca6f373472f2535d6fe9072ff4f98 Mon Sep 17 00:00:00 2001 From: Joakim Kilby Date: Wed, 1 Jul 2015 12:17:20 +0200 Subject: [PATCH] added a receive thread. fixed a bug where threads tried to join/delete themselves --- .../openspace/network/parallelconnection.h | 1 + src/network/parallelconnection.cpp | 189 +++++++++--------- 2 files changed, 98 insertions(+), 92 deletions(-) diff --git a/include/openspace/network/parallelconnection.h b/include/openspace/network/parallelconnection.h index 8aeb93d01b..3e23ead18a 100644 --- a/include/openspace/network/parallelconnection.h +++ b/include/openspace/network/parallelconnection.h @@ -211,6 +211,7 @@ namespace openspace{ std::thread *_connectionThread; std::thread *_broadcastThread; std::thread *_sendThread; + std::thread *_receiveThread; std::atomic _isHost; std::atomic _isConnected; std::atomic _isListening; diff --git a/src/network/parallelconnection.cpp b/src/network/parallelconnection.cpp index 0a357ee8ac..c9f9b31682 100644 --- a/src/network/parallelconnection.cpp +++ b/src/network/parallelconnection.cpp @@ -81,6 +81,7 @@ namespace openspace { _connectionThread(nullptr), _broadcastThread(nullptr), _sendThread(nullptr), + _receiveThread(nullptr), _isHost(false), _isConnected(false), _isListening(false) @@ -193,16 +194,19 @@ namespace openspace { { //we're connected _isConnected.store(true); - - //and ready to start receiving messages - _isListening.store(true); - - //send authentication - sendAuthentication(); - - //start listening for communication - listenCommunication(); - + + //and ready to start receiving messages + _isListening.store(true); + + //start listening for communication + _receiveThread = new (std::nothrow) std::thread(&ParallelConnection::listenCommunication, this); + + //start sending messages + _sendThread = new (std::nothrow) std::thread(&ParallelConnection::sendLoop, this); + + //and send authentication + sendAuthentication(); + } //try to connect once per second @@ -420,7 +424,7 @@ namespace openspace { void ParallelConnection::sendLoop(){ int result; - while(_isHost.load()){ + while(_isConnected.load()){ _sendBufferMutex.lock(); if(_sendBuffer.size() > 0){ @@ -433,8 +437,11 @@ namespace openspace { //make sure everything went well if (result == SOCKET_ERROR){ //failed to send message - LERROR("Failed to send message.\nError: " << _ERRNO << " detected in connection."); - disconnect(); + LERROR("Failed to send message.\nError: " << _ERRNO << " detected in connection, disconnecting."); + //@TODO a better solution for this - description: + //A thread cannot delete/join itself, in this case the listener thread would need to remove itself + //solution (for now) is to call the disconnect script via scriptengine. This is done from a separate thread so it works. + OsEng.scriptEngine()->queueScript("openspace.parallel.disconnect();"); } } @@ -468,9 +475,6 @@ namespace openspace { //start broadcasting _broadcastThread = new (std::nothrow) std::thread(&ParallelConnection::broadcast, this); - - //and sending messages - _sendThread = new (std::nothrow) std::thread(&ParallelConnection::sendLoop, this); } } else{ //we've been assigned as client @@ -486,20 +490,7 @@ namespace openspace { _broadcastThread->join(); delete _broadcastThread; _broadcastThread = nullptr; - } - - //delete send thread - if(_sendThread != nullptr){ - _sendThread->join(); - delete _sendThread; - _sendThread = nullptr; - - //and clear all queued messages - _sendBufferMutex.lock(); - _sendBuffer.clear(); - _sendBufferMutex.unlock(); - } - + } } else{ //we were not broadcasting so nothing to do @@ -521,8 +512,11 @@ namespace openspace { } } else{ - LERROR("Error " << _ERRNO << " detected in connection."); - disconnect(); + LERROR("Error " << _ERRNO << " detected in connection, disconnecting."); + //@TODO a better solution for this - description: + //A thread cannot delete/join itself, in this case the listener thread would need to remove itself + //solution (for now) is to call the disconnect script via scriptengine. This is done from a separate thread so it works. + OsEng.scriptEngine()->queueScript("openspace.parallel.disconnect();"); } } @@ -594,13 +588,17 @@ namespace openspace { else{ if (result == 0){ //connection rejected - _isConnected.store(false); - _isListening.store(false); - _isHost.store(false); + LERROR("Parallel connection rejected, disconnecting..."); } else{ - LERROR("Error " << _ERRNO << " detected in connection!"); + LERROR("Error " << _ERRNO << " detected in connection, disconnecting!"); } + + //@TODO (JK) a better solution for this - description: + //A thread cannot delete/join itself, in this case the listener thread would need to remove itself + //solution (for now) is to call the disconnect script via scriptengine. This is done from a separate thread so it works. + OsEng.scriptEngine()->queueScript("openspace.parallel.disconnect();"); + break; } } @@ -694,73 +692,80 @@ namespace openspace { void ParallelConnection::disconnect(){ - - //must be run before trying to join communication threads, else the threads are stuck trying to receive data - closeSocket(); - - //tell broadcast thread to stop broadcasting - _isHost.store(false); - - //tell connection thread to stop listening - _isListening.store(false); + if (_clientSocket != INVALID_SOCKET){ + //must be run before trying to join communication threads, else the threads are stuck trying to receive data + closeSocket(); - //tell connection thread that we are connected (to be able to join and delete thread) - _isConnected.store(true); - - //join connection thread and delete it - if (_connectionThread != nullptr){ - _connectionThread->join(); - delete _connectionThread; - _connectionThread = nullptr; - } + //tell broadcast thread to stop broadcasting + _isHost.store(false); - //make sure to set us as NOT connected again, connection thread is now deleted - _isConnected.store(false); + //tell connection thread to stop listening + _isListening.store(false); + + //tell connection thread that we are connected (to be able to join and delete thread) + _isConnected.store(true); + + //join connection thread and delete it + if (_connectionThread != nullptr){ + _connectionThread->join(); + delete _connectionThread; + _connectionThread = nullptr; + } + + //make sure to set us as NOT connected again, connection thread is now deleted + _isConnected.store(false); + + + //join receive thread and delete it + if (_receiveThread != nullptr){ + _receiveThread->join(); + delete _receiveThread; + _receiveThread = nullptr; + } + + //join broadcast thread and delete it + if (_broadcastThread != nullptr){ + _broadcastThread->join(); + delete _broadcastThread; + _broadcastThread = nullptr; + } + + //join send thread and delete it + if (_sendThread != nullptr){ + _sendThread->join(); + delete _sendThread; + _sendThread = nullptr; + } - //join broadcast thread and delete it - if (_broadcastThread != nullptr){ - _broadcastThread->join(); - delete _broadcastThread; - _broadcastThread = nullptr; - } - - //join send thread and delete it - if (_sendThread != nullptr){ - _sendThread->join(); - delete _sendThread; - _sendThread = nullptr; - } - #if defined(__WIN32__) - WSACleanup(); + WSACleanup(); #endif + } } void ParallelConnection::closeSocket(){ - if (_clientSocket != INVALID_SOCKET) - { - /* - Windows shutdown options - * SD_RECIEVE - * SD_SEND - * SD_BOTH - Linux & Mac shutdown options - * SHUT_RD (Disables further receive operations) - * SHUT_WR (Disables further send operations) - * SHUT_RDWR (Disables further send and receive operations) - */ + /* + Windows shutdown options + * SD_RECIEVE + * SD_SEND + * SD_BOTH - #ifdef __WIN32__ - shutdown(_clientSocket, SD_BOTH); - closesocket(_clientSocket); - #else - shutdown(_clientSocket, SHUT_RDWR); - close(_clientSocket); - #endif + Linux & Mac shutdown options + * SHUT_RD (Disables further receive operations) + * SHUT_WR (Disables further send operations) + * SHUT_RDWR (Disables further send and receive operations) + */ - _clientSocket = INVALID_SOCKET; - } +#ifdef __WIN32__ + shutdown(_clientSocket, SD_BOTH); + closesocket(_clientSocket); +#else + shutdown(_clientSocket, SHUT_RDWR); + close(_clientSocket); +#endif + + _clientSocket = INVALID_SOCKET; } bool ParallelConnection::initNetworkAPI(){