Select right source address in CDC/Shard

This commit is contained in:
Francesco Mazzoli
2023-02-14 12:20:21 +00:00
parent 786939bf07
commit e580cd5fe9
2 changed files with 21 additions and 12 deletions

View File

@@ -63,6 +63,7 @@ struct CDCServer : Undertaker::Reapable {
private:
Env _env;
CDCShared& _shared;
std::array<uint8_t, 4> _ownIp;
uint16_t _desiredPort;
uint64_t _currentLogIndex;
std::vector<char> _recvBuf;
@@ -83,6 +84,7 @@ public:
CDCServer(Logger& logger, const CDCOptions& options, CDCShared& shared) :
_env(logger, "req_server"),
_shared(shared),
_ownIp(options.ownIp),
_desiredPort(options.port),
_recvBuf(UDP_MTU),
_sendBuf(UDP_MTU),
@@ -121,7 +123,7 @@ public:
}
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = htonl(INADDR_ANY);
memcpy(&addr.sin_addr.s_addr, _ownIp.data(), sizeof(_ownIp));
if (i == 0 && _desiredPort != 0) { // CDC with specified port
addr.sin_port = htons(_desiredPort);
} else { // automatically assigned port
@@ -129,7 +131,8 @@ public:
}
if (bind(sock, (struct sockaddr*)&addr, sizeof(addr)) != 0) {
if (i == 0 && _desiredPort != 0) {
throw SYSCALL_EXCEPTION("cannot bind socket to port %s", _desiredPort);
char ip[INET_ADDRSTRLEN];
throw SYSCALL_EXCEPTION("cannot bind socket to addr %s:%s", inet_ntop(AF_INET, _ownIp.data(), ip, INET_ADDRSTRLEN), _desiredPort);
} else {
throw SYSCALL_EXCEPTION("cannot bind socket");
}
@@ -275,7 +278,7 @@ private:
// Now, try to parse the body
try {
_cdcReqContainer.unpack(reqBbuf, reqHeader.kind);
LOG_DEBUG(_env, "parsed request: %s", _cdcReqContainer);
LOG_TRACE(_env, "parsed request: %s", _cdcReqContainer);
} catch (const BincodeException& exc) {
LOG_ERROR(_env, "could not parse: %s", exc.what());
RAISE_ALERT(_env, "could not parse CDC request of kind %s from %s, will reply with error.", reqHeader.kind, clientAddr);
@@ -359,11 +362,11 @@ private:
// Otherwise, parse the body
_shardRespContainer.unpack(reqBbuf, respHeader.kind);
LOG_DEBUG(_env, "parsed shard response: %s", _shardRespContainer);
LOG_TRACE(_env, "parsed shard response: %s", _shardRespContainer);
ALWAYS_ASSERT(reqBbuf.remaining() == 0);
// If all went well, advance with the newly received request
LOG_DEBUG(_env, "successfully parsed shard response %s with kind %s, will now process: %s", respHeader.requestId, respHeader.kind, _shardRespContainer);
LOG_DEBUG(_env, "successfully parsed shard response %s with kind %s, will now process", respHeader.requestId, respHeader.kind);
_shared.db.processShardResp(true, eggsNow(), _advanceLogIndex(), NO_ERROR, &_shardRespContainer, _step);
_processStep(_step);
}
@@ -399,7 +402,7 @@ private:
}
}
if (step.txnNeedsShard != 0) {
LOG_DEBUG(_env, "txn %s needs shard %s, req %s", step.txnNeedsShard, step.shardReq.shid, step.shardReq.req);
LOG_TRACE(_env, "txn %s needs shard %s, req %s", step.txnNeedsShard, step.shardReq.shid, step.shardReq.req);
BincodeBuf bbuf(&_sendBuf[0], _sendBuf.size());
// Header
ShardRequestHeader shardReqHeader;
@@ -500,9 +503,11 @@ public:
if (_shared.stop.load()) {
return;
}
if (eggsNow() - successfulIterationAt < 1_mins) {
auto now = eggsNow();
if (now - successfulIterationAt < 1_mins) {
continue;
}
LOG_INFO(_env, "Last successful shard fetch was at %s, now we're at %s, fetching again", successfulIterationAt, now);
std::string err = fetchShards(_shuckleHost, _shucklePort, 100_ms, *shards);
if (!err.empty()) {
LOG_INFO(_env, "failed to reach shuckle at %s:%s to fetch shards, will retry: %s", _shuckleHost, _shucklePort, err);
@@ -527,7 +532,7 @@ public:
}
}
LOG_INFO(_env, "successfully fetched all shards from shuckle, will wait one minute");
EggsTime successfulIterationAt = eggsNow();
successfulIterationAt = eggsNow();
}
}
};

View File

@@ -6,6 +6,7 @@
#include <fstream>
#include <chrono>
#include <thread>
#include <arpa/inet.h>
#include "Assert.hpp"
#include "Bincode.hpp"
@@ -52,6 +53,7 @@ private:
Env _env;
ShardShared& _shared;
ShardId _shid;
std::array<uint8_t, 4> _ownIp;
uint16_t _desiredPort;
uint64_t _packetDropRand;
uint64_t _incomingPacketDropProbability; // probability * 10,000
@@ -61,6 +63,7 @@ public:
_env(logger, "server"),
_shared(shared),
_shid(shid),
_ownIp(options.ownIp),
_desiredPort(options.port),
_packetDropRand((int)shid.u8 + 1), // CDC is 0
_incomingPacketDropProbability(0),
@@ -98,10 +101,11 @@ public:
}
struct sockaddr_in serverAddr;
serverAddr.sin_family = AF_INET;
serverAddr.sin_addr.s_addr = htonl(INADDR_ANY);
memcpy(&serverAddr.sin_addr.s_addr, _ownIp.data(), sizeof(_ownIp));
serverAddr.sin_port = htons(_desiredPort);
if (bind(sock, (struct sockaddr*)&serverAddr, sizeof(serverAddr)) != 0) {
throw SYSCALL_EXCEPTION("cannot bind socket to port %s", _desiredPort);
char ip[INET_ADDRSTRLEN];
throw SYSCALL_EXCEPTION("cannot bind socket to addr %s:%s", inet_ntop(AF_INET, &serverAddr.sin_addr, ip, INET_ADDRSTRLEN), _desiredPort);
}
{
socklen_t addrLen = sizeof(serverAddr);
@@ -139,7 +143,7 @@ public:
memset(&clientAddr, 0, sizeof(clientAddr));
socklen_t addrLen = sizeof(clientAddr);
int read = recvfrom(sock, recvBuf.data(), recvBuf.size(), 0, (struct sockaddr*)&clientAddr, &addrLen);
if (read < 0 && errno == EAGAIN) {
if (read < 0 && (errno == EAGAIN || errno == EINTR)) {
continue;
}
if (read < 0) {
@@ -173,7 +177,7 @@ public:
// Now, try to parse the body
try {
reqContainer->unpack(reqBbuf, reqHeader.kind);
LOG_DEBUG(_env, "parsed request: %s", *reqContainer);
LOG_TRACE(_env, "parsed request: %s", *reqContainer);
} catch (const BincodeException& exc) {
LOG_ERROR(_env, "Could not parse: %s", exc.what());
RAISE_ALERT(_env, "could not parse request of kind %s from %s, will reply with error.", reqHeader.kind, clientAddr);