mirror of https://github.com/XTXMarkets/ternfs.git
synced 2025-12-21 10:40:04 -06:00
committed by Francesco Mazzoli
parent 7e7ba6b04b
commit c5562c7ca3

cpp/cdc/CDC.cpp
@@ -6,13 +6,14 @@
 #include <netinet/ip.h>
 #include <sys/socket.h>
 #include <atomic>
-#include <sys/epoll.h>
 #include <fcntl.h>
 #include <optional>
 #include <thread>
 #include <unordered_map>
 #include <arpa/inet.h>
 #include <unordered_set>
+#include <map>
+#include <poll.h>

 #include "Bincode.hpp"
 #include "CDC.hpp"
@@ -41,9 +42,6 @@ struct CDCShared {
     std::array<ShardInfo, 256> shards;
     // How long it took us to process the entire request, from parse to response.
     std::array<Timings, maxCDCMessageKind+1> timingsTotal;
-    // How long it took to process the request, from when it exited the queue to
-    // when it finished executing.
-    std::array<Timings, maxCDCMessageKind+1> timingsProcess;
     std::array<ErrorCount, maxCDCMessageKind+1> errors;
     // right now we have a max of 200req/s and we send the metrics every minute or so, so
     // this should cover us for at least 1.5mins. Power of two is good for mod.
@@ -57,7 +55,6 @@ struct CDCShared {
         ownPorts[1].store(0);
         for (CDCMessageKind kind : allCDCMessageKind) {
             timingsTotal[(int)kind] = Timings::Standard();
-            timingsProcess[(int)kind] = Timings::Standard();
         }
         for (int i = 0; i < inFlightTxnsWindow.size(); i++) {
             inFlightTxnsWindow[i] = 0;
@@ -66,13 +63,15 @@ struct CDCShared {
 };

 struct InFlightShardRequest {
-    uint64_t txnId; // the txn id that requested this shard request
+    CDCTxnId txnId; // the txn id that requested this shard request
     EggsTime sentAt;
-    uint64_t shardRequestId;
     ShardId shid;
 };

 struct InFlightCDCRequest {
+    bool hasClient;
+    uint64_t lastSentRequestId;
+    // if hasClient=false, the following is all garbage.
     uint64_t cdcRequestId;
     EggsTime receivedAt;
     struct sockaddr_in clientAddr;
@@ -114,10 +113,70 @@ struct InFlightCDCRequestKey {
 template <>
 struct std::hash<InFlightCDCRequestKey> {
     std::size_t operator()(const InFlightCDCRequestKey& key) const {
-        return key.requestId ^ (((uint64_t)key.port << 32) | ((uint64_t)key.ip));
+        return std::hash<uint64_t>{}(key.requestId ^ (((uint64_t)key.port << 32) | ((uint64_t)key.ip)));
     }
 };
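The changed return above routes the XOR-combined fields through std::hash<uint64_t> rather than using the raw combination as the hash. Whether that adds real bit mixing is implementation-defined (libstdc++ hashes integers as the identity), but it keeps the container's contract explicit and concentrates the combination scheme in one place. A minimal self-contained sketch of the idiom, with a hypothetical Key standing in for InFlightCDCRequestKey:

```cpp
// Combine-then-delegate hashing sketch (hypothetical Key, not the TernFS struct):
// pack the distinguishing fields into one 64-bit value, then let the standard
// library decide how to turn that into a size_t.
#include <cstdint>
#include <functional>
#include <unordered_set>

struct Key {
    uint64_t requestId;
    uint32_t ip;
    uint16_t port;
    bool operator==(const Key& o) const {
        return requestId == o.requestId && ip == o.ip && port == o.port;
    }
};

template <>
struct std::hash<Key> {
    std::size_t operator()(const Key& k) const {
        uint64_t combined = k.requestId ^ (((uint64_t)k.port << 32) | (uint64_t)k.ip);
        return std::hash<uint64_t>{}(combined); // delegate mixing to the library
    }
};

int main() {
    std::unordered_set<Key> seen;
    seen.insert(Key{42, 0x7f000001, 8080});
    return seen.count(Key{42, 0x7f000001, 8080}) == 1 ? 0 : 1;
}
```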

+struct InFlightShardRequests {
+private:
+    using RequestsMap = std::unordered_map<uint64_t, InFlightShardRequest>;
+    RequestsMap _reqs;
+
+    std::map<EggsTime, uint64_t> _pq;
+
+public:
+    size_t size() const {
+        return _reqs.size();
+    }
+
+    RequestsMap::const_iterator oldest() const {
+        ALWAYS_ASSERT(size() > 0);
+        auto reqByTime = _pq.begin();
+        return _reqs.find(reqByTime->second);
+    }
+
+    RequestsMap::const_iterator find(uint64_t reqId) const {
+        return _reqs.find(reqId);
+    }
+
+    RequestsMap::const_iterator end() {
+        return _reqs.end();
+    }
+
+    void erase(RequestsMap::const_iterator iterator) {
+        _pq.erase(iterator->second.sentAt);
+        _reqs.erase(iterator);
+    }
+
+    void insert(uint64_t reqId, const InFlightShardRequest& req) {
+        auto [reqIt, inserted] = _reqs.insert({reqId, req});
+
+        // TODO i think we can just assert inserted, we never need this
+        // functionality
+
+        if (inserted) {
+            // we have never seen this shard request.
+            // technically we could get the same time twice, but in practice
+            // we won't, so just assert it.
+            ALWAYS_ASSERT(_pq.insert({req.sentAt, reqId}).second);
+        } else {
+            // we had already seen this. make sure it's for the same stuff, and update pq.
+            ALWAYS_ASSERT(reqIt->second.txnId == req.txnId);
+            ALWAYS_ASSERT(reqIt->second.shid == req.shid);
+            ALWAYS_ASSERT(_pq.erase(reqIt->second.sentAt) == 1); // must be already present
+            ALWAYS_ASSERT(_pq.insert({req.sentAt, reqId}).second); // insert with new time
+            reqIt->second.sentAt = req.sentAt; // update time in existing entry
+        }
+    }
+};

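InFlightShardRequests above is a two-index container: an unordered_map keyed by request id gives O(1) lookup when a shard response arrives, and an ordered std::map keyed by send time makes the oldest in-flight request available in O(log n) for the timeout scan in step(). A reduced, self-contained sketch of the same pattern (hypothetical names, plain integers instead of EggsTime):

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <unordered_map>

// Hypothetical payload; in CDC.cpp this role is played by InFlightShardRequest.
struct Entry { uint64_t txnId; uint64_t sentAtNs; };

struct TimeoutIndex {
    std::unordered_map<uint64_t, Entry> byId; // reqId -> entry, O(1) on response
    std::map<uint64_t, uint64_t> byTime;      // sentAtNs -> reqId, ordered by age

    void insert(uint64_t reqId, Entry e) {
        assert(byId.insert({reqId, e}).second);
        assert(byTime.insert({e.sentAtNs, reqId}).second); // assumes distinct times
    }
    // Pop every request older than `cutoffNs`, invoking onTimeout(reqId, entry).
    template <typename F>
    void expire(uint64_t cutoffNs, F onTimeout) {
        while (!byTime.empty() && byTime.begin()->first < cutoffNs) {
            uint64_t reqId = byTime.begin()->second;
            onTimeout(reqId, byId.at(reqId));
            byId.erase(reqId);
            byTime.erase(byTime.begin());
        }
    }
};

int main() {
    TimeoutIndex idx;
    idx.insert(1, {100, 10});
    idx.insert(2, {101, 50});
    int timedOut = 0;
    idx.expire(20, [&](uint64_t, const Entry&) { ++timedOut; });
    return (timedOut == 1 && idx.byId.size() == 1) ? 0 : 1;
}
```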
+struct CDCReqInfo {
+    uint64_t reqId;
+    struct sockaddr_in clientAddr;
+    EggsTime receivedAt;
+    int sock;
+};
+
 struct CDCServer : Loop {
 private:
     CDCShared& _shared;
@@ -126,19 +185,21 @@ private:
     uint64_t _currentLogIndex;
     std::vector<char> _recvBuf;
     std::vector<char> _sendBuf;
-    CDCReqContainer _cdcReqContainer;
-    ShardRespContainer _shardRespContainer;
+    std::vector<CDCReqContainer> _cdcReqs;
+    std::vector<CDCReqInfo> _cdcReqsInfo;
+    std::vector<CDCTxnId> _cdcReqsTxnIds;
+    std::vector<CDCShardResp> _shardResps;
     CDCStep _step;
     uint64_t _shardRequestIdCounter;
-    int _epoll;
-    std::array<int, 4> _socks;
-    struct epoll_event _events[4];
+    // order: CDC, shard, CDC, shard
+    // length will be 2 or 4 depending on whether we have a second ip
+    std::vector<struct pollfd> _socks;
     AES128Key _expandedCDCKey;
     Duration _shardTimeout;
     uint64_t _maximumEnqueuedRequests;
     // The requests we've enqueued, but haven't completed yet, with
     // where to send the response. Indexed by txn id.
-    std::unordered_map<uint64_t, InFlightCDCRequest> _inFlightTxns;
+    std::unordered_map<CDCTxnId, InFlightCDCRequest> _inFlightTxns;
     uint64_t _inFlightTxnsWindowCursor;
     // The enqueued requests, but indexed by req id + ip + port. We
     // store this so that we can drop repeated requests which are
@@ -148,27 +209,7 @@ private:
     // a useful optimization for now that we live with it.
     std::unordered_set<InFlightCDCRequestKey> _inFlightCDCReqs;
     // The _shard_ request we're currently waiting for, if any.
-    std::optional<InFlightShardRequest> _inFlightShardReq;
-    // This is just used to calculate the timings
-    uint64_t _runningTxn;
-    CDCMessageKind _runningTxnKind;
-    EggsTime _runningTxnStartedAt;
-    CDCStatus _status;
-
-    void _updateProcessTimings() {
-        if (_status.runningTxn != _runningTxn) { // we've got something new
-            EggsTime now = eggsNow();
-            if (_runningTxn != 0) { // something has finished running
-                _shared.timingsProcess[(int)_runningTxnKind].add(now - _runningTxnStartedAt);
-                _runningTxn = 0;
-            }
-            if (_status.runningTxn != 0) { // something has started running
-                _runningTxn = _status.runningTxn;
-                _runningTxnKind = _status.runningTxnKind;
-                _runningTxnStartedAt = now;
-            }
-        }
-    }
+    InFlightShardRequests _inFlightShardReqs;

 public:
     CDCServer(Logger& logger, std::shared_ptr<XmonAgent>& xmon, const CDCOptions& options, CDCShared& shared) :
@@ -178,14 +219,13 @@ public:
         _ipPorts(options.ipPorts),
         _recvBuf(DEFAULT_UDP_MTU),
         _sendBuf(DEFAULT_UDP_MTU),
-        _shardRequestIdCounter(0),
+        // important to not catch stray requests from previous executions
+        _shardRequestIdCounter(wyhash64_rand()),
         _shardTimeout(options.shardTimeout),
         _maximumEnqueuedRequests(options.maximumEnqueuedRequests),
-        _inFlightTxnsWindowCursor(0),
-        _runningTxn(0)
+        _inFlightTxnsWindowCursor(0)
     {
         _currentLogIndex = _shared.db.lastAppliedLogEntry();
-        memset(&_socks[0], 0, sizeof(_socks));
         expandKey(CDCKey, _expandedCDCKey);
     }

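Initializing _shardRequestIdCounter from wyhash64_rand() instead of 0 means a freshly restarted CDC almost certainly does not reuse request ids issued by its previous incarnation, so late responses to pre-restart requests fail the in-flight lookup and get dropped instead of being matched to the wrong transaction. A sketch of the idea with std::mt19937_64 standing in for wyhash64_rand (which is a TernFS helper):

```cpp
#include <cstdint>
#include <random>

// Request ids only need to be unique per process lifetime, so a counter is
// enough; randomizing the starting point keeps new ids from colliding with
// ids issued before a restart, whose responses may still be in flight.
struct RequestIdAllocator {
    uint64_t counter;
    RequestIdAllocator() {
        std::random_device rd;
        std::mt19937_64 gen(rd());
        counter = gen(); // random start, analogous to wyhash64_rand()
    }
    uint64_t fresh() { return ++counter; }
};

int main() {
    RequestIdAllocator a;
    return a.fresh() != a.fresh() ? 0 : 1;
}
```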
@@ -201,32 +241,70 @@ public:
             _seenShards = true;
             _initAfterShardsSeen();
         }

+        // clear internal buffers
+        _cdcReqs.clear();
+        _cdcReqsInfo.clear();
+        _cdcReqsTxnIds.clear();
+        _shardResps.clear();
+
         // Process CDC requests and shard responses
         {
             auto now = eggsNow();
-            if (_inFlightShardReq && (now - _inFlightShardReq->sentAt) > _shardTimeout) {
-                LOG_DEBUG(_env, "in-flight shard request %s was sent at %s, it's now %s, timing out (%s > %s)", _inFlightShardReq->shardRequestId, _inFlightShardReq->sentAt, now, (now - _inFlightShardReq->sentAt), _shardTimeout);
-                auto shid = _inFlightShardReq->shid;
-                _inFlightShardReq.reset();
-                _handleShardError(shid, EggsError::TIMEOUT);
+            for (;;) {
+                if (_inFlightShardReqs.size() == 0) { break; }
+                auto oldest = _inFlightShardReqs.oldest();
+                if ((now - oldest->second.sentAt) < _shardTimeout) { break; }
+
+                LOG_DEBUG(_env, "in-flight shard request %s was sent at %s, it's now %s, will time out (%s > %s)", oldest->first, oldest->second.sentAt, now, (now - oldest->second.sentAt), _shardTimeout);
+                auto resp = _prepareCDCShardResp(oldest->first); // erases `oldest`
+                ALWAYS_ASSERT(resp != nullptr); // must be there, we've just timed it out
+                resp->err = EggsError::TIMEOUT;
             }
         }

         // 10ms timeout for prompt termination and for shard resps timeouts
-        int nfds = epoll_wait(_epoll, _events, _socks.size(), 10 /*milliseconds*/);
-        if (nfds < 0) {
-            throw SYSCALL_EXCEPTION("epoll_wait");
+        int ret = poll(_socks.data(), _socks.size(), 10);
+        if (ret < 0) {
+            throw SYSCALL_EXCEPTION("poll");
         }

-        for (int i = 0; i < nfds; i++) {
-            const auto& event = _events[i];
-            if (event.data.u64%2 == 0) {
-                _drainCDCSock(_socks[event.data.u64]);
-            } else {
-                _drainShardSock(_socks[event.data.u64]);
+        // drain sockets
+        // TODO drain shard sockets first, put an upper bound on the number
+        // of accepted packets, so that we prioritize completing outstanding
+        // txns.
+        for (int i = 0; i < _socks.size(); i++) {
+            const auto& pollFd = _socks[i];
+            if (pollFd.revents & POLLIN) {
+                if (i%2 == 0) {
+                    _drainCDCSock(pollFd.fd);
+                } else {
+                    _drainShardSock(pollFd.fd);
+                }
             }
         }

+        if (_cdcReqs.size() > 0 || _shardResps.size() > 0) {
+            // process everything in a single batch
+            _shared.db.update(true, _advanceLogIndex(), _cdcReqs, _shardResps, _step, _cdcReqsTxnIds);
+            // record txn ids etc. for newly received requests
+            for (int i = 0; i < _cdcReqs.size(); i++) {
+                const auto& req = _cdcReqs[i];
+                const auto& reqInfo = _cdcReqsInfo[i];
+                CDCTxnId txnId = _cdcReqsTxnIds[i];
+                ALWAYS_ASSERT(_inFlightTxns.find(txnId) == _inFlightTxns.end());
+                auto& inFlight = _inFlightTxns[txnId];
+                inFlight.hasClient = true;
+                inFlight.cdcRequestId = reqInfo.reqId;
+                inFlight.clientAddr = reqInfo.clientAddr;
+                inFlight.kind = req.kind();
+                inFlight.receivedAt = reqInfo.receivedAt;
+                inFlight.sock = reqInfo.sock;
+                _updateInFlightTxns();
+                _inFlightCDCReqs.insert(InFlightCDCRequestKey(reqInfo.reqId, reqInfo.clientAddr));
+            }
+            _processStep();
+        }
     }

     virtual void finish() override {
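For reference on the epoll-to-poll switch above: poll(2) takes the subscription in pollfd.events and reports readiness in pollfd.revents; events is never modified by the kernel, so a readiness check must test revents (testing events would be vacuously true once POLLIN is requested). A minimal self-contained example:

```cpp
#include <fcntl.h>
#include <poll.h>
#include <unistd.h>
#include <vector>

// Wait up to 10ms, then drain only the descriptors the kernel marked
// readable. Note the check is on revents, the kernel's answer; events is
// merely the subscription and stays set to POLLIN forever.
int drainReady(std::vector<struct pollfd>& fds) {
    int ret = poll(fds.data(), fds.size(), 10 /*ms*/);
    if (ret < 0) { return -1; }
    int drained = 0;
    for (const auto& pfd : fds) {
        if (pfd.revents & POLLIN) {
            char buf[4096];
            while (read(pfd.fd, buf, sizeof(buf)) > 0) {} // assumes O_NONBLOCK
            ++drained;
        }
    }
    return drained;
}

int main() {
    int p[2];
    if (pipe(p) != 0) { return 1; }
    fcntl(p[0], F_SETFL, O_NONBLOCK);
    if (write(p[1], "x", 1) != 1) { return 1; }
    struct pollfd pfd;
    pfd.fd = p[0]; pfd.events = POLLIN; pfd.revents = 0;
    std::vector<struct pollfd> fds{pfd};
    return drainReady(fds) == 1 ? 0 : 1;
}
```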
@@ -261,17 +339,34 @@ private:
         return true;
     }

+    // To be called when we have a shard response with given `reqId`.
+    // Searches it in the in flight map, removes it from it, and
+    // adds a CDCShardResp to `_shardResps`.
+    // nullptr if we couldn't find the in flight response. Fills in txnId,
+    // and nothing else.
+    CDCShardResp* _prepareCDCShardResp(uint64_t reqId) {
+        // If it's not the request we wanted, skip
+        auto reqIt = _inFlightShardReqs.find(reqId);
+        if (reqIt == _inFlightShardReqs.end()) {
+            LOG_INFO(_env, "got unexpected shard request id %s, dropping", reqId);
+            return nullptr;
+        }
+        CDCTxnId txnId = reqIt->second.txnId;
+        auto& resp = _shardResps.emplace_back();
+        resp.txnId = reqIt->second.txnId;
+        _inFlightShardReqs.erase(reqIt); // not in flight anymore
+        return &resp;
+    }
+
     void _initAfterShardsSeen() {
         // initialize everything after having seen the shards
         // Create sockets. We create one socket for listening to client requests and one for listening
         // to the shard's responses. If we have two IPs we do this twice.
+        _socks.resize((_ipPorts[1].ip == 0) ? 2 : 4);
         for (int i = 0; i < _socks.size(); i++) {
-            if (i > 1 && _ipPorts[1].ip == 0) { // we don't have a second IP
-                _socks[i] = -1;
-                continue;
-            }
             int sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
-            _socks[i] = sock;
+            _socks[i].fd = sock;
+            _socks[i].events = POLLIN;
             if (sock < 0) {
                 throw SYSCALL_EXCEPTION("cannot create socket");
             }
@@ -305,32 +400,12 @@ private:
                 LOG_DEBUG(_env, "bound shard %s sock to port %s", i/2, ntohs(addr.sin_port));
             }
         }

-        // create epoll structure
-        // TODO I did this when I had more sockets, we could just use select now that it's 4
-        // of them...
-        _epoll = epoll_create1(0);
-        if (_epoll < 0) {
-            throw SYSCALL_EXCEPTION("epoll");
-        }
-        for (int i = 0; i < _socks.size(); i++) {
-            if (i > 1 && _ipPorts[1].ip == 0) { // we don't have a second IP
-                break;
-            }
-            auto& event = _events[i];
-            event.data.u64 = i;
-            event.events = EPOLLIN | EPOLLET;
-            if (epoll_ctl(_epoll, EPOLL_CTL_ADD, _socks[i], &event) == -1) {
-                throw SYSCALL_EXCEPTION("epoll_ctl");
-            }
-        }
-
-
         LOG_INFO(_env, "running on ports %s and %s", _shared.ownPorts[0].load(), _shared.ownPorts[1].load());

-        // If we've got a dangling transaction, immediately start processing it
-        _shared.db.startNextTransaction(true, eggsNow(), _advanceLogIndex(), _step, _status);
-        _updateProcessTimings();
-        _processStep(_step);
+        // If we've got dangling transactions, immediately start processing them
+        _shared.db.bootstrap(true, _advanceLogIndex(), _step);
+        _processStep();
     }

     void _drainCDCSock(int sock) {
@@ -381,9 +456,10 @@ private:
             EggsError err = NO_ERROR;

             // Now, try to parse the body
+            auto& cdcReq = _cdcReqs.emplace_back();
             try {
-                _cdcReqContainer.unpack(reqBbuf, reqHeader.kind);
-                LOG_DEBUG(_env, "parsed request: %s", _cdcReqContainer);
+                cdcReq.unpack(reqBbuf, reqHeader.kind);
+                LOG_DEBUG(_env, "parsed request: %s", cdcReq);
             } catch (const BincodeException& exc) {
                 LOG_ERROR(_env, "could not parse: %s", exc.what());
                 RAISE_ALERT(_env, "could not parse CDC request of kind %s from %s, will reply with error.", reqHeader.kind, clientAddr);
@@ -397,24 +473,18 @@ private:
             }

             if (err == NO_ERROR) {
-                // If things went well, process the request
-                LOG_DEBUG(_env, "CDC request %s successfully parsed, will now process", _cdcReqContainer.kind());
-                uint64_t txnId = _shared.db.processCDCReq(true, eggsNow(), _advanceLogIndex(), _cdcReqContainer, _step, _status);
-                _updateProcessTimings();
-                auto& inFlight = _inFlightTxns[txnId];
-                inFlight.cdcRequestId = reqHeader.requestId;
-                inFlight.clientAddr = clientAddr;
-                inFlight.kind = reqHeader.kind;
-                inFlight.sock = sock;
-                inFlight.receivedAt = receivedAt;
-                _updateInFlightTxns();
-                _inFlightCDCReqs.insert(InFlightCDCRequestKey(reqHeader.requestId, clientAddr));
-                // Go forward
-                _processStep(_step);
+                LOG_DEBUG(_env, "CDC request %s successfully parsed, will process soon", cdcReq.kind());
+                _cdcReqsInfo.emplace_back(CDCReqInfo{
+                    .reqId = reqHeader.requestId,
+                    .clientAddr = clientAddr,
+                    .receivedAt = receivedAt,
+                    .sock = sock,
+                });
             } else {
-                // Otherwise we can immediately reply with an error
-                RAISE_ALERT(_env, "request %s failed before enqueue with error %s", _cdcReqContainer.kind(), err);
+                // We couldn't parse, reply immediately with an error
+                RAISE_ALERT(_env, "request %s failed before enqueue with error %s", cdcReq.kind(), err);
                 _sendError(sock, reqHeader.requestId, err, clientAddr);
+                _cdcReqs.pop_back(); // let's just forget all about this
             }
         }
     }
@@ -448,100 +518,90 @@ private:
             // control all the code in this codebase, and the header is good, and we're a
             // bit lazy.

-            // If it's not the request we wanted, skip
-            if (!_inFlightShardReq) {
-                LOG_INFO(_env, "got unexpected shard request id %s, kind %s, from shard, dropping", respHeader.requestId, respHeader.kind);
-                continue;
-            }
-            if (_inFlightShardReq->shardRequestId != respHeader.requestId) {
-                LOG_INFO(_env, "got unexpected shard request id %s (expected %s), kind %s, from shard %s, dropping", respHeader.requestId, _inFlightShardReq->shardRequestId, respHeader.kind, _inFlightShardReq->shid);
-                continue;
-            }
-            uint64_t txnId = _inFlightShardReq->txnId;
-
-            // We can forget about this, we're going to process it right now
-            _inFlightShardReq.reset();
+            auto shardResp = _prepareCDCShardResp(respHeader.requestId);
+            if (shardResp == nullptr) {
+                // we couldn't find it
+                continue;
+            }

             // We got an error
             if (respHeader.kind == (ShardMessageKind)0) {
-                EggsError err = reqBbuf.unpackScalar<EggsError>();
-                _handleShardError(_inFlightShardReq->shid, err);
+                shardResp->err = reqBbuf.unpackScalar<EggsError>();
+                LOG_DEBUG(_env, "got error %s for response id %s", shardResp->err, respHeader.requestId);
                 continue;
             }

             // Otherwise, parse the body
-            _shardRespContainer.unpack(reqBbuf, respHeader.kind);
-            LOG_DEBUG(_env, "parsed shard response: %s", _shardRespContainer);
+            shardResp->resp.unpack(reqBbuf, respHeader.kind);
+            LOG_DEBUG(_env, "parsed shard response: %s", shardResp->resp);
             ALWAYS_ASSERT(reqBbuf.remaining() == 0);

             // If all went well, advance with the newly received request
-            LOG_DEBUG(_env, "successfully parsed shard response %s with kind %s, will now process", respHeader.requestId, respHeader.kind);
-            _shared.db.processShardResp(true, eggsNow(), _advanceLogIndex(), NO_ERROR, &_shardRespContainer, _step, _status);
-            _updateProcessTimings();
-            _processStep(_step);
+            LOG_DEBUG(_env, "successfully parsed shard response %s with kind %s, process soon", respHeader.requestId, respHeader.kind);
         }
     }

-    void _handleShardError(ShardId shid, EggsError err) {
-        if (innocuousShardError(err)) {
-            LOG_DEBUG(_env, "got innocuous shard error %s from shard %s", err, shid);
-        } else if (rareInnocuousShardError(err)) {
-            LOG_INFO(_env, "got rare innocuous shard error %s from shard %s", err, shid);
-        } else {
-            RAISE_ALERT(_env, "got shard error %s from shard %s", err, shid);
-        }
-        _shared.db.processShardResp(true, eggsNow(), _advanceLogIndex(), err, nullptr, _step, _status);
-        _updateProcessTimings();
-        _processStep(_step);
-    }
-
-    void _processStep(const CDCStep& step) {
-        LOG_DEBUG(_env, "processing step %s", step);
-        if (step.txnFinished != 0) {
-            LOG_DEBUG(_env, "txn %s finished", step.txnFinished);
+    void _processStep() {
+        LOG_DEBUG(_env, "processing step %s", _step);
+        // finished txns
+        for (const auto& [txnId, resp]: _step.finishedTxns) {
+            LOG_DEBUG(_env, "txn %s finished", txnId);
             // we need to send the response back to the client
-            auto inFlight = _inFlightTxns.find(step.txnFinished);
-            if (inFlight == _inFlightTxns.end()) {
-                LOG_INFO(_env, "Could not find in-flight request %s, this might be because the CDC was restarted in the middle of a transaction.", step.txnFinished);
-            } else {
+            auto inFlight = _inFlightTxns.find(txnId);
+            if (inFlight->second.hasClient) {
                 _shared.timingsTotal[(int)inFlight->second.kind].add(eggsNow() - inFlight->second.receivedAt);
-                _shared.errors[(int)inFlight->second.kind].add(_step.err);
-                if (step.err != NO_ERROR) {
-                    if (rareInnocuousShardError(step.err)) {
-                        LOG_INFO(_env, "txn %s, req id %s, finished with rare innocuous error %s", step.txnFinished, inFlight->second.cdcRequestId, step.err);
-                    } else if (!innocuousShardError(step.err)) {
-                        RAISE_ALERT(_env, "txn %s, req id %s, finished with error %s", step.txnFinished, inFlight->second.cdcRequestId, step.err);
+                _shared.errors[(int)inFlight->second.kind].add(resp.err);
+                if (resp.err != NO_ERROR) {
+                    if (innocuousShardError(resp.err)) {
+                        LOG_INFO(_env, "txn %s, req id %s, finished with innocuous error %s", txnId, inFlight->second.cdcRequestId, resp.err);
+                    } else if (rareInnocuousShardError(resp.err)) {
+                        LOG_INFO(_env, "txn %s, req id %s, finished with rare innocuous error %s", txnId, inFlight->second.cdcRequestId, resp.err);
+                    } else {
+                        RAISE_ALERT(_env, "txn %s, req id %s, finished with error %s", txnId, inFlight->second.cdcRequestId, resp.err);
                     }
-                    _sendError(inFlight->second.sock, inFlight->second.cdcRequestId, step.err, inFlight->second.clientAddr);
+                    _sendError(inFlight->second.sock, inFlight->second.cdcRequestId, resp.err, inFlight->second.clientAddr);
                 } else {
                     LOG_DEBUG(_env, "sending response with req id %s, kind %s, back to %s", inFlight->second.cdcRequestId, inFlight->second.kind, inFlight->second.clientAddr);
                     BincodeBuf bbuf(&_sendBuf[0], _sendBuf.size());
                     CDCResponseHeader respHeader(inFlight->second.cdcRequestId, inFlight->second.kind);
                     respHeader.pack(bbuf);
-                    step.resp.pack(bbuf);
+                    resp.resp.pack(bbuf);
                     _send(inFlight->second.sock, inFlight->second.clientAddr, (const char*)bbuf.data, bbuf.len());
                 }
                 _inFlightCDCReqs.erase(InFlightCDCRequestKey(inFlight->second.cdcRequestId, inFlight->second.clientAddr));
-                _inFlightTxns.erase(inFlight);
-                _updateInFlightTxns();
             }
+            _inFlightTxns.erase(inFlight);
+            _updateInFlightTxns();
         }
-        if (step.txnNeedsShard != 0) {
-            LOG_TRACE(_env, "txn %s needs shard %s, req %s", step.txnNeedsShard, step.shardReq.shid, step.shardReq.req);
+        // in flight txns
+        for (const auto& [txnId, shardReq]: _step.runningTxns) {
+            CDCShardReq prevReq;
+            LOG_TRACE(_env, "txn %s needs shard %s, req %s", txnId, shardReq.shid, shardReq.req);
             BincodeBuf bbuf(&_sendBuf[0], _sendBuf.size());
             // Header
             ShardRequestHeader shardReqHeader;
             // Do not allocate new req id for repeated requests, so that we'll just accept
-            // the first one that comes back.
-            if (!step.shardReq.repeated) {
-                _shardRequestIdCounter++;
-            }
-            shardReqHeader.requestId = _shardRequestIdCounter;
-            shardReqHeader.kind = step.shardReq.req.kind();
+            // the first one that comes back. There's a chance for the txnId to not be here
+            // yet: if we have just restarted the CDC. In this case we fill it in here, but
+            // obviously without client addr.
+            auto inFlightTxn = _inFlightTxns.find(txnId);
+            if (inFlightTxn == _inFlightTxns.end()) {
+                LOG_INFO(_env, "Could not find in-flight transaction %s, this might be because the CDC was restarted in the middle of a transaction.", txnId);
+                InFlightCDCRequest req;
+                req.hasClient = false;
+                req.lastSentRequestId = _freshShardReqId();
+                inFlightTxn = _inFlightTxns.emplace(txnId, req).first;
+                _updateInFlightTxns();
+            } else if (shardReq.repeated) {
+                shardReqHeader.requestId = inFlightTxn->second.lastSentRequestId;
+            } else {
+                _shardRequestIdCounter++;
+                shardReqHeader.requestId = _shardRequestIdCounter;
+            }
+            shardReqHeader.kind = shardReq.req.kind();
             shardReqHeader.pack(bbuf);
             // Body
-            step.shardReq.req.pack(bbuf);
+            shardReq.req.pack(bbuf);
             // MAC, if necessary
             if (isPrivilegedRequestKind(shardReqHeader.kind)) {
                 bbuf.packFixedBytes<8>({cbcmac(_expandedCDCKey, bbuf.data, bbuf.len())});
@@ -550,7 +610,7 @@ private:
             struct sockaddr_in shardAddr;
             memset(&shardAddr, 0, sizeof(shardAddr));
             _shared.shardsMutex.lock();
-            ShardInfo shardInfo = _shared.shards[step.shardReq.shid.u8];
+            ShardInfo shardInfo = _shared.shards[shardReq.shid.u8];
             _shared.shardsMutex.unlock();
             auto now = eggsNow(); // randomly pick one of the shard addrs and one of our sockets
             int whichShardAddr = now.ns & !!shardInfo.port2;
@@ -559,21 +619,15 @@ private:
             shardAddr.sin_port = htons(whichShardAddr ? shardInfo.port2 : shardInfo.port1);
             static_assert(sizeof(shardAddr.sin_addr) == sizeof(shardInfo.ip1));
             memcpy(&shardAddr.sin_addr, (whichShardAddr ? shardInfo.ip2 : shardInfo.ip1).data.data(), sizeof(shardAddr.sin_addr));
-            LOG_DEBUG(_env, "sending request with req id %s to shard %s (%s)", shardReqHeader.requestId, step.shardReq.shid, shardAddr);
-            _send(_socks[whichSock*2 + 1], shardAddr, (const char*)bbuf.data, bbuf.len());
+            LOG_DEBUG(_env, "sending request for txn %s with req id %s to shard %s (%s)", txnId, shardReqHeader.requestId, shardReq.shid, shardAddr);
+            _send(_socks[whichSock*2 + 1].fd, shardAddr, (const char*)bbuf.data, bbuf.len());
             // Record the in-flight req
-            ALWAYS_ASSERT(!_inFlightShardReq);
-            auto& inFlight = _inFlightShardReq.emplace();
-            inFlight.shardRequestId = shardReqHeader.requestId;
-            inFlight.sentAt = now;
-            inFlight.txnId = step.txnNeedsShard;
-            inFlight.shid = step.shardReq.shid;
-        }
-        if (step.nextTxn != 0) {
-            LOG_DEBUG(_env, "we have txn %s lined up, starting it", step.nextTxn);
-            _shared.db.startNextTransaction(true, eggsNow(), _advanceLogIndex(), _step, _status);
-            _updateProcessTimings();
-            _processStep(_step);
+            _inFlightShardReqs.insert(shardReqHeader.requestId, InFlightShardRequest{
+                .txnId = txnId,
+                .sentAt = now,
+                .shid = shardReq.shid,
+            });
+            inFlightTxn->second.lastSentRequestId = shardReqHeader.requestId;
         }
     }

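The repeated flag drives a simple at-most-once discipline: a retry of the same logical shard request reuses lastSentRequestId, so however many copies of the response come back, they all carry one id, the first match consumes the in-flight entry, and the rest are dropped as unknown by _prepareCDCShardResp. A toy model of just the id discipline (hypothetical names):

```cpp
#include <cstdint>

// Toy model: one outstanding shard request per txn. First sends allocate a
// fresh id; retries of the same logical request reuse the previous id, so
// duplicate responses all carry the same id and only the first one matches.
struct Txn { uint64_t lastSentRequestId = 0; };

uint64_t nextShardReqId(uint64_t& counter, Txn& txn, bool repeated) {
    if (!repeated) {
        txn.lastSentRequestId = ++counter;
    }
    return txn.lastSentRequestId;
}

int main() {
    uint64_t counter = 12345; // randomized at startup in the real server
    Txn t;
    uint64_t first = nextShardReqId(counter, t, /*repeated=*/false);
    uint64_t retry = nextShardReqId(counter, t, /*repeated=*/true);
    return first == retry ? 0 : 1;
}
```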
@@ -730,11 +784,6 @@ public:
             _shared.timingsTotal[(int)kind].toStats(prefix.str(), _stats);
             _shared.errors[(int)kind].toStats(prefix.str(), _stats);
         }
-        {
-            std::ostringstream prefix;
-            prefix << "cdc." << kind << ".process";
-            _shared.timingsProcess[(int)kind].toStats(prefix.str(), _stats);
-        }
     }
     err = insertStats(_shuckleHost, _shucklePort, 10_sec, _stats);
     _stats.clear();
@@ -743,7 +792,6 @@ public:
     for (CDCMessageKind kind : allCDCMessageKind) {
         _shared.timingsTotal[(int)kind].reset();
         _shared.errors[(int)kind].reset();
-        _shared.timingsProcess[(int)kind].reset();
     }
     return true;
 } else {

@@ -1,11 +1,13 @@
 #include <memory>
 #include <rocksdb/db.h>
+#include <rocksdb/iterator.h>
 #include <rocksdb/options.h>
 #include <rocksdb/statistics.h>
 #include <rocksdb/status.h>
 #include <rocksdb/utilities/transaction.h>
 #include <rocksdb/utilities/optimistic_transaction_db.h>
 #include <fstream>
+#include <unordered_set>

 #include "Assert.hpp"
 #include "Bincode.hpp"
@@ -15,6 +17,7 @@
 #include "Env.hpp"
 #include "Exception.hpp"
 #include "Msgs.hpp"
+#include "MsgsGen.hpp"
 #include "RocksDBUtils.hpp"
 #include "ShardDB.hpp"
 #include "Time.hpp"
@@ -92,25 +95,48 @@ std::ostream& operator<<(std::ostream& out, const CDCShardReq& x) {
     return out;
 }

+std::ostream& operator<<(std::ostream& out, const CDCFinished& x) {
+    out << "CDCFinished(";
+    if (x.err != NO_ERROR) {
+        out << "err=" << x.err;
+    } else {
+        out << "resp=" << x.resp;
+    }
+    out << ")";
+    return out;
+}
+
 std::ostream& operator<<(std::ostream& out, const CDCStep& x) {
-    ALWAYS_ASSERT(!(x.txnFinished != 0 && x.txnNeedsShard != 0));
-    out << "CDCStep(";
-    if (x.txnFinished != 0) {
-        out << "finishedTxn=" << x.txnFinished;
-        if (x.err != NO_ERROR) {
-            out << ", err=" << x.err;
-        } else {
-            out << ", resp=" << x.resp;
-        }
-    }
-    if (x.txnNeedsShard != 0) {
-        out << "txnNeedsShard=" << x.txnNeedsShard << ", shardReq=" << x.shardReq;
-    }
-    if (x.nextTxn != 0) {
-        if (x.txnFinished != 0 || x.txnNeedsShard != 0) {
-            out << ", ";
-        }
-        out << "nextTxn=" << x.nextTxn;
-    }
+    out << "CDCStep(finishedTxns=[";
+    for (int i = 0; i < x.finishedTxns.size(); i++) {
+        if (i > 0) {
+            out << ", ";
+        }
+        const auto& txn = x.finishedTxns[i];
+        out << "<" << txn.first << ", " << txn.second << ">";
+    }
+    out << "], runningTxns=[";
+    for (int i = 0; i < x.runningTxns.size(); i++) {
+        if (i > 0) {
+            out << ", ";
+        }
+        const auto& txn = x.runningTxns[i];
+        out << "<" << txn.first << ", " << txn.second << ">";
+    }
+    out << "])";
     return out;
 }

+std::ostream& operator<<(std::ostream& out, CDCTxnId id) {
+    return out << id.x;
+}
+
+std::ostream& operator<<(std::ostream& out, const CDCShardResp& x) {
+    out << "CDCShardResp(txnId=" << x.txnId << ", ";
+    if (x.err == NO_ERROR) {
+        out << "resp=" << x.resp;
+    } else {
+        out << "err=" << x.err;
+    }
+    out << ")";
+    return out;
@@ -122,26 +148,71 @@ inline bool createCurrentLockedEdgeRetry(EggsError err) {
         err == EggsError::MORE_RECENT_SNAPSHOT_EDGE || err == EggsError::MORE_RECENT_CURRENT_EDGE;
 }

+static constexpr InodeId MOVE_DIRECTORY_LOCK = InodeId::FromU64Unchecked(1ull<<63);
+
+// These are all the directories where we'll lock edges given a request.
+// This function _must be pure_! We call it repeatedly as if it's a property
+// of the request more than a function.
+//
+// Technically every well formed request will have distinct inode ids, but there
+// are parts in the code where this function is called before we know that the
+// request is valid.
+static std::unordered_set<InodeId> directoriesNeedingLock(const CDCReqContainer& req) {
+    std::unordered_set<InodeId> toLock;
+    switch (req.kind()) {
+    case CDCMessageKind::MAKE_DIRECTORY:
+        toLock.emplace(req.getMakeDirectory().ownerId);
+        break;
+    case CDCMessageKind::RENAME_FILE:
+        toLock.emplace(req.getRenameFile().oldOwnerId);
+        toLock.emplace(req.getRenameFile().newOwnerId);
+        break;
+    case CDCMessageKind::SOFT_UNLINK_DIRECTORY:
+        // TODO I'm pretty sure the target id is fine, as
+        // in, does not need locking.
+        toLock.emplace(req.getSoftUnlinkDirectory().ownerId);
+        break;
+    case CDCMessageKind::RENAME_DIRECTORY:
+        toLock.emplace(req.getRenameDirectory().oldOwnerId);
+        toLock.emplace(req.getRenameDirectory().newOwnerId);
+        // Moving directories is special: it can introduce loops if we're not careful.
+        // Instead of trying to not create loops in the context of interleaved transactions,
+        // we instead only allow one move directory at a time.
+        toLock.emplace(MOVE_DIRECTORY_LOCK);
+        break;
+    case CDCMessageKind::HARD_UNLINK_DIRECTORY:
+        toLock.emplace(req.getHardUnlinkDirectory().dirId);
+        break;
+    case CDCMessageKind::CROSS_SHARD_HARD_UNLINK_FILE:
+        toLock.emplace(req.getCrossShardHardUnlinkFile().ownerId);
+        break;
+    case CDCMessageKind::ERROR:
+        throw EGGS_EXCEPTION("bad req type error");
+    default:
+        throw EGGS_EXCEPTION("bad req type %s", (uint8_t)req.kind());
+    }
+    return toLock;
+}

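directoriesNeedingLock reduces each request to the set of directories whose edges it will lock; transactions whose sets are disjoint can interleave freely. The extra MOVE_DIRECTORY_LOCK entry for RENAME_DIRECTORY is a reserved inode id acting as a global mutex: allowing only one directory move at a time is a blunt but simple way to prevent two concurrent moves from creating a directory loop. A toy illustration of the disjointness idea (not the RocksDB-backed mechanism):

```cpp
#include <cstdint>
#include <unordered_set>

using InodeId = uint64_t;
constexpr InodeId MOVE_DIRECTORY_LOCK = 1ull << 63; // reserved, never a real dir

// Two transactions may run concurrently iff their lock sets are disjoint.
bool disjoint(const std::unordered_set<InodeId>& a, const std::unordered_set<InodeId>& b) {
    for (InodeId x : a) { if (b.count(x)) { return false; } }
    return true;
}

int main() {
    // Two renames of unrelated directories still share MOVE_DIRECTORY_LOCK,
    // so they can never run concurrently, which is what rules out loops.
    std::unordered_set<InodeId> moveA{10, 20, MOVE_DIRECTORY_LOCK};
    std::unordered_set<InodeId> moveB{30, 40, MOVE_DIRECTORY_LOCK};
    std::unordered_set<InodeId> mkdirC{50};
    return (!disjoint(moveA, moveB) && disjoint(moveA, mkdirC)) ? 0 : 1;
}
```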
 struct StateMachineEnv {
     Env& env;
-    rocksdb::OptimisticTransactionDB* db;
     rocksdb::ColumnFamilyHandle* defaultCf;
     rocksdb::ColumnFamilyHandle* parentCf;
     rocksdb::Transaction& dbTxn;
-    EggsTime time;
-    uint64_t txnId;
+    CDCTxnId txnId;
     uint8_t txnStep;
     CDCStep& cdcStep;
+    bool finished;

     StateMachineEnv(
-        Env& env_, rocksdb::OptimisticTransactionDB* db_, rocksdb::ColumnFamilyHandle* defaultCf_, rocksdb::ColumnFamilyHandle* parentCf_, rocksdb::Transaction& dbTxn_, EggsTime time_, uint64_t txnId_, uint8_t step_, CDCStep& cdcStep_
+        Env& env_, rocksdb::ColumnFamilyHandle* defaultCf_, rocksdb::ColumnFamilyHandle* parentCf_, rocksdb::Transaction& dbTxn_, CDCTxnId txnId_, uint8_t step_, CDCStep& cdcStep_
     ):
-        env(env_), db(db_), defaultCf(defaultCf_), parentCf(parentCf_), dbTxn(dbTxn_), time(time_), txnId(txnId_), txnStep(step_), cdcStep(cdcStep_)
+        env(env_), defaultCf(defaultCf_), parentCf(parentCf_), dbTxn(dbTxn_), txnId(txnId_), txnStep(step_), cdcStep(cdcStep_), finished(false)
     {}

-    InodeId nextDirectoryId() {
+    InodeId nextDirectoryId(rocksdb::Transaction& dbTxn) {
         std::string v;
-        ROCKS_DB_CHECKED(db->Get({}, defaultCf, cdcMetadataKey(&NEXT_DIRECTORY_ID_KEY), &v));
+        ROCKS_DB_CHECKED(dbTxn.Get({}, defaultCf, cdcMetadataKey(&NEXT_DIRECTORY_ID_KEY), &v));
         ExternalValue<InodeIdValue> nextId(v);
         InodeId id = nextId().id();
         nextId().setId(InodeId::FromU64(id.u64 + 1));
@@ -151,24 +222,27 @@ struct StateMachineEnv {

     ShardReqContainer& needsShard(uint8_t step, ShardId shid, bool repeated) {
         txnStep = step;
-        cdcStep.txnFinished = 0;
-        cdcStep.txnNeedsShard = txnId;
-        cdcStep.shardReq.shid = shid;
-        cdcStep.shardReq.repeated = repeated;
-        return cdcStep.shardReq.req;
+        auto& running = cdcStep.runningTxns.emplace_back();
+        running.first = txnId;
+        running.second.shid = shid;
+        running.second.repeated = repeated;
+        return running.second.req;
     }

     CDCRespContainer& finish() {
-        cdcStep.txnFinished = txnId;
-        cdcStep.err = NO_ERROR;
-        return cdcStep.resp;
+        this->finished = true;
+        auto& finished = cdcStep.finishedTxns.emplace_back();
+        finished.first = txnId;
+        finished.second.err = NO_ERROR;
+        return finished.second.resp;
     }

     void finishWithError(EggsError err) {
+        this->finished = true;
         ALWAYS_ASSERT(err != NO_ERROR);
-        cdcStep.txnFinished = txnId;
-        cdcStep.err = err;
-        cdcStep.txnNeedsShard = 0;
+        auto& errored = cdcStep.finishedTxns.emplace_back();
+        errored.first = txnId;
+        errored.second.err = err;
     }
 };

@@ -234,7 +308,7 @@ struct MakeDirectoryStateMachine {
     }

     void start() {
-        state.setDirId(env.nextDirectoryId());
+        state.setDirId(env.nextDirectoryId(env.dbTxn));
         lookup();
     }

@@ -1217,23 +1291,33 @@ struct UnpackCDCReq {
 struct CDCDBImpl {
     Env _env;

     // TODO it would be good to store basically all of the metadata in memory,
     // so that we'd just read from it, but this requires a bit of care when writing
     // if we want to be able to not write the batch at the end.
+    // The reason why we insist on storing everything in RocksDB is that we can do
+    // everything in a single transaction, so it's easier to reason about atomic
+    // modifications. _dirsToTxnsCf for example would be much simpler as a
+    // simple unordered_map.
+    //
+    // It also has the nice advantage that we don't need to reconstruct the state
+    // when starting up, it's all already there.

     rocksdb::OptimisticTransactionDB* _db;
     rocksdb::ColumnFamilyHandle* _defaultCf;
-    rocksdb::ColumnFamilyHandle* _reqQueueCf;
     rocksdb::ColumnFamilyHandle* _parentCf;
+    rocksdb::ColumnFamilyHandle* _enqueuedCf; // V1, txnId -> CDC req, only for executing or waiting to be executed requests
+    rocksdb::ColumnFamilyHandle* _executingCf; // V1, txnId -> CDC state machine, for requests that are executing
+    // V1, data structure storing a dir to txn ids mapping:
+    // InodeId -> txnId -- sentinel telling us what the first txn in line is. If none, zero.
+    //     we need the sentinel to skip over tombstones quickly.
+    // InodeId, txnId set with the queue
+    rocksdb::ColumnFamilyHandle* _dirsToTxnsCf;
-    // rocksdb::ColumnFamilyHandle* _dirsToTxnsCf; // V1, InodeId, txnId set
+    // legacy
+    rocksdb::ColumnFamilyHandle* _reqQueueCfLegacy; // V0, txnId -> CDC req, for all the requests (including historical)

     AssertiveLock _processLock;

     std::shared_ptr<rocksdb::Statistics> _dbStatistics;
     std::string _dbStatisticsFile;

-    // Used to deserialize into
-    CDCReqContainer _cdcReq;
-
     // ----------------------------------------------------------------
     // initialization

@@ -1256,6 +1340,9 @@ struct CDCDBImpl {
         {rocksdb::kDefaultColumnFamilyName, {}},
         {"reqQueue", {}},
         {"parent", {}},
+        {"enqueued", {}},
+        {"executing", {}},
+        {"dirsToTxns", {}},
     };
     std::vector<rocksdb::ColumnFamilyHandle*> familiesHandles;
     auto dbPath = path + "/db";
@@ -1266,8 +1353,11 @@ struct CDCDBImpl {
     );
     ALWAYS_ASSERT(familiesDescriptors.size() == familiesHandles.size());
     _defaultCf = familiesHandles[0];
-    _reqQueueCf = familiesHandles[1];
+    _reqQueueCfLegacy = familiesHandles[1];
     _parentCf = familiesHandles[2];
+    _enqueuedCf = familiesHandles[3];
+    _executingCf = familiesHandles[4];
+    _dirsToTxnsCf = familiesHandles[5];

     _initDb();
 }
@@ -1276,8 +1366,11 @@ struct CDCDBImpl {
     LOG_INFO(_env, "destroying column families and closing database");

     ROCKS_DB_CHECKED(_db->DestroyColumnFamilyHandle(_defaultCf));
-    ROCKS_DB_CHECKED(_db->DestroyColumnFamilyHandle(_reqQueueCf));
+    ROCKS_DB_CHECKED(_db->DestroyColumnFamilyHandle(_reqQueueCfLegacy));
     ROCKS_DB_CHECKED(_db->DestroyColumnFamilyHandle(_parentCf));
+    ROCKS_DB_CHECKED(_db->DestroyColumnFamilyHandle(_enqueuedCf));
+    ROCKS_DB_CHECKED(_db->DestroyColumnFamilyHandle(_executingCf));
+    ROCKS_DB_CHECKED(_db->DestroyColumnFamilyHandle(_dirsToTxnsCf));

     ROCKS_DB_CHECKED(_db->Close());
 }
@@ -1301,16 +1394,33 @@ struct CDCDBImpl {
     }

     TXN_ID_SETTER_GETTER(LAST_TXN_KEY, _lastTxn, _setLastTxn)
     TXN_ID_SETTER_GETTER(FIRST_TXN_IN_QUEUE_KEY, _firstTxnInQueue, _setFirstTxnInQueue)
     TXN_ID_SETTER_GETTER(LAST_TXN_IN_QUEUE_KEY, _lastTxnInQueue, _setLastTxnInQueue)
-    TXN_ID_SETTER_GETTER(EXECUTING_TXN_KEY, _executingTxn, _setExecutingTxn)
+    TXN_ID_SETTER_GETTER(EXECUTING_TXN_KEY, _executingTxnLegacy, _setExecutingTxnLegacy)

 #undef TXN_ID_SETTER_GETTER

-    void _initDb() {
-        const auto keyExists = [this](rocksdb::ColumnFamilyHandle* cf, const rocksdb::Slice& key) -> bool {
+    // returns -1 if the version key was not set
+    int64_t _version(rocksdb::Transaction& dbTxn) {
+        std::string value;
+        auto status = dbTxn.Get({}, _defaultCf, cdcMetadataKey(&VERSION_KEY), &value);
+        if (status.IsNotFound()) {
+            return -1;
+        } else {
+            ROCKS_DB_CHECKED(status);
+            ExternalValue<U64Value> vV(value);
+            return vV().u64();
+        }
+    }
+
+    void _setVersion(rocksdb::Transaction& dbTxn, uint64_t version) {
+        auto v = U64Value::Static(version);
+        ROCKS_DB_CHECKED(dbTxn.Put({}, cdcMetadataKey(&VERSION_KEY), v.toSlice()));
+    }
+
+    void _initDbV0(rocksdb::Transaction& dbTxn) {
+        LOG_INFO(_env, "initializing V0 db");
+        const auto keyExists = [&dbTxn](rocksdb::ColumnFamilyHandle* cf, const rocksdb::Slice& key) -> bool {
             std::string value;
-            auto status = _db->Get({}, cf, key, &value);
+            auto status = dbTxn.Get({}, cf, key, &value);
             if (status.IsNotFound()) {
                 return false;
             } else {
@@ -1322,15 +1432,15 @@ struct CDCDBImpl {
         if (!keyExists(_defaultCf, cdcMetadataKey(&NEXT_DIRECTORY_ID_KEY))) {
             LOG_INFO(_env, "initializing next directory id");
             auto id = InodeIdValue::Static(InodeId::FromU64(ROOT_DIR_INODE_ID.u64 + 1));
-            ROCKS_DB_CHECKED(_db->Put({}, cdcMetadataKey(&NEXT_DIRECTORY_ID_KEY), id.toSlice()));
+            ROCKS_DB_CHECKED(dbTxn.Put({}, cdcMetadataKey(&NEXT_DIRECTORY_ID_KEY), id.toSlice()));
         }

-        const auto initZeroValue = [this, &keyExists](const std::string& what, const CDCMetadataKey& key) {
+        const auto initZeroValue = [this, &keyExists, &dbTxn](const std::string& what, const CDCMetadataKey& key) {
             if (!keyExists(_defaultCf, cdcMetadataKey(&key))) {
                 LOG_INFO(_env, "initializing %s", what);
                 StaticValue<U64Value> v;
                 v().setU64(0);
-                ROCKS_DB_CHECKED(_db->Put({}, cdcMetadataKey(&key), v.toSlice()));
+                ROCKS_DB_CHECKED(dbTxn.Put({}, cdcMetadataKey(&key), v.toSlice()));
             }
         };

@@ -1341,6 +1451,61 @@ struct CDCDBImpl {
         initZeroValue("last applied log index", LAST_APPLIED_LOG_ENTRY_KEY);
     }

+    void _initDbV1(rocksdb::Transaction& dbTxn) {
+        LOG_INFO(_env, "initializing V1 db");
+        // Pick up the executing txn, if any, and move it to the executing CF.
+        // We preserve the executing txn to preserve integrity.
+        {
+            uint64_t executingTxn = _executingTxnLegacy(dbTxn);
+            if (executingTxn != 0) {
+                LOG_INFO(_env, "migrating txn %s", executingTxn);
+                // _reqQueueCf -> _enqueuedCf
+                auto txnK = CDCTxnIdKey::Static(CDCTxnId(executingTxn));
+                std::string reqV;
+                ROCKS_DB_CHECKED(dbTxn.Get({}, _reqQueueCfLegacy, txnK.toSlice(), &reqV));
+                CDCReqContainer req;
+                UnpackCDCReq ureq(req);
+                bincodeFromRocksValue(reqV, ureq);
+                ROCKS_DB_CHECKED(dbTxn.Put(_enqueuedCf, txnK.toSlice(), reqV));
+                // EXECUTING_TXN_KEY -> _executingCf
+                std::string txnStateV;
+                ROCKS_DB_CHECKED(dbTxn.Get({}, _defaultCf, cdcMetadataKey(&EXECUTING_TXN_STATE_KEY), &txnStateV));
+                ROCKS_DB_CHECKED(dbTxn.Put(_executingCf, txnK.toSlice(), txnStateV));
+                // Add to _dirsToTxnsCf, will lock since things are empty
+                _addToDirsToTxns(dbTxn, txnK().id(), req);
+            }
+        }
+
+        // Throw away everything legacy. The clients will just retry.
+        ROCKS_DB_CHECKED(dbTxn.Delete(cdcMetadataKey(&FIRST_TXN_IN_QUEUE_KEY)));
+        ROCKS_DB_CHECKED(dbTxn.Delete(cdcMetadataKey(&LAST_TXN_IN_QUEUE_KEY)));
+        ROCKS_DB_CHECKED(dbTxn.Delete(cdcMetadataKey(&EXECUTING_TXN_KEY)));
+        ROCKS_DB_CHECKED(dbTxn.Delete(cdcMetadataKey(&EXECUTING_TXN_STATE_KEY)));
+        // We could delete the CF itself, but let's do everything in the same transaction
+        std::unique_ptr<rocksdb::Iterator> it(dbTxn.GetIterator({}, _reqQueueCfLegacy));
+        for (it->Seek(""); it->Valid(); it->Next()) {
+            ROCKS_DB_CHECKED(dbTxn.Delete(_reqQueueCfLegacy, it->key()));
+        }
+        ROCKS_DB_CHECKED(it->status());
+    }
+
+    void _initDb() {
+        rocksdb::WriteOptions options;
+        options.sync = true;
+        std::unique_ptr<rocksdb::Transaction> dbTxn(_db->BeginTransaction(options));
+
+        if (_version(*dbTxn) == -1) {
+            _initDbV0(*dbTxn);
+            _setVersion(*dbTxn, 0);
+        }
+        if (_version(*dbTxn) == 0) {
+            _initDbV1(*dbTxn);
+            _setVersion(*dbTxn, 1);
+        }
+
+        commitTransaction(*dbTxn);
+    }

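The new _initDb is a versioned, cumulative migration: each _initDbVn step upgrades the database one version, and because the version bump and the migration happen in the same synchronous transaction, a crash mid-upgrade leaves the version key unchanged and the step simply reruns on the next boot. The shape in isolation, with storage abstracted to a plain struct:

```cpp
#include <cstdint>
#include <functional>
#include <vector>

// Sketch of the versioned-migration shape: migrations[n] moves the store
// from version n to n+1; in the real code everything between reading and
// bumping the version happens inside one atomic RocksDB transaction.
struct Store { int64_t version = -1; /* ... payload ... */ };

void initDb(Store& s, const std::vector<std::function<void(Store&)>>& migrations) {
    if (s.version == -1) { /* fresh store: run V0 initialization */ s.version = 0; }
    while (s.version < (int64_t)migrations.size()) {
        migrations[s.version](s); // idempotent step: reruns after a crash
        s.version += 1;
    }
}

int main() {
    Store s;
    std::vector<std::function<void(Store&)>> m{[](Store&) { /* V0 -> V1 */ }};
    initDb(s, m);
    return s.version == 1 ? 0 : 1;
}
```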
     // ----------------------------------------------------------------
     // retrying txns

@@ -1375,15 +1540,22 @@ struct CDCDBImpl {
     // Processing
     // ----------------------------------------------------------------

-    uint64_t _lastAppliedLogEntry() {
+    uint64_t _lastAppliedLogEntryDB() {
         std::string value;
         ROCKS_DB_CHECKED(_db->Get({}, cdcMetadataKey(&LAST_APPLIED_LOG_ENTRY_KEY), &value));
         ExternalValue<U64Value> v(value);
         return v().u64();
     }

+    uint64_t _lastAppliedLogEntry(rocksdb::Transaction& dbTxn) {
+        std::string value;
+        ROCKS_DB_CHECKED(dbTxn.Get({}, cdcMetadataKey(&LAST_APPLIED_LOG_ENTRY_KEY), &value));
+        ExternalValue<U64Value> v(value);
+        return v().u64();
+    }
+
     void _advanceLastAppliedLogEntry(rocksdb::Transaction& dbTxn, uint64_t index) {
-        uint64_t oldIndex = _lastAppliedLogEntry();
+        uint64_t oldIndex = _lastAppliedLogEntry(dbTxn);
         ALWAYS_ASSERT(oldIndex+1 == index, "old index is %s, expected %s, got %s", oldIndex, oldIndex+1, index);
         LOG_DEBUG(_env, "bumping log index from %s to %s", oldIndex, index);
         StaticValue<U64Value> v;
@@ -1391,80 +1563,131 @@ struct CDCDBImpl {
         ROCKS_DB_CHECKED(dbTxn.Put({}, cdcMetadataKey(&LAST_APPLIED_LOG_ENTRY_KEY), v.toSlice()));
     }

-    // Pushes a new request onto the queue.
-    void _reqQueuePush(rocksdb::Transaction& dbTxn, uint64_t txnId, const CDCReqContainer& req) {
-        // Check metadata
-        uint64_t last = _lastTxnInQueue(dbTxn);
-        ALWAYS_ASSERT(last == 0 || txnId == last + 1);
-        // Write to queue
-        {
-            auto k = U64Key::Static(txnId);
-            std::string v = bincodeToRocksValue(PackCDCReq(req));
-            ROCKS_DB_CHECKED(dbTxn.Put(_reqQueueCf, k.toSlice(), v));
-        }
-        // Update metadata
-        if (last == 0) {
-            _setFirstTxnInQueue(dbTxn, txnId);
-        }
-        _setLastTxnInQueue(dbTxn, txnId);
-    }
+    void _addToDirsToTxns(rocksdb::Transaction& dbTxn, CDCTxnId txnId, const CDCReqContainer& req) {
+        for (const auto dirId: directoriesNeedingLock(req)) {
+            LOG_DEBUG(_env, "adding dir %s for txn %s", dirId, txnId);
+            {
+                // into the set
+                StaticValue<DirsToTxnsKey> k;
+                k().setDirId(dirId);
+                k().setTxnId(txnId);
+                ROCKS_DB_CHECKED(dbTxn.Put(_dirsToTxnsCf, k.toSlice(), ""));
+            }
+            {
+                // sentinel, if necessary
+                StaticValue<DirsToTxnsKey> k;
+                k().setDirId(dirId);
+                k().setSentinel();
+                std::string v;
+                auto status = dbTxn.Get({}, _dirsToTxnsCf, k.toSlice(), &v);
+                if (status.IsNotFound()) { // we're the first ones here, add the sentinel
+                    auto v = CDCTxnIdValue::Static(txnId);
+                    ROCKS_DB_CHECKED(dbTxn.Put(_dirsToTxnsCf, k.toSlice(), v.toSlice()));
+                } else {
+                    ROCKS_DB_CHECKED(status);
+                }
+            }
+        }
+    }

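The dirsToTxns column family encodes, per directory, both a membership set and a head-of-line sentinel as ordered keys, so "who holds the lock" is one point read and "who is next" is one seek. A reduced in-memory model of that keyspace (std::map in place of the column family; assumes real txn ids are nonzero so slot 0 can play the sentinel, which differs from the actual byte encoding):

```cpp
#include <cstdint>
#include <map>
#include <utility>

// In-memory model of the dirsToTxns keyspace. Keys are (dirId, slot) pairs;
// slot 0 plays the role of the sentinel and sorts before every real txn id,
// and the sentinel's value is the txn currently first in line for that dir.
constexpr uint64_t SENTINEL = 0;
using Key = std::pair<uint64_t, uint64_t>; // (dirId, SENTINEL or txnId)

int main() {
    std::map<Key, uint64_t> cf;
    auto add = [&](uint64_t dir, uint64_t txn) {
        cf[{dir, txn}] = 0;                  // join the waiting set
        cf.emplace(Key{dir, SENTINEL}, txn); // set the head only if absent
    };
    add(7, 41);
    add(7, 57);
    // 41 arrived first, so the sentinel for dir 7 must still point at it.
    return cf.at({7, SENTINEL}) == 41 ? 0 : 1;
}
```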
-    // Returns 0 if the queue was empty, otherwise what was popped
-    uint64_t _reqQueuePeek(rocksdb::Transaction& dbTxn, CDCReqContainer& req) {
-        uint64_t first = _firstTxnInQueue(dbTxn);
-        if (first == 0) {
-            return 0;
-        }
-        // Read
-        {
-            auto k = U64Key::Static(first);
-            std::string v;
-            ROCKS_DB_CHECKED(dbTxn.Get({}, _reqQueueCf, k.toSlice(), &v));
-            UnpackCDCReq ureq(req);
-            bincodeFromRocksValue(v, ureq);
-        }
-        return first;
-    }
+    // Returns the txn ids that might be free to work now. Note that we don't
+    // know that for sure because they might not hold locks for all dirs. This
+    // function does not check that.
+    void _removeFromDirsToTxns(rocksdb::Transaction& dbTxn, CDCTxnId txnId, const CDCReqContainer& req, std::vector<CDCTxnId>& mightBeReady) {
+        for (const auto dirId: directoriesNeedingLock(req)) {
+            LOG_DEBUG(_env, "removing dir %s for txn %s", dirId, txnId);
+            StaticValue<DirsToTxnsKey> k;
+            k().setDirId(dirId);
+            k().setTxnId(txnId);
+            // Check if there's a next key in line. We know that we won't
+            // have to step over many deleted keys here because we go through the
+            // list in order, and we seek from the list element we've just deleted.
+            // It is however important to set the iteration upper bound so as
+            // not to spill over and possibly trip over deleted keys.
+            StaticValue<DirsToTxnsKey> upperBoundK;
+            upperBoundK().setDirId(dirId);
+            upperBoundK().setTxnId(CDCTxnId(~(uint64_t)0));
+            rocksdb::Slice upperBoundSlice = upperBoundK.toSlice();
+            rocksdb::ReadOptions itOptions;
+            itOptions.iterate_upper_bound = &upperBoundSlice;
+            std::unique_ptr<rocksdb::Iterator> it(dbTxn.GetIterator(itOptions, _dirsToTxnsCf));
+            it->Seek(k.toSlice());
+            // we must find the key here -- we're removing it.
+            ALWAYS_ASSERT(it->Valid());
+            {
+                // Additional safety check: the key is what we expect.
+                auto foundKey = ExternalValue<DirsToTxnsKey>::FromSlice(it->key());
+                ALWAYS_ASSERT(!foundKey().isSentinel() && foundKey().txnId() == txnId);
+            }
+            // now that we've done our checks, we can remove the key
+            ROCKS_DB_CHECKED(dbTxn.Delete(_dirsToTxnsCf, k.toSlice()));
+            // then we look for the next one, if there's anything,
+            // and overwrite/delete the sentinel
+            StaticValue<DirsToTxnsKey> sentinelK;
+            sentinelK().setDirId(dirId);
+            sentinelK().setSentinel();
+            it->Next();
+            if (it->Valid()) { // there's something, set the sentinel
+                auto nextK = ExternalValue<DirsToTxnsKey>::FromSlice(it->key());
+                auto sentinelV = CDCTxnIdValue::Static(nextK().txnId());
+                LOG_DEBUG(_env, "selected %s as next in line after finishing %s", nextK().txnId(), txnId);
+                mightBeReady.emplace_back(nextK().txnId());
+                ROCKS_DB_CHECKED(dbTxn.Put(_dirsToTxnsCf, sentinelK.toSlice(), sentinelV.toSlice()));
+            } else { // we were the last ones here, remove sentinel
+                ROCKS_DB_CHECKED(it->status());
+                ROCKS_DB_CHECKED(dbTxn.Delete(_dirsToTxnsCf, sentinelK.toSlice()));
+            }
+        }
+    }
+
+    // Check if we have a lock on all the directories that matter to the txn id.
+    // It is assumed that the txnId in question is already in _dirsToTxnsCf.
+    bool _isReadyToGo(rocksdb::Transaction& dbTxn, CDCTxnId txnId, const CDCReqContainer& req) {
+        for (const auto dirId: directoriesNeedingLock(req)) {
+            // the sentinel _must_ be present -- at the very least us!
+            StaticValue<DirsToTxnsKey> k;
+            k().setDirId(dirId);
+            k().setSentinel();
+            std::string v;
+            ROCKS_DB_CHECKED(dbTxn.Get({}, _dirsToTxnsCf, k.toSlice(), &v));
+            ExternalValue<CDCTxnIdValue> otherTxnId(v);
+            if (otherTxnId().id() != txnId) {
+                return false;
+            }
+        }
+        return true;
+    }

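_isReadyToGo then falls out of the sentinel layout: a transaction may start exactly when it is the sentinel value for every directory in its lock set. Continuing the same in-memory model:

```cpp
#include <cstdint>
#include <map>
#include <unordered_set>
#include <utility>

// Same model as above: a txn may run iff it heads the queue of every
// directory it needs. This is the essence of _isReadyToGo.
constexpr uint64_t SENTINEL = 0;
using Key = std::pair<uint64_t, uint64_t>;

bool isReadyToGo(const std::map<Key, uint64_t>& cf, uint64_t txn,
                 const std::unordered_set<uint64_t>& dirs) {
    for (uint64_t dir : dirs) {
        if (cf.at({dir, SENTINEL}) != txn) { return false; } // someone ahead of us
    }
    return true;
}

int main() {
    std::map<Key, uint64_t> cf{
        {{7, SENTINEL}, 41}, {{7, 41}, 0}, {{7, 57}, 0},
        {{9, SENTINEL}, 57}, {{9, 57}, 0},
    };
    // txn 57 heads dir 9 but not dir 7, so it cannot start yet; txn 41 can.
    return (!isReadyToGo(cf, 57, {7, 9}) && isReadyToGo(cf, 41, {7})) ? 0 : 1;
}
```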
-    // Returns 0 if the queue was empty, otherwise what was popped
-    uint64_t _reqQueuePop(rocksdb::Transaction& dbTxn) {
-        uint64_t first = _firstTxnInQueue(dbTxn);
-        if (first == 0) {
-            LOG_DEBUG(_env, "txn queue empty, returning 0");
-            return 0;
-        }
-        // Update metadata
-        uint64_t last = _lastTxnInQueue(dbTxn);
-        if (first == last) { // empty queue
-            _setFirstTxnInQueue(dbTxn, 0);
-            _setLastTxnInQueue(dbTxn, 0);
-        } else {
-            _setFirstTxnInQueue(dbTxn, first+1);
-        }
-        LOG_DEBUG(_env, "popped txn %s from queue", first);
-        return first;
-    }
+    // Adds a request to the enqueued requests. Also adds it to dirsToTxns, which will implicitly
+    // acquire locks.
+    void _addToEnqueued(rocksdb::Transaction& dbTxn, CDCTxnId txnId, const CDCReqContainer& req) {
+        {
+            auto k = CDCTxnIdKey::Static(txnId);
+            std::string v = bincodeToRocksValue(PackCDCReq(req));
+            ROCKS_DB_CHECKED(dbTxn.Put(_enqueuedCf, k.toSlice(), v));
+        }
+        _addToDirsToTxns(dbTxn, txnId, req);
+    }

     // Moves the state forward, filling in `step` appropriately, and writing
     // out the updated state.
     //
     // This will _not_ start a new transaction automatically if the old one
     // finishes. It does return whether the txn was finished though.
     template<template<typename> typename V>
-    bool _advance(
-        EggsTime time,
+    void _advance(
         rocksdb::Transaction& dbTxn,
-        uint64_t txnId,
+        CDCTxnId txnId,
         const CDCReqContainer& req,
         // If `shardRespError` and `shardResp` are null, we're starting to execute.
         // Otherwise, (err == NO_ERROR) == (req != nullptr).
         EggsError shardRespError,
         const ShardRespContainer* shardResp,
         V<TxnState>& state,
-        CDCStep& step
+        CDCStep& step,
+        // to collect things that might be able to start now because
+        // we've finished doing other stuff
+        std::vector<CDCTxnId>& txnIds
     ) {
-        LOG_DEBUG(_env, "advancing txn %s of kind %s", txnId, req.kind());
-        StateMachineEnv sm(_env, _db, _defaultCf, _parentCf, dbTxn, time, txnId, state().step(), step);
+        LOG_DEBUG(_env, "advancing txn %s of kind %s with state", txnId, req.kind());
+        StateMachineEnv sm(_env, _defaultCf, _parentCf, dbTxn, txnId, state().step(), step);
         switch (req.kind()) {
             case CDCMessageKind::MAKE_DIRECTORY:
                 MakeDirectoryStateMachine(sm, req.getMakeDirectory(), state().getMakeDirectory()).resume(shardRespError, shardResp);
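Note that _advance now reports newly-unblocked transactions by appending to the caller-supplied txnIds vector rather than returning a single next txn: finishing one transaction can release several directory queues at once. _startExecuting (next hunk) drains that vector by index precisely because _advance may grow it mid-iteration. A reduced model of the grow-while-draining loop:

```cpp
#include <cstdint>
#include <vector>

// Reduced model: processing item i may discover new work and append it.
// Indexing (not iterators) is required, since push_back can reallocate.
int drain(std::vector<uint64_t>& work) {
    int processed = 0;
    for (size_t i = 0; i < work.size(); i++) {
        ++processed;
        if (work[i] == 1) { work.push_back(2); } // finishing 1 unblocks 2
    }
    return processed;
}

int main() {
    std::vector<uint64_t> w{1};
    return drain(w) == 2 ? 0 : 1;
}
```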
@@ -1489,155 +1712,162 @@ struct CDCDBImpl {
         }
         state().setStep(sm.txnStep);

-        ALWAYS_ASSERT(step.txnFinished == 0 || step.txnFinished == txnId);
-        ALWAYS_ASSERT(step.txnNeedsShard == 0 || step.txnNeedsShard == txnId);
-        ALWAYS_ASSERT(step.txnFinished == txnId || step.txnNeedsShard == txnId);
-        if (step.txnFinished == txnId) {
-            // we're done, clear the transaction from the system
-            ALWAYS_ASSERT(_reqQueuePop(dbTxn) == txnId);
-            _setExecutingTxn(dbTxn, 0);
-            // not strictly necessary, it'll just be overwritten next time, but let's be tidy
-            ROCKS_DB_CHECKED(dbTxn.Delete(_defaultCf, cdcMetadataKey(&EXECUTING_TXN_STATE_KEY)));
-            // also, fill in whether we've got something next
-            step.nextTxn = _firstTxnInQueue(dbTxn);
-            return true;
+        if (sm.finished) {
+            // we finished immediately
+            LOG_DEBUG(_env, "txn %s with req %s finished", txnId, req);
+            _finishExecuting(dbTxn, txnId, req, txnIds);
         } else {
-            // we're _not_ done, write out the state
-            ROCKS_DB_CHECKED(dbTxn.Put(_defaultCf, cdcMetadataKey(&EXECUTING_TXN_STATE_KEY), state.toSlice()));
+            // we still have something to do, persist
+            _setExecuting(dbTxn, txnId, state);
         }
     }

+    bool _isExecuting(rocksdb::Transaction& dbTxn, CDCTxnId txnId) {
+        auto k = CDCTxnIdKey::Static(txnId);
+        std::string v;
+        auto status = dbTxn.Get({}, _executingCf, k.toSlice(), &v);
+        if (status.IsNotFound()) {
+            return false;
+        }
+        ROCKS_DB_CHECKED(status);
+        return true;
+    }
+
+    template<template<typename> typename V>
+    void _setExecuting(rocksdb::Transaction& dbTxn, CDCTxnId txnId, V<TxnState>& state) {
+        auto k = CDCTxnIdKey::Static(txnId);
+        ROCKS_DB_CHECKED(dbTxn.Put(_executingCf, k.toSlice(), state.toSlice()));
+    }
+
+    void _finishExecuting(rocksdb::Transaction& dbTxn, CDCTxnId txnId, const CDCReqContainer& req, std::vector<CDCTxnId>& txnIds) {
+        {
+            // delete from _executingCf
+            auto k = CDCTxnIdKey::Static(txnId);
+            ROCKS_DB_CHECKED(dbTxn.Delete(_executingCf, k.toSlice()));
+        }
+        // delete from dirsToTxnIds
+        _removeFromDirsToTxns(dbTxn, txnId, req, txnIds);
+    }
+
-    // Starts executing the next transaction in line, if possible. If it managed
-    // to start something, it immediately advances it as well (no point delaying
-    // that). Returns the txn id that was started, if any.
-    uint64_t _startExecuting(EggsTime time, rocksdb::Transaction& dbTxn, CDCStep& step, CDCStatus& status) {
-        uint64_t executingTxn = _executingTxn(dbTxn);
-        if (executingTxn != 0) {
-            LOG_DEBUG(_env, "another transaction %s is already executing, can't start", executingTxn);
-            return 0;
-        }
-
-        uint64_t txnToExecute = _reqQueuePeek(dbTxn, _cdcReq);
-        if (txnToExecute == 0) {
-            LOG_DEBUG(_env, "no transactions in queue found, can't start");
-            return 0;
-        }
-
-        _setExecutingTxn(dbTxn, txnToExecute);
-        LOG_DEBUG(_env, "starting to execute txn %s with req %s", txnToExecute, _cdcReq);
-        StaticValue<TxnState> txnState;
-        txnState().start(_cdcReq.kind());
-        bool stillRunning = _advance(time, dbTxn, txnToExecute, _cdcReq, NO_ERROR, nullptr, txnState, step);
-
-        if (stillRunning) {
-            status.runningTxn = txnToExecute;
-            status.runningTxnKind = _cdcReq.kind();
-        }
-
-        return txnToExecute;
-    }
-
-    uint64_t processCDCReq(
-        bool sync,
-        EggsTime time,
-        uint64_t logIndex,
-        const CDCReqContainer& req,
-        CDCStep& step,
-        CDCStatus& status
-    ) {
-        status.reset();
-
-        auto locked = _processLock.lock();
-
-        rocksdb::WriteOptions options;
-        options.sync = sync;
-        std::unique_ptr<rocksdb::Transaction> dbTxn(_db->BeginTransaction(options));
-
-        step.clear();
-
-        _advanceLastAppliedLogEntry(*dbTxn, logIndex);
-
-        // Enqueue req
-        uint64_t txnId;
-        {
-            // Generate new txn id
-            txnId = _lastTxn(*dbTxn) + 1;
-            _setLastTxn(*dbTxn, txnId);
-            // Push to queue
-            _reqQueuePush(*dbTxn, txnId, req);
-            LOG_DEBUG(_env, "enqueued CDC req %s with txn id %s", req.kind(), txnId);
-        }
-
-        // Start executing, if we can
-        _startExecuting(time, *dbTxn, step, status);
-
-        LOG_DEBUG(_env, "committing transaction");
-        commitTransaction(*dbTxn);
-
-        return txnId;
-    }
-
-    void processShardResp(
-        bool sync, // Whether to persist synchronously. Unneeded if log entries are persisted already.
-        EggsTime time,
-        uint64_t logIndex,
-        EggsError respError,
+    // Starts executing the given transactions, if possible. If it managed
+    // to start something, it immediately advances it as well (no point delaying
+    // that).
+    //
+    // It modifies `txnIds` with new transactions we looked at if we immediately
+    // finish executing txns that we start here.
void _startExecuting(rocksdb::Transaction& dbTxn, std::vector<CDCTxnId>& txnIds, CDCStep& step) {
|
||||
CDCReqContainer req;
|
||||
for (int i = 0; i < txnIds.size(); i++) {
|
||||
CDCTxnId txnId = txnIds[i];
|
||||
auto reqK = CDCTxnIdKey::Static(txnId);
|
||||
std::string reqV;
|
||||
ROCKS_DB_CHECKED(dbTxn.Get({}, _enqueuedCf, reqK.toSlice(), &reqV));
|
||||
UnpackCDCReq ureq(req);
|
||||
bincodeFromRocksValue(reqV, ureq);
|
||||
if (!_isExecuting(dbTxn, txnId)) {
|
||||
if (_isReadyToGo(dbTxn, txnId, req)) {
|
||||
LOG_DEBUG(_env, "starting to execute txn %s with req %s, since it is ready to go and not executing already", txnId, req);
|
||||
StaticValue<TxnState> txnState;
|
||||
txnState().start(req.kind());
|
||||
_setExecuting(dbTxn, txnId, txnState);
|
||||
_advance(dbTxn, txnId, req, NO_ERROR, nullptr, txnState, step, txnIds);
|
||||
} else {
|
||||
LOG_DEBUG(_env, "waiting before executing txn %s with req %s, since it is not ready to go", txnId, req);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void _enqueueCDCReqs(
|
||||
rocksdb::Transaction& dbTxn,
|
||||
const std::vector<CDCReqContainer>& reqs,
|
||||
CDCStep& step,
|
||||
// we need two lists because one (`cdcReqsTxnIds`) is specifically
|
||||
// to return to the user, while the other is used for other purposes,
|
||||
// too.
|
||||
std::vector<CDCTxnId>& txnIdsToStart,
|
||||
std::vector<CDCTxnId>& cdcReqsTxnIds
|
||||
) {
|
||||
for (const auto& req: reqs) {
|
||||
uint64_t txnId;
|
||||
{
|
||||
// Generate new txn id
|
||||
txnId = _lastTxn(dbTxn) + 1;
|
||||
_setLastTxn(dbTxn, txnId);
|
||||
// Push to queue
|
||||
_addToEnqueued(dbTxn, txnId, req);
|
||||
LOG_DEBUG(_env, "enqueued CDC req %s with txn id %s", req.kind(), txnId);
|
||||
}
|
||||
|
||||
txnIdsToStart.emplace_back(txnId);
|
||||
cdcReqsTxnIds.emplace_back(txnId);
|
||||
}
|
||||
}
|
||||
|
||||
void _advanceWithResp(
|
||||
rocksdb::Transaction& dbTxn,
|
||||
CDCTxnId txnId,
|
||||
EggsError err,
|
||||
const ShardRespContainer* resp,
|
||||
CDCStep& step,
|
||||
CDCStatus& status
|
||||
std::vector<CDCTxnId>& txnIdsToStart
|
||||
) {
|
||||
status.reset();
|
||||
|
||||
auto locked = _processLock.lock();
|
||||
|
||||
rocksdb::WriteOptions options;
|
||||
options.sync = sync;
|
||||
std::unique_ptr<rocksdb::Transaction> dbTxn(_db->BeginTransaction(options));
|
||||
|
||||
step.clear();
|
||||
|
||||
_advanceLastAppliedLogEntry(*dbTxn, logIndex);
|
||||
|
||||
// Find the txn
|
||||
uint64_t txnId = _executingTxn(*dbTxn);
|
||||
ALWAYS_ASSERT(txnId != 0);
|
||||
auto txnIdK = CDCTxnIdKey::Static(txnId);
|
||||
|
||||
// Get the req
|
||||
CDCReqContainer cdcReq;
|
||||
{
|
||||
auto k = U64Key::Static(txnId);
|
||||
std::string reqV;
|
||||
ROCKS_DB_CHECKED(dbTxn->Get({}, _reqQueueCf, k.toSlice(), &reqV));
|
||||
UnpackCDCReq ureq(_cdcReq);
|
||||
ROCKS_DB_CHECKED(dbTxn.Get({}, _enqueuedCf, txnIdK.toSlice(), &reqV));
|
||||
UnpackCDCReq ureq(cdcReq);
|
||||
bincodeFromRocksValue(reqV, ureq);
|
||||
}
|
||||
|
||||
// Get the state
|
||||
std::string txnStateV;
|
||||
ROCKS_DB_CHECKED(dbTxn->Get({}, _defaultCf, cdcMetadataKey(&EXECUTING_TXN_STATE_KEY), &txnStateV));
|
||||
ROCKS_DB_CHECKED(dbTxn.Get({}, _executingCf, txnIdK.toSlice(), &txnStateV));
|
||||
ExternalValue<TxnState> txnState(txnStateV);
|
||||
|
||||
// Advance
|
||||
bool stillRunning = _advance(time, *dbTxn, txnId, _cdcReq, respError, resp, txnState, step);
|
||||
// Advance with response
|
||||
_advance(dbTxn, txnId, cdcReq, err, resp, txnState, step, txnIdsToStart);
|
||||
}
|
||||
|
||||
if (stillRunning) {
|
||||
// Store status
|
||||
status.runningTxn = txnId;
|
||||
status.runningTxnKind = _cdcReq.kind();
|
||||
void update(
|
||||
bool sync,
|
||||
uint64_t logIndex,
|
||||
const std::vector<CDCReqContainer>& cdcReqs,
|
||||
const std::vector<CDCShardResp>& shardResps,
|
||||
CDCStep& step,
|
||||
std::vector<CDCTxnId>& cdcReqsTxnIds
|
||||
) {
|
||||
auto locked = _processLock.lock();
|
||||
|
||||
rocksdb::WriteOptions options;
|
||||
options.sync = sync;
|
||||
std::unique_ptr<rocksdb::Transaction> dbTxn(_db->BeginTransaction(options));
|
||||
|
||||
_advanceLastAppliedLogEntry(*dbTxn, logIndex);
|
||||
|
||||
step.clear();
|
||||
cdcReqsTxnIds.clear();
|
||||
|
||||
{
|
||||
std::vector<CDCTxnId> txnIdsToStart;
|
||||
_enqueueCDCReqs(*dbTxn, cdcReqs, step, txnIdsToStart, cdcReqsTxnIds);
|
||||
for (const auto& resp: shardResps) {
|
||||
_advanceWithResp(*dbTxn, resp.txnId, resp.err, &resp.resp, step, txnIdsToStart);
|
||||
}
|
||||
_startExecuting(*dbTxn, txnIdsToStart, step);
|
||||
}
|
||||
|
||||
commitTransaction(*dbTxn);
|
||||
}
|
||||
|
||||
void startNextTransaction(
|
||||
bool sync, // Whether to persist synchronously. Unneeded if log entries are persisted already.
|
||||
EggsTime time,
|
||||
void bootstrap(
|
||||
bool sync,
|
||||
uint64_t logIndex,
|
||||
CDCStep& step,
|
||||
CDCStatus& status
|
||||
CDCStep& step
|
||||
) {
|
||||
status.reset();
|
||||
|
||||
auto locked = _processLock.lock();
|
||||
|
||||
rocksdb::WriteOptions options;
|
||||
@@ -1647,31 +1877,17 @@ struct CDCDBImpl {
|
||||
step.clear();
|
||||
|
||||
_advanceLastAppliedLogEntry(*dbTxn, logIndex);
|
||||
uint64_t txnId = _startExecuting(time, *dbTxn, step, status);
|
||||
if (txnId == 0) {
|
||||
// no txn could be started, see if one is executing already to fill in the `step`
|
||||
txnId = _executingTxn(*dbTxn);
|
||||
if (txnId != 0) {
|
||||
LOG_DEBUG(_env, "transaction %s is already executing, will resume it", txnId);
|
||||
// Get the req
|
||||
{
|
||||
auto k = U64Key::Static(txnId);
|
||||
std::string reqV;
|
||||
ROCKS_DB_CHECKED(dbTxn->Get({}, _reqQueueCf, k.toSlice(), &reqV));
|
||||
UnpackCDCReq ureq(_cdcReq);
|
||||
bincodeFromRocksValue(reqV, ureq);
|
||||
}
|
||||
// Store status
|
||||
status.runningTxn = txnId;
|
||||
status.runningTxnKind = _cdcReq.kind();
|
||||
// Get the state
|
||||
std::string txnStateV;
|
||||
ROCKS_DB_CHECKED(dbTxn->Get({}, _defaultCf, cdcMetadataKey(&EXECUTING_TXN_STATE_KEY), &txnStateV));
|
||||
ExternalValue<TxnState> txnState(txnStateV);
|
||||
// Advance
|
||||
_advance(time, *dbTxn, txnId, _cdcReq, NO_ERROR, nullptr, txnState, step);
|
||||
}
|
||||
|
||||
std::vector<CDCTxnId> txnIdsToStart;
|
||||
// Just collect all executing txns, and run them
|
||||
std::unique_ptr<rocksdb::Iterator> it(dbTxn->GetIterator({}, _executingCf));
|
||||
for (it->Seek(""); it->Valid(); it->Next()) {
|
||||
auto txnIdK = ExternalValue<CDCTxnIdKey>::FromSlice(it->key());
|
||||
_advanceWithResp(*dbTxn, txnIdK().id(), NO_ERROR, nullptr, step, txnIdsToStart);
|
||||
}
|
||||
ROCKS_DB_CHECKED(it->status());
|
||||
|
||||
_startExecuting(*dbTxn, txnIdsToStart, step);
|
||||
|
||||
commitTransaction(*dbTxn);
|
||||
}
|
||||
@@ -1699,20 +1915,16 @@ CDCDB::~CDCDB() {
|
||||
delete ((CDCDBImpl*)_impl);
|
||||
}
|
||||
|
||||
uint64_t CDCDB::processCDCReq(bool sync, EggsTime time, uint64_t logIndex, const CDCReqContainer& req, CDCStep& step, CDCStatus& status) {
|
||||
return ((CDCDBImpl*)_impl)->processCDCReq(sync, time, logIndex, req, step, status);
|
||||
void CDCDB::bootstrap(bool sync, uint64_t logIndex, CDCStep& step) {
|
||||
return ((CDCDBImpl*)_impl)->bootstrap(sync, logIndex, step);
|
||||
}
|
||||
|
||||
void CDCDB::processShardResp(bool sync, EggsTime time, uint64_t logIndex, EggsError respError, const ShardRespContainer* resp, CDCStep& step, CDCStatus& status) {
|
||||
return ((CDCDBImpl*)_impl)->processShardResp(sync, time, logIndex, respError, resp, step, status);
|
||||
}
|
||||
|
||||
void CDCDB::startNextTransaction(bool sync, EggsTime time, uint64_t logIndex, CDCStep& step, CDCStatus& status) {
|
||||
return ((CDCDBImpl*)_impl)->startNextTransaction(sync, time, logIndex, step, status);
|
||||
void CDCDB::update(bool sync, uint64_t logIndex, const std::vector<CDCReqContainer>& cdcReqs, const std::vector<CDCShardResp>& shardResps, CDCStep& step, std::vector<CDCTxnId>& cdcReqsTxnIds) {
|
||||
return ((CDCDBImpl*)_impl)->update(sync, logIndex, cdcReqs, shardResps, step, cdcReqsTxnIds);
|
||||
}
|
||||
|
||||
uint64_t CDCDB::lastAppliedLogEntry() {
|
||||
return ((CDCDBImpl*)_impl)->_lastAppliedLogEntry();
|
||||
return ((CDCDBImpl*)_impl)->_lastAppliedLogEntryDB();
|
||||
}
|
||||
|
||||
void CDCDB::rocksDBMetrics(std::unordered_map<std::string, uint64_t>& values) {
|
||||
|
||||
@@ -1,12 +1,38 @@
|
||||
#pragma once
|
||||
|
||||
#include "unordered_map"
|
||||
#include <unordered_map>
|
||||
#include <optional>
|
||||
|
||||
#include "Bincode.hpp"
|
||||
#include "Msgs.hpp"
|
||||
#include "Env.hpp"
|
||||
#include "Shuckle.hpp"
|
||||
|
||||
// This exists purely for type safety
|
||||
struct CDCTxnId {
|
||||
uint64_t x;
|
||||
|
||||
CDCTxnId() : x(0) {} // txn ids are never zeros
|
||||
CDCTxnId(uint64_t x_) : x(x_) {}
|
||||
|
||||
bool operator==(const CDCTxnId rhs) const {
|
||||
return x == rhs.x;
|
||||
}
|
||||
|
||||
bool operator!=(const CDCTxnId rhs) const {
|
||||
return x != rhs.x;
|
||||
}
|
||||
};
|
||||
|
||||
std::ostream& operator<<(std::ostream& out, CDCTxnId id);
|
||||
|
||||
template <>
|
||||
struct std::hash<CDCTxnId> {
|
||||
std::size_t operator()(const CDCTxnId key) const {
|
||||
return std::hash<uint64_t>{}(key.x);
|
||||
}
|
||||
};
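
// A minimal sketch (not part of the patch) of what the std::hash specialization
// above enables: CDCTxnId can key the standard hash containers directly, so
// callers keep the newtype's type safety instead of degrading to raw uint64_t.
inline bool cdcTxnIdSeenBefore(std::unordered_map<CDCTxnId, int>& seen, CDCTxnId id) {
    // operator== and std::hash<CDCTxnId> above are what make this lookup compile
    return !seen.emplace(id, 0).second; // false only the first time we see `id`
}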

struct CDCShardReq {
    ShardId shid;
    bool repeated; // This request is exactly the same as the previous one.
@@ -21,49 +47,35 @@ struct CDCShardReq {

std::ostream& operator<<(std::ostream& out, const CDCShardReq& x);

struct CDCStep {
    // If non-zero, a transaction has just finished, and here we have
    // the response (whether an error or a response).
    uint64_t txnFinished;
    EggsError err; // if NO_ERROR, resp contains the response.
struct CDCFinished {
    // If err is not NO_ERROR, resp is not filled in
    EggsError err;
    CDCRespContainer resp;

    // If non-zero, a transaction is running, but we need something
    // from a shard to have it proceed.
    //
    // We have !((txnFinished != 0) && (txnNeedsShard != 0)) as an invariant
    // -- we can't have finished and be running a thing in the same step.
    uint64_t txnNeedsShard;
    CDCShardReq shardReq;

    // If non-zero, there is a transaction after the current one waiting
    // to be executed. Only filled in if `txnNeedsShard == 0`.
    // Useful to decide when to call `startNextTransaction` (although
    // calling it is safe in any case).
    uint64_t nextTxn;

    void clear() {
        txnFinished = 0;
        txnNeedsShard = 0;
        nextTxn = 0;
    }
};

// Only used for timing purposes, there is some overlap with CDCStep,
// but we separate it out for code robustness (we just get this info
// every time we touch the db).
struct CDCStatus {
    uint64_t runningTxn; // 0 if nothing
    CDCMessageKind runningTxnKind; // only relevant if it's running
std::ostream& operator<<(std::ostream& out, const CDCFinished& x);

    void reset() {
        runningTxn = 0;
        runningTxnKind = (CDCMessageKind)0;
struct CDCStep {
    std::vector<std::pair<CDCTxnId, CDCFinished>> finishedTxns; // txns which have finished
    std::vector<std::pair<CDCTxnId, CDCShardReq>> runningTxns; // txns which need a new shard request

    void clear() {
        finishedTxns.clear();
        runningTxns.clear();
    }
};

std::ostream& operator<<(std::ostream& out, const CDCStep& x);
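
// A hedged sketch (not part of the patch) of how a driver might drain the
// batched CDCStep above after each CDCDB::update()/bootstrap() call.
// `sendRespToClient` and `sendReqToShard` are hypothetical transport hooks,
// not functions that exist in this codebase.
void sendRespToClient(CDCTxnId txnId, EggsError err, const CDCRespContainer& resp);
void sendReqToShard(CDCTxnId txnId, const CDCShardReq& req);

inline void drainStep(const CDCStep& step) {
    for (const auto& [txnId, fin] : step.finishedTxns) {
        // fin.resp is only meaningful when fin.err == NO_ERROR
        sendRespToClient(txnId, fin.err, fin.resp);
    }
    for (const auto& [txnId, shardReq] : step.runningTxns) {
        // each entry is a txn blocked on exactly one outstanding shard request
        sendReqToShard(txnId, shardReq);
    }
}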

struct CDCShardResp {
    CDCTxnId txnId; // the transaction id we're getting a response for
    // if err != NO_ERROR, resp is not filled in
    EggsError err;
    ShardRespContainer resp;
};

std::ostream& operator<<(std::ostream& out, const CDCShardResp& x);

struct CDCDB {
private:
    void* _impl;
@@ -81,52 +93,30 @@ public:
    // responses.
    //
    // The functions below cannot be called concurrently.
    //
    // TODO one thing that we'd like to do (outside the deterministic state
    // machine) is apply backpressure when too many txns are enqueued. It's
    // not good to do it inside the state machine, because then we'd be marrying
    // the deterministic state update function to such heuristics, which seems
    // imprudent. So we'd like some function returning the length of the queue.

    // Enqueues a cdc request, and immediately starts it if the system is currently
    // idle. Returns the txn id that got assigned to the txn.
    uint64_t processCDCReq(
        bool sync, // Whether to persist synchronously. Unneeded if log entries are persisted already.
        EggsTime time,
    // Gives you what to do when the CDC is started back up. It'll basically just
    // tell you to send some requests to shards. You need to run this when starting.
    void bootstrap(
        bool sync,
        uint64_t logIndex,
        const CDCReqContainer& req,
        CDCStep& step,
        CDCStatus& status
        CDCStep& step
    );

    // Advances the CDC state using the given shard response.
    // Enqueues some CDC requests, and immediately starts them if possible.
    // Returns the txn id that was assigned to each request.
    //
    // This function crashes hard if the caller passes it a response it's not expecting. So the caller
    // should track responses and make sure only the correct one is passed in.
    void processShardResp(
        bool sync, // Whether to persist synchronously. Unneeded if log entries are persisted already.
        EggsTime time,
        uint64_t logIndex,
        // (err == NO_ERROR) == (req != nullptr)
        EggsError err,
        const ShardRespContainer* req,
        CDCStep& step,
        CDCStatus& status
    );

    // Does what it can to advance the state of the system, by starting the next
    // transaction in line (if any). In any case, it returns what there is to do next
    // in `step`.
    // Also, advances the CDC state using some shard responses.
    //
    // It's fine to call this function even if there's nothing to do -- and in fact
    // you should do that when starting up the CDC, to make sure to finish
    // in-flight CDC transactions.
    void startNextTransaction(
        bool sync, // Whether to persist synchronously. Unneeded if log entries are persisted already.
        EggsTime time,
    // This function crashes hard if the caller passes it a response it's not
    // expecting. So the caller should track responses and make sure only relevant
    // ones are passed in.
    void update(
        bool sync,
        uint64_t logIndex,
        const std::vector<CDCReqContainer>& cdcReqs,
        const std::vector<CDCShardResp>& shardResps,
        CDCStep& step,
        CDCStatus& status
        std::vector<CDCTxnId>& cdcReqsTxnIds // output txn ids for all the requests
    );
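
    // A hedged sketch (not part of the patch) of the intended calling sequence:
    // bootstrap() once at startup (re-emitting shard requests for in-flight
    // txns), then update() once per applied log entry, with that entry's
    // batched CDC requests and shard responses. `applyEntry` is hypothetical;
    // the real log plumbing lives in the CDC server, not in this header.
    //
    //     void applyEntry(
    //         CDCDB& db,
    //         uint64_t logIndex,
    //         const std::vector<CDCReqContainer>& cdcReqs,
    //         const std::vector<CDCShardResp>& shardResps
    //     ) {
    //         CDCStep step;
    //         std::vector<CDCTxnId> txnIds;
    //         // sync=false assumes the log entry is already durable elsewhere
    //         db.update(false, logIndex, cdcReqs, shardResps, step, txnIds);
    //         // now act on step.finishedTxns / step.runningTxns
    //     }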

    // The index of the last log entry persisted to the DB

@@ -11,6 +11,7 @@
#include "RocksDBUtils.hpp"
#include "Msgs.hpp"
#include "Time.hpp"
#include "CDCDB.hpp"

enum class CDCMetadataKey : uint8_t {
    LAST_APPLIED_LOG_ENTRY = 0,
@@ -20,19 +21,92 @@ enum class CDCMetadataKey : uint8_t {
    EXECUTING_TXN = 4,
    EXECUTING_TXN_STATE = 6,
    LAST_DIRECTORY_ID = 7,
    VERSION = 8,
};
constexpr CDCMetadataKey LAST_APPLIED_LOG_ENTRY_KEY = CDCMetadataKey::LAST_APPLIED_LOG_ENTRY;
constexpr CDCMetadataKey LAST_TXN_KEY = CDCMetadataKey::LAST_TXN;
constexpr CDCMetadataKey FIRST_TXN_IN_QUEUE_KEY = CDCMetadataKey::FIRST_TXN_IN_QUEUE;
constexpr CDCMetadataKey LAST_TXN_IN_QUEUE_KEY = CDCMetadataKey::LAST_TXN_IN_QUEUE;
constexpr CDCMetadataKey EXECUTING_TXN_KEY = CDCMetadataKey::EXECUTING_TXN;
constexpr CDCMetadataKey EXECUTING_TXN_STATE_KEY = CDCMetadataKey::EXECUTING_TXN_STATE;
constexpr CDCMetadataKey NEXT_DIRECTORY_ID_KEY = CDCMetadataKey::LAST_DIRECTORY_ID;
constexpr CDCMetadataKey LAST_APPLIED_LOG_ENTRY_KEY = CDCMetadataKey::LAST_APPLIED_LOG_ENTRY; // V0, V1
constexpr CDCMetadataKey LAST_TXN_KEY = CDCMetadataKey::LAST_TXN; // V0, V1
constexpr CDCMetadataKey FIRST_TXN_IN_QUEUE_KEY = CDCMetadataKey::FIRST_TXN_IN_QUEUE; // V0
constexpr CDCMetadataKey LAST_TXN_IN_QUEUE_KEY = CDCMetadataKey::LAST_TXN_IN_QUEUE; // V0
constexpr CDCMetadataKey EXECUTING_TXN_KEY = CDCMetadataKey::EXECUTING_TXN; // V0
constexpr CDCMetadataKey EXECUTING_TXN_STATE_KEY = CDCMetadataKey::EXECUTING_TXN_STATE; // V0
constexpr CDCMetadataKey NEXT_DIRECTORY_ID_KEY = CDCMetadataKey::LAST_DIRECTORY_ID; // V0, V1
constexpr CDCMetadataKey VERSION_KEY = CDCMetadataKey::VERSION; // V1

inline rocksdb::Slice cdcMetadataKey(const CDCMetadataKey* k) {
    return rocksdb::Slice((const char*)k, sizeof(*k));
}
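
// A small sketch (not part of the patch) of how these one-byte metadata keys
// are used: the enum value itself is the whole key, so a lookup is a plain Get
// on the default column family. `db` and `defaultCf` are hypothetical handles.
inline bool cdcReadVersion(rocksdb::DB& db, rocksdb::ColumnFamilyHandle* defaultCf, std::string& out) {
    auto status = db.Get({}, defaultCf, cdcMetadataKey(&VERSION_KEY), &out);
    if (status.IsNotFound()) { return false; } // V0 databases predate VERSION
    ROCKS_DB_CHECKED(status);
    return true;
}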

struct CDCTxnIdKey {
    FIELDS(
        BE, CDCTxnId, id, setIdUnchecked,
        END_STATIC
    )

    void setId(CDCTxnId i) {
        ALWAYS_ASSERT(i.x != 0);
        setIdUnchecked(i);
    }

    static StaticValue<CDCTxnIdKey> Static(CDCTxnId id) {
        auto x = StaticValue<CDCTxnIdKey>();
        x().setId(id);
        return x;
    }
};

struct CDCTxnIdValue {
    FIELDS(
        LE, CDCTxnId, id, setIdUnchecked,
        END_STATIC
    )

    void setId(CDCTxnId i) {
        ALWAYS_ASSERT(i.x != 0);
        setIdUnchecked(i);
    }

    static StaticValue<CDCTxnIdValue> Static(CDCTxnId id) {
        auto x = StaticValue<CDCTxnIdValue>();
        x().setId(id);
        return x;
    }
};
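
// Note the encodings above (an assumption worth spelling out): CDCTxnIdKey is
// big-endian while CDCTxnIdValue is little-endian, presumably because RocksDB
// compares keys by raw bytes, and big-endian byte order agrees with numeric
// order -- e.g. with two-byte ids, 1 encodes as {0x00, 0x01} and 256 as
// {0x01, 0x00}, so 1 sorts first, as desired; little-endian would reverse
// them. Values are never compared, so they can stay little-endian.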

// This data structure, conceptually, is std::unordered_map<InodeId, std::vector<CDCTxnId>>.
//
// To encode this in RocksDB, we store (InodeId, CDCTxnId) keys with no values. Txn ids are
// increasing so the order will be naturally what we want.
//
// We also store a sentinel with the head of the list. This is to avoid having to step on many
// deleted keys when checking if a dir is already locked.
//
// The functions adding/removing elements to the list are tasked with maintaining the sentinel.
struct DirsToTxnsKey {
    FIELDS(
        BE, InodeId, dirId, setDirId,
        BE, uint64_t, maybeTxnId, setMaybeTxnId, // if 0, it's a sentinel
        END_STATIC
    )

    bool isSentinel() const {
        return maybeTxnId() == 0;
    }

    CDCTxnId txnId() const {
        ALWAYS_ASSERT(maybeTxnId() != 0);
        return maybeTxnId();
    }

    void setTxnId(CDCTxnId txnId) {
        ALWAYS_ASSERT(txnId.x != 0);
        setMaybeTxnId(txnId.x);
    }

    void setSentinel() {
        setMaybeTxnId(0);
    }
};
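
// A hedged sketch (not part of the patch) of what the sentinel buys us when
// checking whether a directory is already locked: we probe the (dirId, 0)
// sentinel key directly, rather than scanning past tombstones of deleted
// (dirId, txnId) entries. `dirsToTxnsCf` is a hypothetical column family handle.
inline bool dirHasTxns(rocksdb::Transaction& dbTxn, rocksdb::ColumnFamilyHandle* dirsToTxnsCf, InodeId dirId) {
    StaticValue<DirsToTxnsKey> k;
    k().setDirId(dirId);
    k().setSentinel();
    std::string v;
    auto status = dbTxn.Get({}, dirsToTxnsCf, k.toSlice(), &v);
    if (status.IsNotFound()) { return false; } // no sentinel => no txns touch this dir
    ROCKS_DB_CHECKED(status);
    return true;
}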

struct MakeDirectoryState {
    FIELDS(
        LE, InodeId, dirId, setDirId,

@@ -7,6 +7,11 @@

#include <stdint.h>
#include <stddef.h>
#include <stdio.h>
#include <fcntl.h>
#include <errno.h>
#include <stdlib.h>
#include <unistd.h>

#ifdef __cplusplus
extern "C" {
@@ -47,6 +52,31 @@ static void wyhash64_bytes(uint64_t* state, uint8_t* bytes, size_t len) {
    }
}

// Not specific to wyhash, just handy
__attribute__((unused))
static uint64_t wyhash64_rand() {
    int fd = open("/dev/urandom", O_RDONLY);
    if (fd < 0) {
        fprintf(stderr, "could not open /dev/urandom: %d", errno);
        exit(1);
    }
    uint64_t x;
    ssize_t r = read(fd, &x, sizeof(x));
    if (r < 0) {
        fprintf(stderr, "could not read /dev/urandom: %d", errno);
        exit(1);
    }
    if (r != sizeof(x)) {
        fprintf(stderr, "expected %zu bytes from /dev/urandom, got %zd", sizeof(x), r);
        exit(1);
    }
    if (close(fd) < 0) {
        fprintf(stderr, "could not close /dev/urandom: %d", errno);
        exit(1);
    }
    return x;
}
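
/* A small usage sketch (not part of the patch): wyhash64_rand() is meant to
   produce a one-off seed, e.g. for a wyhash64 state that is then advanced
   deterministically via wyhash64_bytes() above. */
__attribute__((unused))
static void wyhash64_rand_example(void) {
    uint64_t state = wyhash64_rand(); /* exits the process on I/O failure */
    uint8_t buf[16];
    wyhash64_bytes(&state, buf, sizeof(buf)); /* fill buf with pseudo-random bytes */
}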

#ifdef __cplusplus
}
#endif

@@ -302,6 +302,15 @@ func (r *RunTests) run(
		},
	)

	r.test(
		log,
		"parallel dirs",
		"",
		func(counters *lib.ClientCounters) {
			parallelDirsTest(log, r.shuckleAddress(), counters)
		},
	)

	largeFileOpts := largeFileTestOpts{
		fileSize: 1 << 30, // 1GiB
	}
@@ -314,15 +323,6 @@ func (r *RunTests) run(
		},
	)

	r.test(
		log,
		"parallel dirs",
		"",
		func(counters *lib.ClientCounters) {
			parallelDirsTest(log, r.shuckleAddress(), counters)
		},
	)

	rsyncOpts := rsyncTestOpts{
		maxFileSize: 200 << 20, // 200MiB
		numFiles:    100, // 20GiB

@@ -936,6 +936,7 @@ func NewClientDirectNoAddrs(
	log *Logger,
) (c *Client, err error) {
	c = &Client{
		// do not catch requests from previous executions
		requestIdCounter: rand.Uint64(),
		fetchBlockBufs: sync.Pool{
			New: func() any {