diff --git a/cpp/cdc/CDC.cpp b/cpp/cdc/CDC.cpp index 97011e25..aac1873f 100644 --- a/cpp/cdc/CDC.cpp +++ b/cpp/cdc/CDC.cpp @@ -6,13 +6,14 @@ #include #include #include -#include #include #include #include #include #include #include +#include +#include #include "Bincode.hpp" #include "CDC.hpp" @@ -41,9 +42,6 @@ struct CDCShared { std::array shards; // How long it took us to process the entire request, from parse to response. std::array timingsTotal; - // How long it took to process the request, from when it exited the queue to - // when it finished executing. - std::array timingsProcess; std::array errors; // right now we have a max of 200req/s and we send the metrics every minute or so, so // this should cover us for at least 1.5mins. Power of two is good for mod. @@ -57,7 +55,6 @@ struct CDCShared { ownPorts[1].store(0); for (CDCMessageKind kind : allCDCMessageKind) { timingsTotal[(int)kind] = Timings::Standard(); - timingsProcess[(int)kind] = Timings::Standard(); } for (int i = 0; i < inFlightTxnsWindow.size(); i++) { inFlightTxnsWindow[i] = 0; @@ -66,13 +63,15 @@ struct CDCShared { }; struct InFlightShardRequest { - uint64_t txnId; // the txn id that requested this shard request + CDCTxnId txnId; // the txn id that requested this shard request EggsTime sentAt; - uint64_t shardRequestId; ShardId shid; }; struct InFlightCDCRequest { + bool hasClient; + uint64_t lastSentRequestId; + // if hasClient=false, the following is all garbage. uint64_t cdcRequestId; EggsTime receivedAt; struct sockaddr_in clientAddr; @@ -114,10 +113,70 @@ struct InFlightCDCRequestKey { template <> struct std::hash { std::size_t operator()(const InFlightCDCRequestKey& key) const { - return key.requestId ^ (((uint64_t)key.port << 32) | ((uint64_t)key.ip)); + return std::hash{}(key.requestId ^ (((uint64_t)key.port << 32) | ((uint64_t)key.ip))); } }; +struct InFlightShardRequests { +private: + using RequestsMap = std::unordered_map; + RequestsMap _reqs; + + std::map _pq; + +public: + size_t size() const { + return _reqs.size(); + } + + RequestsMap::const_iterator oldest() const { + ALWAYS_ASSERT(size() > 0); + auto reqByTime = _pq.begin(); + return _reqs.find(reqByTime->second); + } + + RequestsMap::const_iterator find(uint64_t reqId) const { + return _reqs.find(reqId); + } + + RequestsMap::const_iterator end() { + return _reqs.end(); + } + + void erase(RequestsMap::const_iterator iterator) { + _pq.erase(iterator->second.sentAt); + _reqs.erase(iterator); + } + + void insert(uint64_t reqId, const InFlightShardRequest& req) { + auto [reqIt, inserted] = _reqs.insert({reqId, req}); + + // TODO i think we can just assert inserted, we never need this + // functionality + + if (inserted) { + // we have never seen this shard request. + // technically we could get the same time twice, but in practice + // we won't, so just assert it. + ALWAYS_ASSERT(_pq.insert({req.sentAt, reqId}).second); + } else { + // we had already seen this. make sure it's for the same stuff, and update pq. + ALWAYS_ASSERT(reqIt->second.txnId == req.txnId); + ALWAYS_ASSERT(reqIt->second.shid == req.shid); + ALWAYS_ASSERT(_pq.erase(reqIt->second.sentAt) == 1); // must be already present + ALWAYS_ASSERT(_pq.insert({req.sentAt, reqId}).second); // insert with new time + reqIt->second.sentAt = req.sentAt; // update time in existing entry + } + } +}; + +struct CDCReqInfo { + uint64_t reqId; + struct sockaddr_in clientAddr; + EggsTime receivedAt; + int sock; +}; + struct CDCServer : Loop { private: CDCShared& _shared; @@ -126,19 +185,21 @@ private: uint64_t _currentLogIndex; std::vector _recvBuf; std::vector _sendBuf; - CDCReqContainer _cdcReqContainer; - ShardRespContainer _shardRespContainer; + std::vector _cdcReqs; + std::vector _cdcReqsInfo; + std::vector _cdcReqsTxnIds; + std::vector _shardResps; CDCStep _step; uint64_t _shardRequestIdCounter; - int _epoll; - std::array _socks; - struct epoll_event _events[4]; + // order: CDC, shard, CDC, shard + // length will be 2 or 4 depending on whether we have a second ip + std::vector _socks; AES128Key _expandedCDCKey; Duration _shardTimeout; uint64_t _maximumEnqueuedRequests; // The requests we've enqueued, but haven't completed yet, with // where to send the response. Indexed by txn id. - std::unordered_map _inFlightTxns; + std::unordered_map _inFlightTxns; uint64_t _inFlightTxnsWindowCursor; // The enqueued requests, but indexed by req id + ip + port. We // store this so that we can drop repeated requests which are @@ -148,27 +209,7 @@ private: // a useful optimization for now that we live with it. std::unordered_set _inFlightCDCReqs; // The _shard_ request we're currently waiting for, if any. - std::optional _inFlightShardReq; - // This is just used to calculate the timings - uint64_t _runningTxn; - CDCMessageKind _runningTxnKind; - EggsTime _runningTxnStartedAt; - CDCStatus _status; - - void _updateProcessTimings() { - if (_status.runningTxn != _runningTxn) { // we've got something new - EggsTime now = eggsNow(); - if (_runningTxn != 0) { // something has finished running - _shared.timingsProcess[(int)_runningTxnKind].add(now - _runningTxnStartedAt); - _runningTxn = 0; - } - if (_status.runningTxn != 0) { // something has started running - _runningTxn = _status.runningTxn; - _runningTxnKind = _status.runningTxnKind; - _runningTxnStartedAt = now; - } - } - } + InFlightShardRequests _inFlightShardReqs; public: CDCServer(Logger& logger, std::shared_ptr& xmon, const CDCOptions& options, CDCShared& shared) : @@ -178,14 +219,13 @@ public: _ipPorts(options.ipPorts), _recvBuf(DEFAULT_UDP_MTU), _sendBuf(DEFAULT_UDP_MTU), - _shardRequestIdCounter(0), + // important to not catch stray requests from previous executions + _shardRequestIdCounter(wyhash64_rand()), _shardTimeout(options.shardTimeout), _maximumEnqueuedRequests(options.maximumEnqueuedRequests), - _inFlightTxnsWindowCursor(0), - _runningTxn(0) + _inFlightTxnsWindowCursor(0) { _currentLogIndex = _shared.db.lastAppliedLogEntry(); - memset(&_socks[0], 0, sizeof(_socks)); expandKey(CDCKey, _expandedCDCKey); } @@ -201,32 +241,70 @@ public: _seenShards = true; _initAfterShardsSeen(); } + + // clear internal buffers + _cdcReqs.clear(); + _cdcReqsInfo.clear(); + _cdcReqsTxnIds.clear(); + _shardResps.clear(); // Process CDC requests and shard responses { auto now = eggsNow(); - if (_inFlightShardReq && (now - _inFlightShardReq->sentAt) > _shardTimeout) { - LOG_DEBUG(_env, "in-flight shard request %s was sent at %s, it's now %s, timing out (%s > %s)", _inFlightShardReq->shardRequestId, _inFlightShardReq->sentAt, now, (now - _inFlightShardReq->sentAt), _shardTimeout); - auto shid = _inFlightShardReq->shid; - _inFlightShardReq.reset(); - _handleShardError(shid, EggsError::TIMEOUT); + for (;;) { + if (_inFlightShardReqs.size() == 0) { break; } + auto oldest = _inFlightShardReqs.oldest(); + if ((now - oldest->second.sentAt) < _shardTimeout) { break; } + + LOG_DEBUG(_env, "in-flight shard request %s was sent at %s, it's now %s, will time out (%s > %s)", oldest->first, oldest->second.sentAt, now, (now - oldest->second.sentAt), _shardTimeout); + auto resp = _prepareCDCShardResp(oldest->first); // erases `oldest` + ALWAYS_ASSERT(resp != nullptr); // must be there, we've just timed it out + resp->err = EggsError::TIMEOUT; } } // 10ms timeout for prompt termination and for shard resps timeouts - int nfds = epoll_wait(_epoll, _events, _socks.size(), 10 /*milliseconds*/); - if (nfds < 0) { - throw SYSCALL_EXCEPTION("epoll_wait"); + int ret = poll(_socks.data(), _socks.size(), 10); + if (ret < 0) { + throw SYSCALL_EXCEPTION("poll"); } - for (int i = 0; i < nfds; i++) { - const auto& event = _events[i]; - if (event.data.u64%2 == 0) { - _drainCDCSock(_socks[event.data.u64]); - } else { - _drainShardSock(_socks[event.data.u64]); + // drain sockets + // TODO drain shard sockets first, put an upper bound on the number + // of accepted packets, so that we prioritize completing outstanding + // txns. + for (int i = 0; i < _socks.size(); i++) { + const auto& pollFd = _socks[i]; + if (pollFd.events & POLLIN) { + if (i%2 == 0) { + _drainCDCSock(pollFd.fd); + } else { + _drainShardSock(pollFd.fd); + } } } + + if (_cdcReqs.size() > 0 || _shardResps.size() > 0) { + // process everything in a single batch + _shared.db.update(true, _advanceLogIndex(), _cdcReqs, _shardResps, _step, _cdcReqsTxnIds); + // record txn ids etc. for newly received requests + for (int i = 0; i < _cdcReqs.size(); i++) { + const auto& req = _cdcReqs[i]; + const auto& reqInfo = _cdcReqsInfo[i]; + CDCTxnId txnId = _cdcReqsTxnIds[i]; + ALWAYS_ASSERT(_inFlightTxns.find(txnId) == _inFlightTxns.end()); + auto& inFlight = _inFlightTxns[txnId]; + inFlight.hasClient = true; + inFlight.cdcRequestId = reqInfo.reqId; + inFlight.clientAddr = reqInfo.clientAddr; + inFlight.kind = req.kind(); + inFlight.receivedAt = reqInfo.receivedAt; + inFlight.sock = reqInfo.sock; + _updateInFlightTxns(); + _inFlightCDCReqs.insert(InFlightCDCRequestKey(reqInfo.reqId, reqInfo.clientAddr)); + } + _processStep(); + } } virtual void finish() override { @@ -261,17 +339,34 @@ private: return true; } + // To be called when we have a shard response with given `reqId`. + // Searches it in the in flight map, removes it from it, and + // adds a CDCShardResp to `_shardResps`. + // nullptr if we couldn't find the in flight response. Fills in txnId, + // and nothing else. + CDCShardResp* _prepareCDCShardResp(uint64_t reqId) { + // If it's not the request we wanted, skip + auto reqIt = _inFlightShardReqs.find(reqId); + if (reqIt == _inFlightShardReqs.end()) { + LOG_INFO(_env, "got unexpected shard request id %s, dropping", reqId); + return nullptr; + } + CDCTxnId txnId = reqIt->second.txnId; + auto& resp = _shardResps.emplace_back(); + resp.txnId = reqIt->second.txnId; + _inFlightShardReqs.erase(reqIt); // not in flight anymore + return &resp; + } + void _initAfterShardsSeen() { // initialize everything after having seen the shards // Create sockets. We create one socket for listening to client requests and one for listening // the the shard's responses. If we have two IPs we do this twice. + _socks.resize((_ipPorts[1].ip == 0) ? 2 : 4); for (int i = 0; i < _socks.size(); i++) { - if (i > 1 && _ipPorts[1].ip == 0) { // we don't have a second IP - _socks[i] = -1; - continue; - } int sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); - _socks[i] = sock; + _socks[i].fd = sock; + _socks[i].events = POLLIN; if (sock < 0) { throw SYSCALL_EXCEPTION("cannot create socket"); } @@ -305,32 +400,12 @@ private: LOG_DEBUG(_env, "bound shard %s sock to port %s", i/2, ntohs(addr.sin_port)); } } - - // create epoll structure - // TODO I did this when I had more sockets, we could just use select now that it's 4 - // of them... - _epoll = epoll_create1(0); - if (_epoll < 0) { - throw SYSCALL_EXCEPTION("epoll"); - } - for (int i = 0; i < _socks.size(); i++) { - if (i > 1 && _ipPorts[1].ip == 0) { // we don't have a second IP - break; - } - auto& event = _events[i]; - event.data.u64 = i; - event.events = EPOLLIN | EPOLLET; - if (epoll_ctl(_epoll, EPOLL_CTL_ADD, _socks[i], &event) == -1) { - throw SYSCALL_EXCEPTION("epoll_ctl"); - } - } - + LOG_INFO(_env, "running on ports %s and %s", _shared.ownPorts[0].load(), _shared.ownPorts[1].load()); - // If we've got a dangling transaction, immediately start processing it - _shared.db.startNextTransaction(true, eggsNow(), _advanceLogIndex(), _step, _status); - _updateProcessTimings(); - _processStep(_step); + // If we've got dangling transactions, immediately start processing it + _shared.db.bootstrap(true, _advanceLogIndex(), _step); + _processStep(); } void _drainCDCSock(int sock) { @@ -381,9 +456,10 @@ private: EggsError err = NO_ERROR; // Now, try to parse the body + auto& cdcReq = _cdcReqs.emplace_back(); try { - _cdcReqContainer.unpack(reqBbuf, reqHeader.kind); - LOG_DEBUG(_env, "parsed request: %s", _cdcReqContainer); + cdcReq.unpack(reqBbuf, reqHeader.kind); + LOG_DEBUG(_env, "parsed request: %s", cdcReq); } catch (const BincodeException& exc) { LOG_ERROR(_env, "could not parse: %s", exc.what()); RAISE_ALERT(_env, "could not parse CDC request of kind %s from %s, will reply with error.", reqHeader.kind, clientAddr); @@ -397,24 +473,18 @@ private: } if (err == NO_ERROR) { - // If things went well, process the request - LOG_DEBUG(_env, "CDC request %s successfully parsed, will now process", _cdcReqContainer.kind()); - uint64_t txnId = _shared.db.processCDCReq(true, eggsNow(), _advanceLogIndex(), _cdcReqContainer, _step, _status); - _updateProcessTimings(); - auto& inFlight = _inFlightTxns[txnId]; - inFlight.cdcRequestId = reqHeader.requestId; - inFlight.clientAddr = clientAddr; - inFlight.kind = reqHeader.kind; - inFlight.sock = sock; - inFlight.receivedAt = receivedAt; - _updateInFlightTxns(); - _inFlightCDCReqs.insert(InFlightCDCRequestKey(reqHeader.requestId, clientAddr)); - // Go forward - _processStep(_step); + LOG_DEBUG(_env, "CDC request %s successfully parsed, will process soon", cdcReq.kind()); + _cdcReqsInfo.emplace_back(CDCReqInfo{ + .reqId = reqHeader.requestId, + .clientAddr = clientAddr, + .receivedAt = receivedAt, + .sock = sock, + }); } else { - // Otherwise we can immediately reply with an error - RAISE_ALERT(_env, "request %s failed before enqueue with error %s", _cdcReqContainer.kind(), err); + // We couldn't parse, reply immediately with an error + RAISE_ALERT(_env, "request %s failed before enqueue with error %s", cdcReq.kind(), err); _sendError(sock, reqHeader.requestId, err, clientAddr); + _cdcReqs.pop_back(); // let's just forget all about this } } } @@ -448,100 +518,90 @@ private: // control all the code in this codebase, and the header is good, and we're a // bit lazy. - // If it's not the request we wanted, skip - if (!_inFlightShardReq) { - LOG_INFO(_env, "got unexpected shard request id %s, kind %s, from shard, dropping", respHeader.requestId, respHeader.kind); + auto shardResp = _prepareCDCShardResp(respHeader.requestId); + if (shardResp == nullptr) { + // we couldn't find it continue; } - if (_inFlightShardReq->shardRequestId != respHeader.requestId) { - LOG_INFO(_env, "got unexpected shard request id %s (expected %s), kind %s, from shard %s, dropping", respHeader.requestId, _inFlightShardReq->shardRequestId, respHeader.kind, _inFlightShardReq->shid); - continue; - } - uint64_t txnId = _inFlightShardReq->txnId; - - // We can forget about this, we're going to process it right now - _inFlightShardReq.reset(); // We got an error if (respHeader.kind == (ShardMessageKind)0) { - EggsError err = reqBbuf.unpackScalar(); - _handleShardError(_inFlightShardReq->shid, err); + shardResp->err = reqBbuf.unpackScalar(); + LOG_DEBUG(_env, "got error %s for response id %s", shardResp->err, respHeader.requestId); continue; } // Otherwise, parse the body - _shardRespContainer.unpack(reqBbuf, respHeader.kind); - LOG_DEBUG(_env, "parsed shard response: %s", _shardRespContainer); + shardResp->resp.unpack(reqBbuf, respHeader.kind); + LOG_DEBUG(_env, "parsed shard response: %s", shardResp->resp); ALWAYS_ASSERT(reqBbuf.remaining() == 0); // If all went well, advance with the newly received request - LOG_DEBUG(_env, "successfully parsed shard response %s with kind %s, will now process", respHeader.requestId, respHeader.kind); - _shared.db.processShardResp(true, eggsNow(), _advanceLogIndex(), NO_ERROR, &_shardRespContainer, _step, _status); - _updateProcessTimings(); - _processStep(_step); + LOG_DEBUG(_env, "successfully parsed shard response %s with kind %s, process soon", respHeader.requestId, respHeader.kind); } } - void _handleShardError(ShardId shid, EggsError err) { - if (innocuousShardError(err)) { - LOG_DEBUG(_env, "got innocuous shard error %s from shard %s", err, shid); - } else if (rareInnocuousShardError(err)) { - LOG_INFO(_env, "got rare innocuous shard error %s from shard %s", err, shid); - } else { - RAISE_ALERT(_env, "got shard error %s from shard %s", err, shid); - } - _shared.db.processShardResp(true, eggsNow(), _advanceLogIndex(), err, nullptr, _step, _status); - _updateProcessTimings(); - _processStep(_step); - } - - void _processStep(const CDCStep& step) { - LOG_DEBUG(_env, "processing step %s", step); - if (step.txnFinished != 0) { - LOG_DEBUG(_env, "txn %s finished", step.txnFinished); + void _processStep() { + LOG_DEBUG(_env, "processing step %s", _step); + // finished txns + for (const auto& [txnId, resp]: _step.finishedTxns) { + LOG_DEBUG(_env, "txn %s finished", txnId); // we need to send the response back to the client - auto inFlight = _inFlightTxns.find(step.txnFinished); - if (inFlight == _inFlightTxns.end()) { - LOG_INFO(_env, "Could not find in-flight request %s, this might be because the CDC was restarted in the middle of a transaction.", step.txnFinished); - } else { + auto inFlight = _inFlightTxns.find(txnId); + if (inFlight->second.hasClient) { _shared.timingsTotal[(int)inFlight->second.kind].add(eggsNow() - inFlight->second.receivedAt); - _shared.errors[(int)inFlight->second.kind].add(_step.err); - if (step.err != NO_ERROR) { - if (rareInnocuousShardError(step.err)) { - LOG_INFO(_env, "txn %s, req id %s, finished with rare innocuous error %s", step.txnFinished, inFlight->second.cdcRequestId, step.err); - } else if (!innocuousShardError(step.err)) { - RAISE_ALERT(_env, "txn %s, req id %s, finished with error %s", step.txnFinished, inFlight->second.cdcRequestId, step.err); + _shared.errors[(int)inFlight->second.kind].add(resp.err); + if (resp.err != NO_ERROR) { + if (innocuousShardError(resp.err)) { + LOG_INFO(_env, "txn %s, req id %s, finished with innocuous error %s", txnId, inFlight->second.cdcRequestId, resp.err); + } else if (rareInnocuousShardError(resp.err)) { + LOG_INFO(_env, "txn %s, req id %s, finished with rare innocuous error %s", txnId, inFlight->second.cdcRequestId, resp.err); + } else { + RAISE_ALERT(_env, "txn %s, req id %s, finished with error %s", txnId, inFlight->second.cdcRequestId, resp.err); } - _sendError(inFlight->second.sock, inFlight->second.cdcRequestId, step.err, inFlight->second.clientAddr); + _sendError(inFlight->second.sock, inFlight->second.cdcRequestId, resp.err, inFlight->second.clientAddr); } else { LOG_DEBUG(_env, "sending response with req id %s, kind %s, back to %s", inFlight->second.cdcRequestId, inFlight->second.kind, inFlight->second.clientAddr); BincodeBuf bbuf(&_sendBuf[0], _sendBuf.size()); CDCResponseHeader respHeader(inFlight->second.cdcRequestId, inFlight->second.kind); respHeader.pack(bbuf); - step.resp.pack(bbuf); + resp.resp.pack(bbuf); _send(inFlight->second.sock, inFlight->second.clientAddr, (const char*)bbuf.data, bbuf.len()); } _inFlightCDCReqs.erase(InFlightCDCRequestKey(inFlight->second.cdcRequestId, inFlight->second.clientAddr)); - _inFlightTxns.erase(inFlight); - _updateInFlightTxns(); } + _inFlightTxns.erase(inFlight); + _updateInFlightTxns(); } - if (step.txnNeedsShard != 0) { + // in flight txns + for (const auto& [txnId, shardReq]: _step.runningTxns) { CDCShardReq prevReq; - LOG_TRACE(_env, "txn %s needs shard %s, req %s", step.txnNeedsShard, step.shardReq.shid, step.shardReq.req); + LOG_TRACE(_env, "txn %s needs shard %s, req %s", txnId, shardReq.shid, shardReq.req); BincodeBuf bbuf(&_sendBuf[0], _sendBuf.size()); // Header ShardRequestHeader shardReqHeader; // Do not allocate new req id for repeated requests, so that we'll just accept - // the first one that comes back. - if (!step.shardReq.repeated) { + // the first one that comes back. There's a chance for the txnId to not be here + // yet: if we have just restarted the CDC. In this case we fill it in here, but + // obviously without client addr. + auto inFlightTxn = _inFlightTxns.find(txnId); + if (inFlightTxn == _inFlightTxns.end()) { + LOG_INFO(_env, "Could not find in-flight transaction %s, this might be because the CDC was restarted in the middle of a transaction.", txnId); + InFlightCDCRequest req; + req.hasClient = false; + req.lastSentRequestId = _freshShardReqId(); + inFlightTxn = _inFlightTxns.emplace(txnId, req).first; + _updateInFlightTxns(); + } else if (shardReq.repeated) { + shardReqHeader.requestId = inFlightTxn->second.lastSentRequestId; + } else { _shardRequestIdCounter++; + shardReqHeader.requestId = _shardRequestIdCounter; } - shardReqHeader.requestId = _shardRequestIdCounter; - shardReqHeader.kind = step.shardReq.req.kind(); + shardReqHeader.kind = shardReq.req.kind(); shardReqHeader.pack(bbuf); // Body - step.shardReq.req.pack(bbuf); + shardReq.req.pack(bbuf); // MAC, if necessary if (isPrivilegedRequestKind(shardReqHeader.kind)) { bbuf.packFixedBytes<8>({cbcmac(_expandedCDCKey, bbuf.data, bbuf.len())}); @@ -550,7 +610,7 @@ private: struct sockaddr_in shardAddr; memset(&shardAddr, 0, sizeof(shardAddr)); _shared.shardsMutex.lock(); - ShardInfo shardInfo = _shared.shards[step.shardReq.shid.u8]; + ShardInfo shardInfo = _shared.shards[shardReq.shid.u8]; _shared.shardsMutex.unlock(); auto now = eggsNow(); // randomly pick one of the shard addrs and one of our sockets int whichShardAddr = now.ns & !!shardInfo.port2; @@ -559,21 +619,15 @@ private: shardAddr.sin_port = htons(whichShardAddr ? shardInfo.port2 : shardInfo.port1); static_assert(sizeof(shardAddr.sin_addr) == sizeof(shardInfo.ip1)); memcpy(&shardAddr.sin_addr, (whichShardAddr ? shardInfo.ip2 : shardInfo.ip1).data.data(), sizeof(shardAddr.sin_addr)); - LOG_DEBUG(_env, "sending request with req id %s to shard %s (%s)", shardReqHeader.requestId, step.shardReq.shid, shardAddr); - _send(_socks[whichSock*2 + 1], shardAddr, (const char*)bbuf.data, bbuf.len()); + LOG_DEBUG(_env, "sending request for txn %s with req id %s to shard %s (%s)", txnId, shardReqHeader.requestId, shardReq.shid, shardAddr); + _send(_socks[whichSock*2 + 1].fd, shardAddr, (const char*)bbuf.data, bbuf.len()); // Record the in-flight req - ALWAYS_ASSERT(!_inFlightShardReq); - auto& inFlight = _inFlightShardReq.emplace(); - inFlight.shardRequestId = shardReqHeader.requestId; - inFlight.sentAt = now; - inFlight.txnId = step.txnNeedsShard; - inFlight.shid = step.shardReq.shid; - } - if (step.nextTxn != 0) { - LOG_DEBUG(_env, "we have txn %s lined up, starting it", step.nextTxn); - _shared.db.startNextTransaction(true, eggsNow(), _advanceLogIndex(), _step, _status); - _updateProcessTimings(); - _processStep(_step); + _inFlightShardReqs.insert(shardReqHeader.requestId, InFlightShardRequest{ + .txnId = txnId, + .sentAt = now, + .shid = shardReq.shid, + }); + inFlightTxn->second.lastSentRequestId = shardReqHeader.requestId; } } @@ -730,11 +784,6 @@ public: _shared.timingsTotal[(int)kind].toStats(prefix.str(), _stats); _shared.errors[(int)kind].toStats(prefix.str(), _stats); } - { - std::ostringstream prefix; - prefix << "cdc." << kind << ".process"; - _shared.timingsProcess[(int)kind].toStats(prefix.str(), _stats); - } } err = insertStats(_shuckleHost, _shucklePort, 10_sec, _stats); _stats.clear(); @@ -743,7 +792,6 @@ public: for (CDCMessageKind kind : allCDCMessageKind) { _shared.timingsTotal[(int)kind].reset(); _shared.errors[(int)kind].reset(); - _shared.timingsProcess[(int)kind].reset(); } return true; } else { diff --git a/cpp/cdc/CDCDB.cpp b/cpp/cdc/CDCDB.cpp index ed917af0..bf381fc3 100644 --- a/cpp/cdc/CDCDB.cpp +++ b/cpp/cdc/CDCDB.cpp @@ -1,11 +1,13 @@ #include #include +#include #include #include #include #include #include #include +#include #include "Assert.hpp" #include "Bincode.hpp" @@ -15,6 +17,7 @@ #include "Env.hpp" #include "Exception.hpp" #include "Msgs.hpp" +#include "MsgsGen.hpp" #include "RocksDBUtils.hpp" #include "ShardDB.hpp" #include "Time.hpp" @@ -92,25 +95,48 @@ std::ostream& operator<<(std::ostream& out, const CDCShardReq& x) { return out; } +std::ostream& operator<<(std::ostream& out, const CDCFinished& x) { + out << "CDCFinished("; + if (x.err != NO_ERROR) { + out << "err=" << x.err; + } else { + out << "resp=" << x.resp; + } + out << ")"; + return out; +} + std::ostream& operator<<(std::ostream& out, const CDCStep& x) { - ALWAYS_ASSERT(!(x.txnFinished != 0 && x.txnNeedsShard != 0)); - out << "CDCStep("; - if (x.txnFinished != 0) { - out << "finishedTxn=" << x.txnFinished; - if (x.err != NO_ERROR) { - out << ", err=" << x.err; - } else { - out << ", resp=" << x.resp; - } - } - if (x.txnNeedsShard != 0) { - out << "txnNeedsShard=" << x.txnNeedsShard << ", shardReq=" << x.shardReq; - } - if (x.nextTxn != 0) { - if (x.txnFinished != 0 || x.txnNeedsShard != 0) { + out << "CDCStep(finishedTxns=["; + for (int i = 0; i < x.finishedTxns.size(); i++) { + if (i > 0) { out << ", "; } - out << "nextTxn=" << x.nextTxn; + const auto& txn = x.finishedTxns[i]; + out << "<" << txn.first << ", " << txn.second << ">"; + } + out << "], runningTxns=["; + for (int i = 0; i < x.runningTxns.size(); i++) { + if (i > 0) { + out << ", "; + } + const auto& txn = x.runningTxns[i]; + out << "<" << txn.first << ", " << txn.second << ">"; + } + out << "])"; + return out; +} + +std::ostream& operator<<(std::ostream& out, CDCTxnId id) { + return out << id.x; +} + +std::ostream& operator<<(std::ostream& out, const CDCShardResp& x) { + out << "CDCShardResp(txnId=" << x.txnId << ", "; + if (x.err == NO_ERROR) { + out << "resp=" << x.resp; + } else { + out << "err=" << x.err; } out << ")"; return out; @@ -122,26 +148,71 @@ inline bool createCurrentLockedEdgeRetry(EggsError err) { err == EggsError::MORE_RECENT_SNAPSHOT_EDGE || err == EggsError::MORE_RECENT_CURRENT_EDGE; } +static constexpr InodeId MOVE_DIRECTORY_LOCK = InodeId::FromU64Unchecked(1ull<<63); + +// These are all the directories where we'll lock edges given a request. +// These function _must be pure_! We call it repeatedly as if it's a property +// of the request more than a function. +// +// Technically every well form request will have distinct inode ids, but there +// are parts in the code where this function is called before we know that the +// request is valid. +static std::unordered_set directoriesNeedingLock(const CDCReqContainer& req) { + std::unordered_set toLock; + switch (req.kind()) { + case CDCMessageKind::MAKE_DIRECTORY: + toLock.emplace(req.getMakeDirectory().ownerId); + break; + case CDCMessageKind::RENAME_FILE: + toLock.emplace(req.getRenameFile().oldOwnerId); + toLock.emplace(req.getRenameFile().newOwnerId); + break; + case CDCMessageKind::SOFT_UNLINK_DIRECTORY: + // TODO I'm pretty sure the target id is fine, as + // in, does not need locking. + toLock.emplace(req.getSoftUnlinkDirectory().ownerId); + break; + case CDCMessageKind::RENAME_DIRECTORY: + toLock.emplace(req.getRenameDirectory().oldOwnerId); + toLock.emplace(req.getRenameDirectory().newOwnerId); + // Moving directories is special: it can introduce loops if we're not careful. + // Instead of trying to not create loops in the context of interleaved transactions, + // we instead only allow one move directory at a time. + toLock.emplace(MOVE_DIRECTORY_LOCK); + break; + case CDCMessageKind::HARD_UNLINK_DIRECTORY: + toLock.emplace(req.getHardUnlinkDirectory().dirId); + break; + case CDCMessageKind::CROSS_SHARD_HARD_UNLINK_FILE: + toLock.emplace(req.getCrossShardHardUnlinkFile().ownerId); + break; + case CDCMessageKind::ERROR: + throw EGGS_EXCEPTION("bad req type error"); + default: + throw EGGS_EXCEPTION("bad req type %s", (uint8_t)req.kind()); + } + return toLock; +} + struct StateMachineEnv { Env& env; - rocksdb::OptimisticTransactionDB* db; rocksdb::ColumnFamilyHandle* defaultCf; rocksdb::ColumnFamilyHandle* parentCf; rocksdb::Transaction& dbTxn; - EggsTime time; - uint64_t txnId; + CDCTxnId txnId; uint8_t txnStep; CDCStep& cdcStep; + bool finished; StateMachineEnv( - Env& env_, rocksdb::OptimisticTransactionDB* db_, rocksdb::ColumnFamilyHandle* defaultCf_, rocksdb::ColumnFamilyHandle* parentCf_, rocksdb::Transaction& dbTxn_, EggsTime time_, uint64_t txnId_, uint8_t step_, CDCStep& cdcStep_ + Env& env_, rocksdb::ColumnFamilyHandle* defaultCf_, rocksdb::ColumnFamilyHandle* parentCf_, rocksdb::Transaction& dbTxn_, CDCTxnId txnId_, uint8_t step_, CDCStep& cdcStep_ ): - env(env_), db(db_), defaultCf(defaultCf_), parentCf(parentCf_), dbTxn(dbTxn_), time(time_), txnId(txnId_), txnStep(step_), cdcStep(cdcStep_) + env(env_), defaultCf(defaultCf_), parentCf(parentCf_), dbTxn(dbTxn_), txnId(txnId_), txnStep(step_), cdcStep(cdcStep_), finished(false) {} - InodeId nextDirectoryId() { + InodeId nextDirectoryId(rocksdb::Transaction& dbTxn) { std::string v; - ROCKS_DB_CHECKED(db->Get({}, defaultCf, cdcMetadataKey(&NEXT_DIRECTORY_ID_KEY), &v)); + ROCKS_DB_CHECKED(dbTxn.Get({}, defaultCf, cdcMetadataKey(&NEXT_DIRECTORY_ID_KEY), &v)); ExternalValue nextId(v); InodeId id = nextId().id(); nextId().setId(InodeId::FromU64(id.u64 + 1)); @@ -151,24 +222,27 @@ struct StateMachineEnv { ShardReqContainer& needsShard(uint8_t step, ShardId shid, bool repeated) { txnStep = step; - cdcStep.txnFinished = 0; - cdcStep.txnNeedsShard = txnId; - cdcStep.shardReq.shid = shid; - cdcStep.shardReq.repeated = repeated; - return cdcStep.shardReq.req; + auto& running = cdcStep.runningTxns.emplace_back(); + running.first = txnId; + running.second.shid = shid; + running.second.repeated = repeated; + return running.second.req; } CDCRespContainer& finish() { - cdcStep.txnFinished = txnId; - cdcStep.err = NO_ERROR; - return cdcStep.resp; + this->finished = true; + auto& finished = cdcStep.finishedTxns.emplace_back(); + finished.first = txnId; + finished.second.err = NO_ERROR; + return finished.second.resp; } void finishWithError(EggsError err) { + this->finished = true; ALWAYS_ASSERT(err != NO_ERROR); - cdcStep.txnFinished = txnId; - cdcStep.err = err; - cdcStep.txnNeedsShard = 0; + auto& errored = cdcStep.finishedTxns.emplace_back(); + errored.first = txnId; + errored.second.err = err; } }; @@ -234,7 +308,7 @@ struct MakeDirectoryStateMachine { } void start() { - state.setDirId(env.nextDirectoryId()); + state.setDirId(env.nextDirectoryId(env.dbTxn)); lookup(); } @@ -1217,23 +1291,33 @@ struct UnpackCDCReq { struct CDCDBImpl { Env _env; - // TODO it would be good to store basically all of the metadata in memory, - // so that we'd just read from it, but this requires a bit of care when writing - // if we want to be able to not write the batch at the end. + // The reason why we insist in storing everything in RocksDB is that we can do + // everything in a single transaction, so it's easier to reason about atomic + // modifications. _dirsToTxnsCf for example would be much simpler as a + // simple unordered_map. + // + // It also has the nice advantage that we don't need to reconstruct the state + // when starting up, it's all already there. rocksdb::OptimisticTransactionDB* _db; rocksdb::ColumnFamilyHandle* _defaultCf; - rocksdb::ColumnFamilyHandle* _reqQueueCf; rocksdb::ColumnFamilyHandle* _parentCf; + rocksdb::ColumnFamilyHandle* _enqueuedCf; // V1, txnId -> CDC req, only for executing or waiting to be executed requests + rocksdb::ColumnFamilyHandle* _executingCf; // V1, txnId -> CDC state machine, for requests that are executing + // V1, data structure storing a dir to txn ids mapping: + // InodeId -> txnId -- sentinel telling us what the first txn in line is. If none, zero. + // we need the sentinel to skip over tombstones quickly. + // InodeId, txnId set with the queue + rocksdb::ColumnFamilyHandle* _dirsToTxnsCf; + // rocksdb::ColumnFamilyHandle* _dirsToTxnsCf; // V1, InodeId, txnId set + // legacy + rocksdb::ColumnFamilyHandle* _reqQueueCfLegacy; // V0, txnId -> CDC req, for all the requests (including historical) AssertiveLock _processLock; std::shared_ptr _dbStatistics; std::string _dbStatisticsFile; - // Used to deserialize into - CDCReqContainer _cdcReq; - // ---------------------------------------------------------------- // initialization @@ -1256,6 +1340,9 @@ struct CDCDBImpl { {rocksdb::kDefaultColumnFamilyName, {}}, {"reqQueue", {}}, {"parent", {}}, + {"enqueued", {}}, + {"executing", {}}, + {"dirsToTxns", {}}, }; std::vector familiesHandles; auto dbPath = path + "/db"; @@ -1266,8 +1353,11 @@ struct CDCDBImpl { ); ALWAYS_ASSERT(familiesDescriptors.size() == familiesHandles.size()); _defaultCf = familiesHandles[0]; - _reqQueueCf = familiesHandles[1]; + _reqQueueCfLegacy = familiesHandles[1]; _parentCf = familiesHandles[2]; + _enqueuedCf = familiesHandles[3]; + _executingCf = familiesHandles[4]; + _dirsToTxnsCf = familiesHandles[5]; _initDb(); } @@ -1276,8 +1366,11 @@ struct CDCDBImpl { LOG_INFO(_env, "destroying column families and closing database"); ROCKS_DB_CHECKED(_db->DestroyColumnFamilyHandle(_defaultCf)); - ROCKS_DB_CHECKED(_db->DestroyColumnFamilyHandle(_reqQueueCf)); + ROCKS_DB_CHECKED(_db->DestroyColumnFamilyHandle(_reqQueueCfLegacy)); ROCKS_DB_CHECKED(_db->DestroyColumnFamilyHandle(_parentCf)); + ROCKS_DB_CHECKED(_db->DestroyColumnFamilyHandle(_enqueuedCf)); + ROCKS_DB_CHECKED(_db->DestroyColumnFamilyHandle(_executingCf)); + ROCKS_DB_CHECKED(_db->DestroyColumnFamilyHandle(_dirsToTxnsCf)); ROCKS_DB_CHECKED(_db->Close()); } @@ -1301,16 +1394,33 @@ struct CDCDBImpl { } TXN_ID_SETTER_GETTER(LAST_TXN_KEY, _lastTxn, _setLastTxn) - TXN_ID_SETTER_GETTER(FIRST_TXN_IN_QUEUE_KEY, _firstTxnInQueue, _setFirstTxnInQueue) - TXN_ID_SETTER_GETTER(LAST_TXN_IN_QUEUE_KEY, _lastTxnInQueue, _setLastTxnInQueue) - TXN_ID_SETTER_GETTER(EXECUTING_TXN_KEY, _executingTxn, _setExecutingTxn) + TXN_ID_SETTER_GETTER(EXECUTING_TXN_KEY, _executingTxnLegacy, _setExecutingTxnLegacy) #undef TXN_ID_SETTER_GETTER - void _initDb() { - const auto keyExists = [this](rocksdb::ColumnFamilyHandle* cf, const rocksdb::Slice& key) -> bool { + // returns -1 if the version key was not set + int64_t _version(rocksdb::Transaction& dbTxn) { + std::string value; + auto status = dbTxn.Get({}, _defaultCf, cdcMetadataKey(&VERSION_KEY), &value); + if (status.IsNotFound()) { + return -1; + } else { + ROCKS_DB_CHECKED(status); + ExternalValue vV(value); + return vV().u64(); + } + } + + void _setVersion(rocksdb::Transaction& dbTxn, uint64_t version) { + auto v = U64Value::Static(version); + ROCKS_DB_CHECKED(dbTxn.Put({}, cdcMetadataKey(&VERSION_KEY), v.toSlice())); + } + + void _initDbV0(rocksdb::Transaction& dbTxn) { + LOG_INFO(_env, "initializing V0 db"); + const auto keyExists = [&dbTxn](rocksdb::ColumnFamilyHandle* cf, const rocksdb::Slice& key) -> bool { std::string value; - auto status = _db->Get({}, cf, key, &value); + auto status = dbTxn.Get({}, cf, key, &value); if (status.IsNotFound()) { return false; } else { @@ -1322,15 +1432,15 @@ struct CDCDBImpl { if (!keyExists(_defaultCf, cdcMetadataKey(&NEXT_DIRECTORY_ID_KEY))) { LOG_INFO(_env, "initializing next directory id"); auto id = InodeIdValue::Static(InodeId::FromU64(ROOT_DIR_INODE_ID.u64 + 1)); - ROCKS_DB_CHECKED(_db->Put({}, cdcMetadataKey(&NEXT_DIRECTORY_ID_KEY), id.toSlice())); + ROCKS_DB_CHECKED(dbTxn.Put({}, cdcMetadataKey(&NEXT_DIRECTORY_ID_KEY), id.toSlice())); } - const auto initZeroValue = [this, &keyExists](const std::string& what, const CDCMetadataKey& key) { + const auto initZeroValue = [this, &keyExists, &dbTxn](const std::string& what, const CDCMetadataKey& key) { if (!keyExists(_defaultCf, cdcMetadataKey(&key))) { LOG_INFO(_env, "initializing %s", what); StaticValue v; v().setU64(0); - ROCKS_DB_CHECKED(_db->Put({}, cdcMetadataKey(&key), v.toSlice())); + ROCKS_DB_CHECKED(dbTxn.Put({}, cdcMetadataKey(&key), v.toSlice())); } }; @@ -1341,6 +1451,61 @@ struct CDCDBImpl { initZeroValue("last applied log index", LAST_APPLIED_LOG_ENTRY_KEY); } + void _initDbV1(rocksdb::Transaction& dbTxn) { + LOG_INFO(_env, "initializing V1 db"); + // Pick up the executing txn, if any, and move it to the executing CF. + // We preserve the executing txn to preserve integrity. + { + uint64_t executingTxn = _executingTxnLegacy(dbTxn); + if (executingTxn != 0) { + LOG_INFO(_env, "migrating txn %s", executingTxn); + // _reqQueueCf -> _enqueuedCf + auto txnK = CDCTxnIdKey::Static(CDCTxnId(executingTxn)); + std::string reqV; + ROCKS_DB_CHECKED(dbTxn.Get({}, _reqQueueCfLegacy, txnK.toSlice(), &reqV)); + CDCReqContainer req; + UnpackCDCReq ureq(req); + bincodeFromRocksValue(reqV, ureq); + ROCKS_DB_CHECKED(dbTxn.Put(_enqueuedCf, txnK.toSlice(), reqV)); + // EXECUTING_TXN_KEY -> _executingCf + std::string txnStateV; + ROCKS_DB_CHECKED(dbTxn.Get({}, _defaultCf, cdcMetadataKey(&EXECUTING_TXN_STATE_KEY), &txnStateV)); + ROCKS_DB_CHECKED(dbTxn.Put(_executingCf, txnK.toSlice(), txnStateV)); + // Add to _dirsToTxnsCf, will lock since things are empty + _addToDirsToTxns(dbTxn, txnK().id(), req); + } + } + + // Throw away everything legacy. The clients will just retry. + ROCKS_DB_CHECKED(dbTxn.Delete(cdcMetadataKey(&FIRST_TXN_IN_QUEUE_KEY))); + ROCKS_DB_CHECKED(dbTxn.Delete(cdcMetadataKey(&LAST_TXN_IN_QUEUE_KEY))); + ROCKS_DB_CHECKED(dbTxn.Delete(cdcMetadataKey(&EXECUTING_TXN_KEY))); + ROCKS_DB_CHECKED(dbTxn.Delete(cdcMetadataKey(&EXECUTING_TXN_STATE_KEY))); + // We could delete the CF itself, but let's do everything in the same transaction + std::unique_ptr it(dbTxn.GetIterator({}, _reqQueueCfLegacy)); + for (it->Seek(""); it->Valid(); it->Next()) { + ROCKS_DB_CHECKED(dbTxn.Delete(_reqQueueCfLegacy, it->key())); + } + ROCKS_DB_CHECKED(it->status()); + } + + void _initDb() { + rocksdb::WriteOptions options; + options.sync = true; + std::unique_ptr dbTxn(_db->BeginTransaction(options)); + + if (_version(*dbTxn), -1) { + _initDbV0(*dbTxn); + _setVersion(*dbTxn, 0); + } + if (_version(*dbTxn) == 0) { + _initDbV1(*dbTxn); + _setVersion(*dbTxn, 1); + } + + commitTransaction(*dbTxn); + } + // ---------------------------------------------------------------- // retrying txns @@ -1375,15 +1540,22 @@ struct CDCDBImpl { // Processing // ---------------------------------------------------------------- - uint64_t _lastAppliedLogEntry() { + uint64_t _lastAppliedLogEntryDB() { std::string value; ROCKS_DB_CHECKED(_db->Get({}, cdcMetadataKey(&LAST_APPLIED_LOG_ENTRY_KEY), &value)); ExternalValue v(value); return v().u64(); } + uint64_t _lastAppliedLogEntry(rocksdb::Transaction& dbTxn) { + std::string value; + ROCKS_DB_CHECKED(dbTxn.Get({}, cdcMetadataKey(&LAST_APPLIED_LOG_ENTRY_KEY), &value)); + ExternalValue v(value); + return v().u64(); + } + void _advanceLastAppliedLogEntry(rocksdb::Transaction& dbTxn, uint64_t index) { - uint64_t oldIndex = _lastAppliedLogEntry(); + uint64_t oldIndex = _lastAppliedLogEntry(dbTxn); ALWAYS_ASSERT(oldIndex+1 == index, "old index is %s, expected %s, got %s", oldIndex, oldIndex+1, index); LOG_DEBUG(_env, "bumping log index from %s to %s", oldIndex, index); StaticValue v; @@ -1391,80 +1563,131 @@ struct CDCDBImpl { ROCKS_DB_CHECKED(dbTxn.Put({}, cdcMetadataKey(&LAST_APPLIED_LOG_ENTRY_KEY), v.toSlice())); } - // Pushes a new request onto the queue. - void _reqQueuePush(rocksdb::Transaction& dbTxn, uint64_t txnId, const CDCReqContainer& req) { - // Check metadata - uint64_t last = _lastTxnInQueue(dbTxn); - ALWAYS_ASSERT(last == 0 || txnId == last + 1); - // Write to queue - { - auto k = U64Key::Static(txnId); - std::string v = bincodeToRocksValue(PackCDCReq(req)); - ROCKS_DB_CHECKED(dbTxn.Put(_reqQueueCf, k.toSlice(), v)); + void _addToDirsToTxns(rocksdb::Transaction& dbTxn, CDCTxnId txnId, const CDCReqContainer& req) { + for (const auto dirId: directoriesNeedingLock(req)) { + LOG_DEBUG(_env, "adding dir %s for txn %s", dirId, txnId); + { + // into the set + StaticValue k; + k().setDirId(dirId); + k().setTxnId(txnId); + ROCKS_DB_CHECKED(dbTxn.Put(_dirsToTxnsCf, k.toSlice(), "")); + } + { + // sentinel, if necessary + StaticValue k; + k().setDirId(dirId); + k().setSentinel(); + std::string v; + auto status = dbTxn.Get({}, _dirsToTxnsCf, k.toSlice(), &v); + if (status.IsNotFound()) { // we're the first ones here, add the sentinel + auto v = CDCTxnIdValue::Static(txnId); + ROCKS_DB_CHECKED(dbTxn.Put(_dirsToTxnsCf, k.toSlice(), v.toSlice())); + } else { + ROCKS_DB_CHECKED(status); + } + } } - // Update metadata - if (last == 0) { - _setFirstTxnInQueue(dbTxn, txnId); - } - _setLastTxnInQueue(dbTxn, txnId); } - // Returns 0 if the queue was empty, otherwise what was popped - uint64_t _reqQueuePeek(rocksdb::Transaction& dbTxn, CDCReqContainer& req) { - uint64_t first = _firstTxnInQueue(dbTxn); - if (first == 0) { - return 0; + // Returns the txn ids that might be free to work now. Note that we don't + // know that for sure because they might not hold locks for all dirs. This + // function does not check that. + void _removeFromDirsToTxns(rocksdb::Transaction& dbTxn, CDCTxnId txnId, const CDCReqContainer& req, std::vector& mightBeReady) { + for (const auto dirId: directoriesNeedingLock(req)) { + LOG_DEBUG(_env, "removing dir %s for txn %s", dirId, txnId); + StaticValue k; + k().setDirId(dirId); + k().setTxnId(txnId); + // Check if there's a next key in line. We know that we won't + // have to step over many deleted keys here because we go through the + // list in order, and we seek from the list element we've just deleted. + // It is however important to set the iteration upper bound as of to + // not spill over and possibly trip over deleted keys. + StaticValue upperBoundK; + upperBoundK().setDirId(dirId); + upperBoundK().setTxnId(CDCTxnId(~(uint64_t)0)); + rocksdb::Slice upperBoundSlice = upperBoundK.toSlice(); + rocksdb::ReadOptions itOptions; + itOptions.iterate_upper_bound = &upperBoundSlice; + std::unique_ptr it(dbTxn.GetIterator(itOptions, _dirsToTxnsCf)); + it->Seek(k.toSlice()); + // we must find the key here -- we're removing it. + ALWAYS_ASSERT(it->Valid()); + { + // Additional safety check: the key is what we expect. + auto foundKey = ExternalValue::FromSlice(it->key()); + ALWAYS_ASSERT(!foundKey().isSentinel() && foundKey().txnId() == txnId); + } + // now that we've done our checks, we can remove the key + ROCKS_DB_CHECKED(dbTxn.Delete(_dirsToTxnsCf, k.toSlice())); + // then we look for the next one, if there's anything, + // and overwrite/delete the sentinel + StaticValue sentinelK; + sentinelK().setDirId(dirId); + sentinelK().setSentinel(); + it->Next(); + if (it->Valid()) { // there's something, set the sentinel + auto nextK = ExternalValue::FromSlice(it->key()); + auto sentinelV = CDCTxnIdValue::Static(nextK().txnId()); + LOG_DEBUG(_env, "selected %s as next in line after finishing %s", nextK().txnId(), txnId); + mightBeReady.emplace_back(nextK().txnId()); + ROCKS_DB_CHECKED(dbTxn.Put(_dirsToTxnsCf, sentinelK.toSlice(), sentinelV.toSlice())); + } else { // we were the last ones here, remove sentinel + ROCKS_DB_CHECKED(it->status()); + ROCKS_DB_CHECKED(dbTxn.Delete(_dirsToTxnsCf, sentinelK.toSlice())); + } } - // Read - { - auto k = U64Key::Static(first); + } + + // Check if we have a lock on all the directories that matter to the txn id. + // It is assumed that the txnId in question is already in _dirsToTxnsCf. + bool _isReadyToGo(rocksdb::Transaction& dbTxn, CDCTxnId txnId, const CDCReqContainer& req) { + for (const auto dirId: directoriesNeedingLock(req)) { + // the sentinel _must_ be present -- at the very least us! + StaticValue k; + k().setDirId(dirId); + k().setSentinel(); std::string v; - ROCKS_DB_CHECKED(dbTxn.Get({}, _reqQueueCf, k.toSlice(), &v)); - UnpackCDCReq ureq(req); - bincodeFromRocksValue(v, ureq); + ROCKS_DB_CHECKED(dbTxn.Get({}, _dirsToTxnsCf, k.toSlice(), &v)); + ExternalValue otherTxnId(v); + if (otherTxnId().id() != txnId) { + return false; + } } - return first; + return true; } - // Returns 0 if the queue was empty, otherwise what was popped - uint64_t _reqQueuePop(rocksdb::Transaction& dbTxn) { - uint64_t first = _firstTxnInQueue(dbTxn); - if (first == 0) { - LOG_DEBUG(_env, "txn queue empty, returning 0"); - return 0; + // Adds a request to the enqueued requests. Also adds it to dirsToTxns, which will implicitly + // acquire locks. + void _addToEnqueued(rocksdb::Transaction& dbTxn, CDCTxnId txnId, const CDCReqContainer& req) { + { + auto k = CDCTxnIdKey::Static(txnId); + std::string v = bincodeToRocksValue(PackCDCReq(req)); + ROCKS_DB_CHECKED(dbTxn.Put(_enqueuedCf, k.toSlice(), v)); } - // Update metadata - uint64_t last = _lastTxnInQueue(dbTxn); - if (first == last) { // empty queue - _setFirstTxnInQueue(dbTxn, 0); - _setLastTxnInQueue(dbTxn, 0); - } else { - _setFirstTxnInQueue(dbTxn, first+1); - } - LOG_DEBUG(_env, "popped txn %s from queue", first); - return first; + _addToDirsToTxns(dbTxn, txnId, req); } // Moves the state forward, filling in `step` appropriatedly, and writing // out the updated state. - // - // This will _not_ start a new transaction automatically if the old one - // finishes. It does return whether the txn was finished though. template typename V> - bool _advance( - EggsTime time, + void _advance( rocksdb::Transaction& dbTxn, - uint64_t txnId, + CDCTxnId txnId, const CDCReqContainer& req, // If `shardRespError` and `shardResp` are null, we're starting to execute. // Otherwise, (err == NO_ERROR) == (req != nullptr). EggsError shardRespError, const ShardRespContainer* shardResp, V& state, - CDCStep& step + CDCStep& step, + // to collect things that might be able to start now because + // we've finished doing other stuff + std::vector& txnIds ) { - LOG_DEBUG(_env, "advancing txn %s of kind %s", txnId, req.kind()); - StateMachineEnv sm(_env, _db, _defaultCf, _parentCf, dbTxn, time, txnId, state().step(), step); + LOG_DEBUG(_env, "advancing txn %s of kind %s with state", txnId, req.kind()); + StateMachineEnv sm(_env, _defaultCf, _parentCf, dbTxn, txnId, state().step(), step); switch (req.kind()) { case CDCMessageKind::MAKE_DIRECTORY: MakeDirectoryStateMachine(sm, req.getMakeDirectory(), state().getMakeDirectory()).resume(shardRespError, shardResp); @@ -1489,155 +1712,162 @@ struct CDCDBImpl { } state().setStep(sm.txnStep); - ALWAYS_ASSERT(step.txnFinished == 0 || step.txnFinished == txnId); - ALWAYS_ASSERT(step.txnNeedsShard == 0 || step.txnNeedsShard == txnId); - ALWAYS_ASSERT(step.txnFinished == txnId || step.txnNeedsShard == txnId); - if (step.txnFinished == txnId) { - // we're done, clear the transaction from the system - ALWAYS_ASSERT(_reqQueuePop(dbTxn) == txnId); - _setExecutingTxn(dbTxn, 0); - // not strictly necessary, it'll just be overwritten next time, but let's be tidy - ROCKS_DB_CHECKED(dbTxn.Delete(_defaultCf, cdcMetadataKey(&EXECUTING_TXN_STATE_KEY))); - // also, fill in whether we've got something next - step.nextTxn = _firstTxnInQueue(dbTxn); - return true; + if (sm.finished) { + // we finished immediately + LOG_DEBUG(_env, "txn %s with req %s finished", txnId, req); + _finishExecuting(dbTxn, txnId, req, txnIds); } else { - // we're _not_ done, write out the state - ROCKS_DB_CHECKED(dbTxn.Put(_defaultCf, cdcMetadataKey(&EXECUTING_TXN_STATE_KEY), state.toSlice())); + // we still have something to do, persist + _setExecuting(dbTxn, txnId, state); + } + } + + bool _isExecuting(rocksdb::Transaction& dbTxn, CDCTxnId txnId) { + auto k = CDCTxnIdKey::Static(txnId); + std::string v; + auto status = dbTxn.Get({}, _executingCf, k.toSlice(), &v); + if (status.IsNotFound()) { return false; } + ROCKS_DB_CHECKED(status); + return true; } - // Starts executing the next transaction in line, if possible. If it managed - // to start something, it immediately advances it as well (no point delaying - // that). Returns the txn id that was started, if any. - uint64_t _startExecuting(EggsTime time, rocksdb::Transaction& dbTxn, CDCStep& step, CDCStatus& status) { - uint64_t executingTxn = _executingTxn(dbTxn); - if (executingTxn != 0) { - LOG_DEBUG(_env, "another transaction %s is already executing, can't start", executingTxn); - return 0; - } - - uint64_t txnToExecute = _reqQueuePeek(dbTxn, _cdcReq); - if (txnToExecute == 0) { - LOG_DEBUG(_env, "no transactions in queue found, can't start"); - return 0; - } - - _setExecutingTxn(dbTxn, txnToExecute); - LOG_DEBUG(_env, "starting to execute txn %s with req %s", txnToExecute, _cdcReq); - StaticValue txnState; - txnState().start(_cdcReq.kind()); - bool stillRunning = _advance(time, dbTxn, txnToExecute, _cdcReq, NO_ERROR, nullptr, txnState, step); - - if (stillRunning) { - status.runningTxn = txnToExecute; - status.runningTxnKind = _cdcReq.kind(); - } - - return txnToExecute; + template typename V> + void _setExecuting(rocksdb::Transaction& dbTxn, CDCTxnId txnId, V& state) { + auto k = CDCTxnIdKey::Static(txnId); + ROCKS_DB_CHECKED(dbTxn.Put(_executingCf, k.toSlice(), state.toSlice())); } - uint64_t processCDCReq( - bool sync, - EggsTime time, - uint64_t logIndex, - const CDCReqContainer& req, - CDCStep& step, - CDCStatus& status - ) { - status.reset(); - - auto locked = _processLock.lock(); - - rocksdb::WriteOptions options; - options.sync = sync; - std::unique_ptr dbTxn(_db->BeginTransaction(options)); - - step.clear(); - - _advanceLastAppliedLogEntry(*dbTxn, logIndex); - - // Enqueue req - uint64_t txnId; + void _finishExecuting(rocksdb::Transaction& dbTxn, CDCTxnId txnId, const CDCReqContainer& req, std::vector& txnIds) { { - // Generate new txn id - txnId = _lastTxn(*dbTxn) + 1; - _setLastTxn(*dbTxn, txnId); - // Push to queue - _reqQueuePush(*dbTxn, txnId, req); - LOG_DEBUG(_env, "enqueued CDC req %s with txn id %s", req.kind(), txnId); + // delete from _executingCf + auto k = CDCTxnIdKey::Static(txnId); + ROCKS_DB_CHECKED(dbTxn.Delete(_executingCf, k.toSlice())); } - - // Start executing, if we can - _startExecuting(time, *dbTxn, step, status); - - LOG_DEBUG(_env, "committing transaction"); - commitTransaction(*dbTxn); - - return txnId; + // delete from dirsToTxnIds + _removeFromDirsToTxns(dbTxn, txnId, req, txnIds); } - void processShardResp( - bool sync, // Whether to persist synchronously. Unneeded if log entries are persisted already. - EggsTime time, - uint64_t logIndex, - EggsError respError, + // Starts executing the given transactions, if possible. If it managed + // to start something, it immediately advances it as well (no point delaying + // that). + // + // It modifies `txnIds` with new transactions we looked at if we immediately + // finish executing txns that we start here. + void _startExecuting(rocksdb::Transaction& dbTxn, std::vector& txnIds, CDCStep& step) { + CDCReqContainer req; + for (int i = 0; i < txnIds.size(); i++) { + CDCTxnId txnId = txnIds[i]; + auto reqK = CDCTxnIdKey::Static(txnId); + std::string reqV; + ROCKS_DB_CHECKED(dbTxn.Get({}, _enqueuedCf, reqK.toSlice(), &reqV)); + UnpackCDCReq ureq(req); + bincodeFromRocksValue(reqV, ureq); + if (!_isExecuting(dbTxn, txnId)) { + if (_isReadyToGo(dbTxn, txnId, req)) { + LOG_DEBUG(_env, "starting to execute txn %s with req %s, since it is ready to go and not executing already", txnId, req); + StaticValue txnState; + txnState().start(req.kind()); + _setExecuting(dbTxn, txnId, txnState); + _advance(dbTxn, txnId, req, NO_ERROR, nullptr, txnState, step, txnIds); + } else { + LOG_DEBUG(_env, "waiting before executing txn %s with req %s, since it is not ready to go", txnId, req); + } + } + } + } + + void _enqueueCDCReqs( + rocksdb::Transaction& dbTxn, + const std::vector& reqs, + CDCStep& step, + // we need two lists because one (`cdcReqsTxnIds`) is specifically + // to return to the user, while the other is used for other purposes, + // too. + std::vector& txnIdsToStart, + std::vector& cdcReqsTxnIds + ) { + for (const auto& req: reqs) { + uint64_t txnId; + { + // Generate new txn id + txnId = _lastTxn(dbTxn) + 1; + _setLastTxn(dbTxn, txnId); + // Push to queue + _addToEnqueued(dbTxn, txnId, req); + LOG_DEBUG(_env, "enqueued CDC req %s with txn id %s", req.kind(), txnId); + } + + txnIdsToStart.emplace_back(txnId); + cdcReqsTxnIds.emplace_back(txnId); + } + } + + void _advanceWithResp( + rocksdb::Transaction& dbTxn, + CDCTxnId txnId, + EggsError err, const ShardRespContainer* resp, CDCStep& step, - CDCStatus& status + std::vector& txnIdsToStart ) { - status.reset(); - - auto locked = _processLock.lock(); - - rocksdb::WriteOptions options; - options.sync = sync; - std::unique_ptr dbTxn(_db->BeginTransaction(options)); - - step.clear(); - - _advanceLastAppliedLogEntry(*dbTxn, logIndex); - - // Find the txn - uint64_t txnId = _executingTxn(*dbTxn); - ALWAYS_ASSERT(txnId != 0); + auto txnIdK = CDCTxnIdKey::Static(txnId); // Get the req + CDCReqContainer cdcReq; { - auto k = U64Key::Static(txnId); std::string reqV; - ROCKS_DB_CHECKED(dbTxn->Get({}, _reqQueueCf, k.toSlice(), &reqV)); - UnpackCDCReq ureq(_cdcReq); + ROCKS_DB_CHECKED(dbTxn.Get({}, _enqueuedCf, txnIdK.toSlice(), &reqV)); + UnpackCDCReq ureq(cdcReq); bincodeFromRocksValue(reqV, ureq); } // Get the state std::string txnStateV; - ROCKS_DB_CHECKED(dbTxn->Get({}, _defaultCf, cdcMetadataKey(&EXECUTING_TXN_STATE_KEY), &txnStateV)); + ROCKS_DB_CHECKED(dbTxn.Get({}, _executingCf, txnIdK.toSlice(), &txnStateV)); ExternalValue txnState(txnStateV); - // Advance - bool stillRunning = _advance(time, *dbTxn, txnId, _cdcReq, respError, resp, txnState, step); + // Advance with response + _advance(dbTxn, txnId, cdcReq, err, resp, txnState, step, txnIdsToStart); + } - if (stillRunning) { - // Store status - status.runningTxn = txnId; - status.runningTxnKind = _cdcReq.kind(); + void update( + bool sync, + uint64_t logIndex, + const std::vector& cdcReqs, + const std::vector& shardResps, + CDCStep& step, + std::vector& cdcReqsTxnIds + ) { + auto locked = _processLock.lock(); + + rocksdb::WriteOptions options; + options.sync = sync; + std::unique_ptr dbTxn(_db->BeginTransaction(options)); + + _advanceLastAppliedLogEntry(*dbTxn, logIndex); + + step.clear(); + cdcReqsTxnIds.clear(); + + { + std::vector txnIdsToStart; + _enqueueCDCReqs(*dbTxn, cdcReqs, step, txnIdsToStart, cdcReqsTxnIds); + for (const auto& resp: shardResps) { + _advanceWithResp(*dbTxn, resp.txnId, resp.err, &resp.resp, step, txnIdsToStart); + } + _startExecuting(*dbTxn, txnIdsToStart, step); } commitTransaction(*dbTxn); } - void startNextTransaction( - bool sync, // Whether to persist synchronously. Unneeded if log entries are persisted already. - EggsTime time, + void bootstrap( + bool sync, uint64_t logIndex, - CDCStep& step, - CDCStatus& status + CDCStep& step ) { - status.reset(); - auto locked = _processLock.lock(); rocksdb::WriteOptions options; @@ -1647,31 +1877,17 @@ struct CDCDBImpl { step.clear(); _advanceLastAppliedLogEntry(*dbTxn, logIndex); - uint64_t txnId = _startExecuting(time, *dbTxn, step, status); - if (txnId == 0) { - // no txn could be started, see if one is executing already to fill in the `step` - txnId = _executingTxn(*dbTxn); - if (txnId != 0) { - LOG_DEBUG(_env, "transaction %s is already executing, will resume it", txnId); - // Get the req - { - auto k = U64Key::Static(txnId); - std::string reqV; - ROCKS_DB_CHECKED(dbTxn->Get({}, _reqQueueCf, k.toSlice(), &reqV)); - UnpackCDCReq ureq(_cdcReq); - bincodeFromRocksValue(reqV, ureq); - } - // Store status - status.runningTxn = txnId; - status.runningTxnKind = _cdcReq.kind(); - // Get the state - std::string txnStateV; - ROCKS_DB_CHECKED(dbTxn->Get({}, _defaultCf, cdcMetadataKey(&EXECUTING_TXN_STATE_KEY), &txnStateV)); - ExternalValue txnState(txnStateV); - // Advance - _advance(time, *dbTxn, txnId, _cdcReq, NO_ERROR, nullptr, txnState, step); - } + + std::vector txnIdsToStart; + // Just collect all executing txns, and run them + std::unique_ptr it(dbTxn->GetIterator({}, _executingCf)); + for (it->Seek(""); it->Valid(); it->Next()) { + auto txnIdK = ExternalValue::FromSlice(it->key()); + _advanceWithResp(*dbTxn, txnIdK().id(), NO_ERROR, nullptr, step, txnIdsToStart); } + ROCKS_DB_CHECKED(it->status()); + + _startExecuting(*dbTxn, txnIdsToStart, step); commitTransaction(*dbTxn); } @@ -1699,20 +1915,16 @@ CDCDB::~CDCDB() { delete ((CDCDBImpl*)_impl); } -uint64_t CDCDB::processCDCReq(bool sync, EggsTime time, uint64_t logIndex, const CDCReqContainer& req, CDCStep& step, CDCStatus& status) { - return ((CDCDBImpl*)_impl)->processCDCReq(sync, time, logIndex, req, step, status); +void CDCDB::bootstrap(bool sync, uint64_t logIndex, CDCStep& step) { + return ((CDCDBImpl*)_impl)->bootstrap(sync, logIndex, step); } -void CDCDB::processShardResp(bool sync, EggsTime time, uint64_t logIndex, EggsError respError, const ShardRespContainer* resp, CDCStep& step, CDCStatus& status) { - return ((CDCDBImpl*)_impl)->processShardResp(sync, time, logIndex, respError, resp, step, status); -} - -void CDCDB::startNextTransaction(bool sync, EggsTime time, uint64_t logIndex, CDCStep& step, CDCStatus& status) { - return ((CDCDBImpl*)_impl)->startNextTransaction(sync, time, logIndex, step, status); +void CDCDB::update(bool sync, uint64_t logIndex, const std::vector& cdcReqs, const std::vector& shardResps, CDCStep& step, std::vector& cdcReqsTxnIds) { + return ((CDCDBImpl*)_impl)->update(sync, logIndex, cdcReqs, shardResps, step, cdcReqsTxnIds); } uint64_t CDCDB::lastAppliedLogEntry() { - return ((CDCDBImpl*)_impl)->_lastAppliedLogEntry(); + return ((CDCDBImpl*)_impl)->_lastAppliedLogEntryDB(); } void CDCDB::rocksDBMetrics(std::unordered_map& values) { diff --git a/cpp/cdc/CDCDB.hpp b/cpp/cdc/CDCDB.hpp index 5268bb03..e0ed65c4 100644 --- a/cpp/cdc/CDCDB.hpp +++ b/cpp/cdc/CDCDB.hpp @@ -1,12 +1,38 @@ #pragma once -#include "unordered_map" +#include +#include #include "Bincode.hpp" #include "Msgs.hpp" #include "Env.hpp" #include "Shuckle.hpp" +// This exists purely for type safety +struct CDCTxnId { + uint64_t x; + + CDCTxnId() : x(0) {} // txn ids are never zeros + CDCTxnId(uint64_t x_) : x(x_) {} + + bool operator==(const CDCTxnId rhs) const { + return x == rhs.x; + } + + bool operator!=(const CDCTxnId rhs) const { + return x != rhs.x; + } +}; + +std::ostream& operator<<(std::ostream& out, CDCTxnId id); + +template <> +struct std::hash { + std::size_t operator()(const CDCTxnId key) const { + return std::hash{}(key.x); + } +}; + struct CDCShardReq { ShardId shid; bool repeated; // This request is exactly the same as the previous one. @@ -21,49 +47,35 @@ struct CDCShardReq { std::ostream& operator<<(std::ostream& out, const CDCShardReq& x); -struct CDCStep { - // If non-zero, a transaction has just finished, and here we have - // the response (whether an error or a response). - uint64_t txnFinished; - EggsError err; // if NO_ERROR, resp is contains the response. +struct CDCFinished { + // If err is not NO_ERROR, resp is not filled in + EggsError err; CDCRespContainer resp; - - // If non-zero, a transaction is running, but we need something - // from a shard to have it proceed. - // - // We have !((finishedTxn != 0) && (txnNeedsShard != 0)) as an invariant - // -- we can't have finished and be running a thing in the same step. - uint64_t txnNeedsShard; - CDCShardReq shardReq; - - // If non-zero, there is a transaction after the current one waiting - // to be executed. Only filled in if `txnNeedsShard == 0`. - // Useful to decide when to call `startNextTransaction` (although - // calling it is safe in any case). - uint64_t nextTxn; - - void clear() { - txnFinished = 0; - txnNeedsShard = 0; - nextTxn = 0; - } }; -// Only used for timing purposes, there is some overlap with CDCStep, -// but we separate it out for code robustness (we just get this info -// every time we touch the db). -struct CDCStatus { - uint64_t runningTxn; // 0 if nothing - CDCMessageKind runningTxnKind; // only relevant if it's running +std::ostream& operator<<(std::ostream& out, const CDCFinished& x); - void reset() { - runningTxn = 0; - runningTxnKind = (CDCMessageKind)0; +struct CDCStep { + std::vector> finishedTxns; // txns which have finished + std::vector> runningTxns; // txns which need a new shard request + + void clear() { + finishedTxns.clear(); + runningTxns.clear(); } }; std::ostream& operator<<(std::ostream& out, const CDCStep& x); +struct CDCShardResp { + CDCTxnId txnId; // the transaction id we're getting a response for + // if err != NO_ERROR, resp is not filled in + EggsError err; + ShardRespContainer resp; +}; + +std::ostream& operator<<(std::ostream& out, const CDCShardResp& x); + struct CDCDB { private: void* _impl; @@ -81,52 +93,30 @@ public: // responses. // // The functions below cannot be called concurrently. - // - // TODO one thing that we'd like to do (outside the deterministic state - // machine) is apply backpressure when too many txns are enqueued. It's - // not good to do it inside the state machine, because then we'd be marrying - // the deterministic state update function to such heuristics, which seems - // imprudent. So we'd like some function returning the length of the queue. - // Enqueues a cdc request, and immediately starts it if the system is currently - // idle. Returns the txn id that got assigned to the txn. - uint64_t processCDCReq( - bool sync, // Whether to persist synchronously. Unneeded if log entries are persisted already. - EggsTime time, + // Gives you what to do when the CDC is started back up. It'll basically just + // tell you to send some requests to shards. You need to run this when starting. + void bootstrap( + bool sync, uint64_t logIndex, - const CDCReqContainer& req, - CDCStep& step, - CDCStatus& status + CDCStep& step ); - // Advances the CDC state using the given shard response. + // Enqueues some CDC requests, and immediately starts it if possible. + // Returns the txn id that was assigned to each request. // - // This function crashes hard if the caller passes it a response it's not expecting. So the caller - // should track responses and make sure only the correct one is passed in. - void processShardResp( - bool sync, // Whether to persist synchronously. Unneeded if log entries are persisted already. - EggsTime time, - uint64_t logIndex, - // (err == NO_ERROR) == (req != nullptr) - EggsError err, - const ShardRespContainer* req, - CDCStep& step, - CDCStatus& status - ); - - // Does what it can to advance the state of the system, by starting the next - // transaction in line (if any). In any case, it returns what there is to do next - // in `step`. + // Also, advances the CDC state using some shard responses. // - // It's fine to call this function even if there's nothing to do -- and in fact - // you should do that when starting up the CDC, to make sure to finish - // in-flight CDC transactions. - void startNextTransaction( - bool sync, // Whether to persist synchronously. Unneeded if log entries are persisted already. - EggsTime time, + // This function crashes hard if the caller passes it a response it's not + // expecting. So the caller should track responses and make sure only relevant + // ones are passed in. + void update( + bool sync, uint64_t logIndex, + const std::vector& cdcReqs, + const std::vector& shardResps, CDCStep& step, - CDCStatus& status + std::vector& cdcReqsTxnIds // output txn ids for all the requests ); // The index of the last log entry persisted to the DB diff --git a/cpp/cdc/CDCDBData.hpp b/cpp/cdc/CDCDBData.hpp index f242dbf1..35ea360b 100644 --- a/cpp/cdc/CDCDBData.hpp +++ b/cpp/cdc/CDCDBData.hpp @@ -11,6 +11,7 @@ #include "RocksDBUtils.hpp" #include "Msgs.hpp" #include "Time.hpp" +#include "CDCDB.hpp" enum class CDCMetadataKey : uint8_t { LAST_APPLIED_LOG_ENTRY = 0, @@ -20,19 +21,92 @@ enum class CDCMetadataKey : uint8_t { EXECUTING_TXN = 4, EXECUTING_TXN_STATE = 6, LAST_DIRECTORY_ID = 7, + VERSION = 8, }; -constexpr CDCMetadataKey LAST_APPLIED_LOG_ENTRY_KEY = CDCMetadataKey::LAST_APPLIED_LOG_ENTRY; -constexpr CDCMetadataKey LAST_TXN_KEY = CDCMetadataKey::LAST_TXN; -constexpr CDCMetadataKey FIRST_TXN_IN_QUEUE_KEY = CDCMetadataKey::FIRST_TXN_IN_QUEUE; -constexpr CDCMetadataKey LAST_TXN_IN_QUEUE_KEY = CDCMetadataKey::LAST_TXN_IN_QUEUE; -constexpr CDCMetadataKey EXECUTING_TXN_KEY = CDCMetadataKey::EXECUTING_TXN; -constexpr CDCMetadataKey EXECUTING_TXN_STATE_KEY = CDCMetadataKey::EXECUTING_TXN_STATE; -constexpr CDCMetadataKey NEXT_DIRECTORY_ID_KEY = CDCMetadataKey::LAST_DIRECTORY_ID; +constexpr CDCMetadataKey LAST_APPLIED_LOG_ENTRY_KEY = CDCMetadataKey::LAST_APPLIED_LOG_ENTRY; // V0, V1 +constexpr CDCMetadataKey LAST_TXN_KEY = CDCMetadataKey::LAST_TXN; // V0, V1 +constexpr CDCMetadataKey FIRST_TXN_IN_QUEUE_KEY = CDCMetadataKey::FIRST_TXN_IN_QUEUE; // V0 +constexpr CDCMetadataKey LAST_TXN_IN_QUEUE_KEY = CDCMetadataKey::LAST_TXN_IN_QUEUE; // V0 +constexpr CDCMetadataKey EXECUTING_TXN_KEY = CDCMetadataKey::EXECUTING_TXN; // V0 +constexpr CDCMetadataKey EXECUTING_TXN_STATE_KEY = CDCMetadataKey::EXECUTING_TXN_STATE; // V0 +constexpr CDCMetadataKey NEXT_DIRECTORY_ID_KEY = CDCMetadataKey::LAST_DIRECTORY_ID; // V0, V1 +constexpr CDCMetadataKey VERSION_KEY = CDCMetadataKey::VERSION; // V1 inline rocksdb::Slice cdcMetadataKey(const CDCMetadataKey* k) { return rocksdb::Slice((const char*)k, sizeof(*k)); } +struct CDCTxnIdKey { + FIELDS( + BE, CDCTxnId, id, setIdUnchecked, + END_STATIC + ) + + void setId(CDCTxnId i) { + ALWAYS_ASSERT(i.x != 0); + setIdUnchecked(i); + } + + static StaticValue Static(CDCTxnId id) { + auto x = StaticValue(); + x().setId(id); + return x; + } +}; + +struct CDCTxnIdValue { + FIELDS( + LE, CDCTxnId, id, setIdUnchecked, + END_STATIC + ) + + void setId(CDCTxnId i) { + ALWAYS_ASSERT(i.x != 0); + setIdUnchecked(i); + } + + static StaticValue Static(CDCTxnId id) { + auto x = StaticValue(); + x().setId(id); + return x; + } +}; + +// This data structure, conceptually, is std::unordered_map>. +// +// To encode this in RocksDB, we store (InodeId, CDCTxnId) keys with no values. Txn ids are +// increasing so the order will be naturally what we want. +// +// We also store a sentinel with the head of the list. This is to avoid having to step on many +// deleted keys when checking if a dir is already locked. +// +// The functions adding/removing elements to the list are tasked with bookeeping the sentinel. +struct DirsToTxnsKey { + FIELDS( + BE, InodeId, dirId, setDirId, + BE, uint64_t, maybeTxnId, setMaybeTxnId, // if 0, it's a sentinel + END_STATIC + ) + + bool isSentinel() const { + return maybeTxnId() == 0; + } + + CDCTxnId txnId() const { + ALWAYS_ASSERT(maybeTxnId() != 0); + return maybeTxnId(); + } + + void setTxnId(CDCTxnId txnId) { + ALWAYS_ASSERT(txnId.x != 0); + setMaybeTxnId(txnId.x); + } + + void setSentinel() { + setMaybeTxnId(0); + } +}; + struct MakeDirectoryState { FIELDS( LE, InodeId, dirId, setDirId, diff --git a/cpp/wyhash/wyhash.h b/cpp/wyhash/wyhash.h index 84646ff3..52057fdb 100644 --- a/cpp/wyhash/wyhash.h +++ b/cpp/wyhash/wyhash.h @@ -7,6 +7,11 @@ #include #include +#include +#include +#include +#include +#include #ifdef __cplusplus extern "C" { @@ -47,6 +52,31 @@ static void wyhash64_bytes(uint64_t* state, uint8_t* bytes, size_t len) { } } +// Not specific to wyhash, just handy +__attribute__((unused)) +static uint64_t wyhash64_rand() { + int fd = open("/dev/urandom", O_RDONLY); + if (fd < 0) { + fprintf(stderr, "could not open /dev/urandom: %d", errno); + exit(1); + } + uint64_t x; + ssize_t r = read(fd, &x, sizeof(x)); + if (r < 0) { + fprintf(stderr, "could read /dev/urandom: %d", errno); + exit(1); + } + if (r != sizeof(x)) { + fprintf(stderr, "expected %ld bytes from /dev/urandom, got %ld", sizeof(x), r); + exit(1); + } + if (close(fd) < 0) { + fprintf(stderr, "could not close /dev/urandom: %d", errno); + exit(1); + } + return x; +} + #ifdef __cplusplus } #endif diff --git a/go/eggstests/eggstests.go b/go/eggstests/eggstests.go index 2797f5f8..32efd1dc 100644 --- a/go/eggstests/eggstests.go +++ b/go/eggstests/eggstests.go @@ -302,6 +302,15 @@ func (r *RunTests) run( }, ) + r.test( + log, + "parallel dirs", + "", + func(counters *lib.ClientCounters) { + parallelDirsTest(log, r.shuckleAddress(), counters) + }, + ) + largeFileOpts := largeFileTestOpts{ fileSize: 1 << 30, // 1GiB } @@ -314,15 +323,6 @@ func (r *RunTests) run( }, ) - r.test( - log, - "parallel dirs", - "", - func(counters *lib.ClientCounters) { - parallelDirsTest(log, r.shuckleAddress(), counters) - }, - ) - rsyncOpts := rsyncTestOpts{ maxFileSize: 200 << 20, // 200MiB numFiles: 100, // 20GiB diff --git a/go/lib/client.go b/go/lib/client.go index 602159f7..e474ca25 100644 --- a/go/lib/client.go +++ b/go/lib/client.go @@ -936,6 +936,7 @@ func NewClientDirectNoAddrs( log *Logger, ) (c *Client, err error) { c = &Client{ + // do not catch requests from previous executions requestIdCounter: rand.Uint64(), fetchBlockBufs: sync.Pool{ New: func() any {