shard: support checkpointed responses

This commit is contained in:
Miroslav Crnic
2024-06-13 15:39:37 +01:00
committed by GitHub Enterprise
parent 0bf547faf5
commit 9e13d6b56e
6 changed files with 147 additions and 24 deletions

View File

@@ -19,6 +19,7 @@
#include "LogsDB.hpp"
#include "Loop.hpp"
#include "Metrics.hpp"
#include "MsgsGen.hpp"
#include "MultiplexedChannel.hpp"
#include "PeriodicLoop.hpp"
#include "Protocol.hpp"
@@ -43,6 +44,7 @@ struct QueuedShardLogEntry {
IpPort clientAddr;
int sockIx; // which sock to use to reply
ShardMessageKind requestKind;
bool checkPointed;
};
struct SnapshotRequest {
@@ -277,6 +279,49 @@ static void packShardResponse(
LOG_DEBUG(env, "will send response for req id %s kind %s to %s", msg.id, reqKind, clientAddr);
}
static void packCheckPointedShardResponse(
Env& env,
ShardShared& shared,
const AddrsInfo& srcAddr,
UDPSender& sender,
Duration elapsed,
bool dropArtificially,
const IpPort& clientAddr,
uint8_t sockIx,
ShardMessageKind reqKind,
const ShardCheckPointedRespMsg& msg,
const AES128Key& key
) {
auto respKind = msg.body.resp.kind();
shared.timings[(int)reqKind].add(elapsed);
shared.errors[(int)reqKind].add( respKind != ShardMessageKind::ERROR ? EggsError::NO_ERROR : msg.body.resp.getError());
if (unlikely(dropArtificially)) {
LOG_DEBUG(env, "artificially dropping response %s", msg.id);
return;
}
ALWAYS_ASSERT(clientAddr.port != 0);
if (respKind != ShardMessageKind::ERROR) {
LOG_DEBUG(env, "successfully processed request %s with kind %s in %s", msg.id, reqKind, elapsed);
if (bigResponse(respKind)) {
if (unlikely(env._shouldLog(LogLevel::LOG_TRACE))) {
LOG_TRACE(env, "resp: %s", msg);
} else {
LOG_DEBUG(env, "resp: <omitted>");
}
} else {
LOG_DEBUG(env, "resp: %s", msg);
}
} else {
LOG_DEBUG(env, "request %s failed with error %s in %s", reqKind, msg.body.resp.getError(), elapsed);
}
sender.prepareOutgoingMessage(env, srcAddr, sockIx, clientAddr,
[&msg, &key](BincodeBuf& buf) {
msg.pack(buf, key);
});
LOG_DEBUG(env, "will send response for req id %s kind %s to %s", msg.id, reqKind, clientAddr);
}
// a hack while parts of messages in shard protocol are signed and part are unsigned
static ShardMessageKind peekKind(const BincodeBuf& buf) {
auto tmpBuf = buf;
@@ -305,7 +350,7 @@ private:
std::unique_ptr<UDPReceiver<1>> _receiver;
std::unique_ptr<UDPSender> _sender;
std::unique_ptr<MultiplexedChannel<3, std::array<uint32_t, 3>{SHARD_REQ_PROTOCOL_VERSION, LOG_REQ_PROTOCOL_VERSION, LOG_RESP_PROTOCOL_VERSION}>> _channel;
std::unique_ptr<MultiplexedChannel<4, std::array<uint32_t, 4>{SHARD_REQ_PROTOCOL_VERSION, SHARD_CHECK_POINTED_REQ_PROTOCOL_VERSION, LOG_REQ_PROTOCOL_VERSION, LOG_RESP_PROTOCOL_VERSION}>> _channel;
public:
ShardServer(Logger& logger, std::shared_ptr<XmonAgent>& xmon, ShardReplicaId shrid, const ShardOptions& options, ShardShared& shared) :
Loop(logger, xmon, "server"),
@@ -326,7 +371,7 @@ public:
expandKey(CDCKey, _expandedCDCKey);
_receiver = std::make_unique<UDPReceiver<1>>(UDPReceiverConfig{.perSockMaxRecvMsg = MAX_RECV_MSGS, .maxMsgSize = MAX_UDP_MTU});
_sender = std::make_unique<UDPSender>(UDPSenderConfig{.maxMsgSize = MAX_UDP_MTU});
_channel = std::make_unique<MultiplexedChannel<3, std::array<uint32_t, 3>{SHARD_REQ_PROTOCOL_VERSION, LOG_REQ_PROTOCOL_VERSION, LOG_RESP_PROTOCOL_VERSION}>>();
_channel = std::make_unique<MultiplexedChannel<4, std::array<uint32_t, 4>{SHARD_REQ_PROTOCOL_VERSION, SHARD_CHECK_POINTED_REQ_PROTOCOL_VERSION, LOG_REQ_PROTOCOL_VERSION, LOG_RESP_PROTOCOL_VERSION}>>();
}
virtual ~ShardServer() = default;
@@ -393,17 +438,21 @@ private:
return LogsDB::REPLICA_COUNT;
}
void _handleShardRequest(UDPMessage& msg) {
void _handleShardRequest(UDPMessage& msg, bool checkPointed) {
LOG_DEBUG(_env, "received message from %s", msg.clientAddr);
if (unlikely(!_shared.isLeader.load(std::memory_order_relaxed))) {
LOG_DEBUG(_env, "not leader, dropping request %s", msg.clientAddr);
return;
}
// First, try to parse the header
ShardReqMsg req;
try {
if (isPrivilegedRequestKind((uint8_t)peekKind(msg.buf))) {
if (checkPointed) {
ShardCheckPointedReqMsg signedReq;
signedReq.unpack(msg.buf, _expandedCDCKey);
req.id = signedReq.id;
req.body = std::move(signedReq.body);
} else if (isPrivilegedRequestKind((uint8_t)peekKind(msg.buf))) {
SignedShardReqMsg signedReq;
signedReq.unpack(msg.buf, _expandedCDCKey);
req.id = signedReq.id;
@@ -435,9 +484,10 @@ private:
// if it's a write request we prepare the log entry and
// send it off.
ShardRespMsg respContainer;
respContainer.id = req.id;
ShardCheckPointedRespMsg checkpointedRespContainer;
checkpointedRespContainer.id = respContainer.id = req.id;
if (readOnlyShardReq(req.body.kind())) {
_shared.shardDB.read(req.body, respContainer.body);
checkpointedRespContainer.body.checkPointIdx = _shared.shardDB.read(req.body, checkPointed ? checkpointedRespContainer.body.resp : respContainer.body);
} else if (unlikely(req.body.kind() == ShardMessageKind::SHARD_SNAPSHOT)) {
auto& entry = _logEntries.emplace_back().setSnapshotRequest();
entry.sockIx = msg.socketIx;
@@ -453,18 +503,25 @@ private:
entry.receivedAt = t0;
entry.requestKind = req.body.kind();
entry.requestId = req.id;
entry.checkPointed = checkPointed;
auto err = _shared.shardDB.prepareLogEntry(req.body, entry.logEntry);
if (likely(err == EggsError::NO_ERROR)) {
return; // we're done here, move along
} else {
respContainer.body.setError() = err;
(checkPointed ? checkpointedRespContainer.body.resp : respContainer.body).setError() = err;
_logEntries.pop_back(); // back out the log entry
}
}
Duration elapsed = eggsNow() - t0;
bool dropArtificially = wyhash64(&_packetDropRand) % 10'000 < _outgoingPacketDropProbability;
packShardResponse(_env, _shared, _shared.sock().addr(), *_sender, elapsed, dropArtificially, msg.clientAddr, msg.socketIx, req.body.kind(), respContainer);
if (checkPointed) {
packCheckPointedShardResponse(_env, _shared, _shared.sock().addr(), *_sender, elapsed, dropArtificially, msg.clientAddr, msg.socketIx, req.body.kind(), checkpointedRespContainer, _expandedCDCKey);
} else {
packShardResponse(_env, _shared, _shared.sock().addr(), *_sender, elapsed, dropArtificially, msg.clientAddr, msg.socketIx, req.body.kind(), respContainer);
}
}
public:
@@ -489,7 +546,11 @@ public:
}
std::array<size_t, 2> shardMsgCount{0, 0};
for (auto& msg : _channel->protocolMessages(SHARD_REQ_PROTOCOL_VERSION)) {
_handleShardRequest(msg);
_handleShardRequest(msg, false);
++shardMsgCount[msg.socketIx];
}
for (auto& msg : _channel->protocolMessages(SHARD_CHECK_POINTED_REQ_PROTOCOL_VERSION)) {
_handleShardRequest(msg, true);
++shardMsgCount[msg.socketIx];
}
@@ -520,8 +581,10 @@ private:
const std::string _basePath;
ShardShared& _shared;
AES128Key _expandedShardKey;
AES128Key _expandedCDCKey;
uint64_t _currentLogIndex;
ShardRespMsg _respContainer;
ShardCheckPointedRespMsg _checkPointedrespContainer;
std::vector<ShardWriterRequest> _requests;
const size_t _maxWritesAtOnce;
@@ -560,6 +623,7 @@ public:
_outgoingPacketDropProbability(0)
{
expandKey(ShardKey, _expandedShardKey);
expandKey(CDCKey, _expandedCDCKey);
_currentLogIndex = _shared.shardDB.lastAppliedLogEntry();
auto convertProb = [this](const std::string& what, double prob, uint64_t& iprob) {
if (prob != 0.0) {
@@ -667,8 +731,9 @@ public:
ALWAYS_ASSERT(it != _inFlightEntries.end());
ALWAYS_ASSERT(shardEntry == it->second.logEntry);
}
_shared.shardDB.applyLogEntry(logsDBEntry.idx.u64, shardEntry, _respContainer.body);
_checkPointedrespContainer.body.checkPointIdx = logsDBEntry.idx;
auto& resp = (it != _inFlightEntries.end() && it->second.checkPointed) ? _checkPointedrespContainer.body.resp : _respContainer.body;
_shared.shardDB.applyLogEntry(logsDBEntry.idx.u64, shardEntry, resp);
if (it != _inFlightEntries.end()) {
auto& logEntry = _outgoingLogEntries.emplace_back(std::move(it->second));
_inFlightEntries.erase(it);
@@ -680,12 +745,17 @@ public:
LOG_DEBUG(_env, "applying request-less log entry");
}
if (likely(logEntry.requestId)) {
_respContainer.id = logEntry.requestId;
_checkPointedrespContainer.id = _respContainer.id = logEntry.requestId;
Duration elapsed = eggsNow() - logEntry.receivedAt;
bool dropArtificially = wyhash64(&_packetDropRand) % 10'000 < _outgoingPacketDropProbability;
packShardResponse(_env, _shared, _shared.sock().addr(), _sender, elapsed, dropArtificially, logEntry.clientAddr, logEntry.sockIx, logEntry.requestKind, _respContainer);
} else if (unlikely(_respContainer.body.kind() == ShardMessageKind::ERROR)) {
RAISE_ALERT(_env, "could not apply request-less log entry: %s", _respContainer.body.getError());
if (logEntry.checkPointed) {
packCheckPointedShardResponse(_env, _shared, _shared.sock().addr(), _sender, elapsed, dropArtificially, logEntry.clientAddr, logEntry.sockIx, logEntry.requestKind, _checkPointedrespContainer, _expandedCDCKey);
} else {
packShardResponse(_env, _shared, _shared.sock().addr(), _sender, elapsed, dropArtificially, logEntry.clientAddr, logEntry.sockIx, logEntry.requestKind, _respContainer);
}
} else if (unlikely(resp.kind() == ShardMessageKind::ERROR)) {
RAISE_ALERT(_env, "could not apply request-less log entry: %s", resp.getError());
}
}
}