shard/cdc: support snapshotting

This commit is contained in:
Miroslav Crnic
2024-05-23 10:17:59 +01:00
committed by GitHub Enterprise
parent 1446e4d0d2
commit 1f145c030e
12 changed files with 761 additions and 228 deletions
+47 -2
View File
@@ -188,6 +188,7 @@ constexpr int MAX_UPDATE_SIZE = 500;
struct CDCServer : Loop {
private:
CDCShared& _shared;
const std::string _basePath;
bool _seenShards;
uint64_t _currentLogIndex;
LogIdx _logsDBLogIndex;
@@ -233,9 +234,10 @@ private:
std::unordered_map<uint64_t, std::vector<CDCReqInfo>> _logEntryIdxToReqInfos;
std::shared_ptr<std::array<AddrsInfo, LogsDB::REPLICA_COUNT>> _replicas;
public:
CDCServer(Logger& logger, std::shared_ptr<XmonAgent>& xmon, CDCOptions& options, CDCShared& shared) :
CDCServer(Logger& logger, std::shared_ptr<XmonAgent>& xmon, CDCOptions& options, CDCShared& shared, const std::string& basePath) :
Loop(logger, xmon, "req_server"),
_shared(shared),
_basePath(basePath),
_seenShards(false),
_currentLogIndex(_shared.db.lastAppliedLogEntry()),
// important to not catch stray requests from previous executions
@@ -650,6 +652,12 @@ private:
}
LOG_DEBUG(_env, "received request id %s, kind %s", reqHeader.requestId, reqHeader.kind);
if (unlikely(reqHeader.kind == CDCMessageKind::CDC_SNAPSHOT)) {
_processCDCSnapshotMessage(reqHeader, msg);
continue;
}
auto receivedAt = eggsNow();
// If we're already processing this request, drop it to try to not clog the queue
@@ -739,6 +747,43 @@ private:
}
}
// Handles a single CDC snapshot request end to end: decodes the body,
// rejects malformed input, asks the shared DB for a snapshot under
// `<_basePath>/snapshot-<snapshotId>`, and queues either a success response
// or an error response for the sender.
//
// reqHeader: already-parsed header of the incoming request.
// msg:       the UDP message; `msg.buf` is expected to be positioned at the
//            start of the request body.
void _processCDCSnapshotMessage(CDCRequestHeader& reqHeader, UDPMessage& msg) {
    // Every failure path answers the client the same way, only the code differs.
    const auto rejectWith = [&](EggsError failure) {
        _packCDCResponseError(msg.socketIx, msg.clientAddr, reqHeader, failure);
    };

    // Step 1: decode the request body.
    CDCReqContainer snapshotReq;
    bool decoded = true;
    try {
        snapshotReq.unpack(msg.buf, reqHeader.kind);
        LOG_DEBUG(_env, "parsed request: %s", snapshotReq);
    } catch (const BincodeException& parseFailure) {
        LOG_ERROR(_env, "could not parse: %s", parseFailure.what());
        RAISE_ALERT(_env, "could not parse CDC request of kind %s from %s, will reply with error.", reqHeader.kind, msg.clientAddr);
        decoded = false;
    }
    if (!decoded) {
        rejectWith(EggsError::MALFORMED_REQUEST);
        return;
    }

    // Step 2: the body must consume the whole buffer, trailing bytes are an error.
    if (msg.buf.remaining() != 0) {
        RAISE_ALERT(_env, "%s bytes remaining after parsing CDC request of kind %s from %s, will reply with error", msg.buf.remaining(), reqHeader.kind, msg.clientAddr);
        rejectWith(EggsError::MALFORMED_REQUEST);
        return;
    }

    // Step 3: perform the snapshot into a directory named after the requested id.
    LOG_DEBUG(_env, "CDC request %s successfully parsed, processing", snapshotReq.kind());
    const std::string snapshotPath =
        _basePath + "/snapshot-" + std::to_string(snapshotReq.getCdcSnapshot().snapshotId);
    const EggsError snapshotResult = _shared.sharedDb.snapshot(snapshotPath);
    if (snapshotResult != NO_ERROR) {
        rejectWith(snapshotResult);
        return;
    }

    // Step 4: everything succeeded, send back the snapshot response.
    CDCResponseHeader respHeader{reqHeader.requestId, reqHeader.kind};
    CDCRespContainer resp;
    resp.setCdcSnapshot();
    _packCDCResponse(msg.socketIx, msg.clientAddr, respHeader, resp);
}
#ifdef __clang__
__attribute__((no_sanitize("integer"))) // might wrap around (it's initialized randomly)
#endif
@@ -1173,7 +1218,7 @@ void runCDC(const std::string& dbDir, CDCOptions& options) {
LOG_INFO(env, "Spawning server threads");
threads.emplace_back(LoopThread::Spawn(std::make_unique<CDCShardUpdater>(logger, xmon, options, shared)));
threads.emplace_back(LoopThread::Spawn(std::make_unique<CDCServer>(logger, xmon, options, shared)));
threads.emplace_back(LoopThread::Spawn(std::make_unique<CDCServer>(logger, xmon, options, shared, dbDir)));
threads.emplace_back(LoopThread::Spawn(std::make_unique<CDCRegisterer>(logger, xmon, options, shared)));
if (options.shuckleStats) {
threads.emplace_back(LoopThread::Spawn(std::make_unique<CDCStatsInserter>(logger, xmon, options, shared)));