added a receive thread. fixed a bug where threads tried to join/delete themselves

This commit is contained in:
Joakim Kilby
2015-07-01 12:17:20 +02:00
parent 6697eeb7b7
commit aea4a1ac0c
2 changed files with 98 additions and 92 deletions

View File

@@ -211,6 +211,7 @@ namespace openspace{
std::thread *_connectionThread;
std::thread *_broadcastThread;
std::thread *_sendThread;
std::thread *_receiveThread;
std::atomic<bool> _isHost;
std::atomic<bool> _isConnected;
std::atomic<bool> _isListening;

View File

@@ -81,6 +81,7 @@ namespace openspace {
_connectionThread(nullptr),
_broadcastThread(nullptr),
_sendThread(nullptr),
_receiveThread(nullptr),
_isHost(false),
_isConnected(false),
_isListening(false)
@@ -193,16 +194,19 @@ namespace openspace {
{
//we're connected
_isConnected.store(true);
//and ready to start receiving messages
_isListening.store(true);
//send authentication
sendAuthentication();
//start listening for communication
listenCommunication();
//and ready to start receiving messages
_isListening.store(true);
//start listening for communication
_receiveThread = new (std::nothrow) std::thread(&ParallelConnection::listenCommunication, this);
//start sending messages
_sendThread = new (std::nothrow) std::thread(&ParallelConnection::sendLoop, this);
//and send authentication
sendAuthentication();
}
//try to connect once per second
@@ -420,7 +424,7 @@ namespace openspace {
void ParallelConnection::sendLoop(){
int result;
while(_isHost.load()){
while(_isConnected.load()){
_sendBufferMutex.lock();
if(_sendBuffer.size() > 0){
@@ -433,8 +437,11 @@ namespace openspace {
//make sure everything went well
if (result == SOCKET_ERROR){
//failed to send message
LERROR("Failed to send message.\nError: " << _ERRNO << " detected in connection.");
disconnect();
LERROR("Failed to send message.\nError: " << _ERRNO << " detected in connection, disconnecting.");
//@TODO a better solution for this - description:
//A thread cannot delete/join itself, in this case the listener thread would need to remove itself
//solution (for now) is to call the disconnect script via scriptengine. This is done from a separate thread so it works.
OsEng.scriptEngine()->queueScript("openspace.parallel.disconnect();");
}
}
@@ -468,9 +475,6 @@ namespace openspace {
//start broadcasting
_broadcastThread = new (std::nothrow) std::thread(&ParallelConnection::broadcast, this);
//and sending messages
_sendThread = new (std::nothrow) std::thread(&ParallelConnection::sendLoop, this);
}
}
else{ //we've been assigned as client
@@ -486,20 +490,7 @@ namespace openspace {
_broadcastThread->join();
delete _broadcastThread;
_broadcastThread = nullptr;
}
//delete send thread
if(_sendThread != nullptr){
_sendThread->join();
delete _sendThread;
_sendThread = nullptr;
//and clear all queued messages
_sendBufferMutex.lock();
_sendBuffer.clear();
_sendBufferMutex.unlock();
}
}
}
else{
//we were not broadcasting so nothing to do
@@ -521,8 +512,11 @@ namespace openspace {
}
}
else{
LERROR("Error " << _ERRNO << " detected in connection.");
disconnect();
LERROR("Error " << _ERRNO << " detected in connection, disconnecting.");
//@TODO a better solution for this - description:
//A thread cannot delete/join itself, in this case the listener thread would need to remove itself
//solution (for now) is to call the disconnect script via scriptengine. This is done from a separate thread so it works.
OsEng.scriptEngine()->queueScript("openspace.parallel.disconnect();");
}
}
@@ -594,13 +588,17 @@ namespace openspace {
else{
if (result == 0){
//connection rejected
_isConnected.store(false);
_isListening.store(false);
_isHost.store(false);
LERROR("Parallel connection rejected, disconnecting...");
}
else{
LERROR("Error " << _ERRNO << " detected in connection!");
LERROR("Error " << _ERRNO << " detected in connection, disconnecting!");
}
//@TODO (JK) a better solution for this - description:
//A thread cannot delete/join itself, in this case the listener thread would need to remove itself
//solution (for now) is to call the disconnect script via scriptengine. This is done from a separate thread so it works.
OsEng.scriptEngine()->queueScript("openspace.parallel.disconnect();");
break;
}
}
@@ -694,73 +692,80 @@ namespace openspace {
void ParallelConnection::disconnect(){
//must be run before trying to join communication threads, else the threads are stuck trying to receive data
closeSocket();
//tell broadcast thread to stop broadcasting
_isHost.store(false);
//tell connection thread to stop listening
_isListening.store(false);
if (_clientSocket != INVALID_SOCKET){
//must be run before trying to join communication threads, else the threads are stuck trying to receive data
closeSocket();
//tell connection thread that we are connected (to be able to join and delete thread)
_isConnected.store(true);
//join connection thread and delete it
if (_connectionThread != nullptr){
_connectionThread->join();
delete _connectionThread;
_connectionThread = nullptr;
}
//tell broadcast thread to stop broadcasting
_isHost.store(false);
//make sure to set us as NOT connected again, connection thread is now deleted
_isConnected.store(false);
//tell connection thread to stop listening
_isListening.store(false);
//tell connection thread that we are connected (to be able to join and delete thread)
_isConnected.store(true);
//join connection thread and delete it
if (_connectionThread != nullptr){
_connectionThread->join();
delete _connectionThread;
_connectionThread = nullptr;
}
//make sure to set us as NOT connected again, connection thread is now deleted
_isConnected.store(false);
//join receive thread and delete it
if (_receiveThread != nullptr){
_receiveThread->join();
delete _receiveThread;
_receiveThread = nullptr;
}
//join broadcast thread and delete it
if (_broadcastThread != nullptr){
_broadcastThread->join();
delete _broadcastThread;
_broadcastThread = nullptr;
}
//join send thread and delete it
if (_sendThread != nullptr){
_sendThread->join();
delete _sendThread;
_sendThread = nullptr;
}
//join broadcast thread and delete it
if (_broadcastThread != nullptr){
_broadcastThread->join();
delete _broadcastThread;
_broadcastThread = nullptr;
}
//join send thread and delete it
if (_sendThread != nullptr){
_sendThread->join();
delete _sendThread;
_sendThread = nullptr;
}
#if defined(__WIN32__)
WSACleanup();
WSACleanup();
#endif
}
}
void ParallelConnection::closeSocket(){
if (_clientSocket != INVALID_SOCKET)
{
/*
Windows shutdown options
* SD_RECIEVE
* SD_SEND
* SD_BOTH
Linux & Mac shutdown options
* SHUT_RD (Disables further receive operations)
* SHUT_WR (Disables further send operations)
* SHUT_RDWR (Disables further send and receive operations)
*/
/*
Windows shutdown options
* SD_RECIEVE
* SD_SEND
* SD_BOTH
#ifdef __WIN32__
shutdown(_clientSocket, SD_BOTH);
closesocket(_clientSocket);
#else
shutdown(_clientSocket, SHUT_RDWR);
close(_clientSocket);
#endif
Linux & Mac shutdown options
* SHUT_RD (Disables further receive operations)
* SHUT_WR (Disables further send operations)
* SHUT_RDWR (Disables further send and receive operations)
*/
_clientSocket = INVALID_SOCKET;
}
#ifdef __WIN32__
shutdown(_clientSocket, SD_BOTH);
closesocket(_clientSocket);
#else
shutdown(_clientSocket, SHUT_RDWR);
close(_clientSocket);
#endif
_clientSocket = INVALID_SOCKET;
}
bool ParallelConnection::initNetworkAPI(){