// Copyright 2025 XTX Markets Technologies Limited // // SPDX-License-Identifier: GPL-2.0-or-later #include "CDCDB.hpp" #include #include #include #include #include #include #include #include #include #include #include "Assert.hpp" #include "AssertiveLock.hpp" #include "CDCDBData.hpp" #include "Common.hpp" #include "Env.hpp" #include "Exception.hpp" #include "MsgsGen.hpp" #include "RocksDBUtils.hpp" #include "ShardDB.hpp" #include "Time.hpp" #include "XmonAgent.hpp" // The CDC needs to remember about multi-step transactions which it executes by // talking to the shards. So essentially we need to store a bunch of queued // queued requests, and a bunch of currently executing transactions, which are // waiting for shard responses. // // Generally speaking we need to make sure to not have transactions step on each // others toes. This is especially pertinent for edge locking logic -- given our // requirements for idempotency (we don't know when a request has gone through) // when we lock an edge we're really locking it so that shard-specific operations // don't mess things up, rather than other CDC operations. // // The strategy that we adopt to achieve that is as follows: // // 1. Each request that comes in gets assigned a txn id. The txn ids are sequential: // later request, higher txn ids. // 2. We specify a set of directory inode ids that a given request will need to lock // edges on. // 3. We store a map InodeId -> []TxnId. The list of TxnIds is ordered. Each TxnId // is in the list value for all the InodeId keys it needs locks for. Only TxnIds // that have not completed yet are in this map. // 4. A txn does not start executing unless it is at the head of all the lists in the // map above. // // This scheme ensures no deadlocks and also fairness (no transaction can ever be // starved). Divide the requests into the equivalence classes generated by the // relation "have directories to be locked in common". The scheme above guarantees // that each class will be run serially, with transactions with earlier txn ids // being run before txns with later txn ids. // // The "move directory" request is a bit of an exception. It needs to do a "no loops" // check which depends on the entire directory tree. So for simplicity we arrange // for each move directory request to run serially, by creating a dummy InodeId // which represents the "move directory lock", so to speak. // // The first version of the CDC just executed each transaction serially, but this // proved way too slow, capping directory creation at around 200req/s. So we did the // above to pipeline CDC requests. Since this required a change to the CDC RocksDB // schema, we call the first version of the schema V0, the current one V1. std::vector CDCDB::getColumnFamilyDescriptors() { return { {rocksdb::kDefaultColumnFamilyName, {}}, {"reqQueue", {}}, {"parent", {}}, {"enqueued", {}}, {"executing", {}}, {"dirsToTxns", {}}, }; } std::ostream& operator<<(std::ostream& out, const CDCShardReq& x) { out << "CDCShardReq(shid=" << x.shid << ", req=" << x.req << ")"; return out; } std::ostream& operator<<(std::ostream& out, const CDCStep& x) { out << "CDCStep(finishedTxns=["; for (int i = 0; i < x.finishedTxns.size(); i++) { if (i > 0) { out << ", "; } const auto& txn = x.finishedTxns[i]; out << "<" << txn.first << ", " << txn.second << ">"; } out << "], runningTxns=["; for (int i = 0; i < x.runningTxns.size(); i++) { if (i > 0) { out << ", "; } const auto& txn = x.runningTxns[i]; out << "<" << txn.first << ", " << txn.second << ">"; } out << "])"; return out; } std::ostream& operator<<(std::ostream& out, CDCTxnId id) { return out << id.x; } std::ostream& operator<<(std::ostream& out, const CDCShardResp& x) { return out << "CDCShardResp(txnId=" << x.txnId << ", resp=" << x.resp << ")"; } std::ostream& operator<<(std::ostream& out, const CDCLogEntry& x) { out << "CDCLogEntry(logIdx= " << x.logIdx() << ", "; out << "cdcReqs=["; for (auto& req : x.cdcReqs()) { out << req << ", "; } out << "], shardResps=["; for (auto& resp: x.shardResps()) { out << resp << ", "; } out << "])"; return out; } inline bool createCurrentLockedEdgeRetry(TernError err) { return err == TernError::TIMEOUT || err == TernError::MTIME_IS_TOO_RECENT || err == TernError::MORE_RECENT_SNAPSHOT_EDGE || err == TernError::MORE_RECENT_CURRENT_EDGE; } static constexpr InodeId MOVE_DIRECTORY_LOCK = InodeId::FromU64Unchecked(1ull<<63); struct DirectoriesNeedingLock { private: static constexpr int MAX_SIZE = 3; std::array _ids; int _size; public: DirectoriesNeedingLock() : _size(0) { memset(_ids.data(), 0, _ids.size()*sizeof(decltype(_ids)::value_type)); } int size() const { return _size; } decltype(_ids)::const_iterator begin() const { return _ids.begin(); } decltype(_ids)::const_iterator end() const { return _ids.begin() + _size; } void add(InodeId id) { ALWAYS_ASSERT(_size != MAX_SIZE); ALWAYS_ASSERT(id != InodeId::FromU64Unchecked(0)); for (InodeId otherId : _ids) { if (otherId == id) { return; } } _ids[_size] = id; _size++; } }; // These are all the directories where we'll lock edges given a request. // These function _must be pure_! We call it repeatedly as if it's a property // of the request more than a function. // // Technically every well formed request will have distinct inode ids, but there // are parts in the code where this function is called before we know that the // request is valid. Hence the set semantics of DirectoriesNeedingLock. static DirectoriesNeedingLock directoriesNeedingLock(const CDCReqContainer& req) { DirectoriesNeedingLock toLock; switch (req.kind()) { case CDCMessageKind::MAKE_DIRECTORY: toLock.add(req.getMakeDirectory().ownerId); break; case CDCMessageKind::RENAME_FILE: toLock.add(req.getRenameFile().oldOwnerId); toLock.add(req.getRenameFile().newOwnerId); break; case CDCMessageKind::SOFT_UNLINK_DIRECTORY: // Lock needs to be acquired on both owner and target directory. // Lock on owner is needed for remove owner step of the state machine. // Lock on target is needed so that the target does not get removed by GC after // we unlink the owner but before we unlock the edge. toLock.add(req.getSoftUnlinkDirectory().ownerId); toLock.add(req.getSoftUnlinkDirectory().targetId); break; case CDCMessageKind::RENAME_DIRECTORY: toLock.add(req.getRenameDirectory().oldOwnerId); toLock.add(req.getRenameDirectory().newOwnerId); // Moving directories is special: it can introduce loops if we're not careful. // Instead of trying to not create loops in the context of interleaved transactions, // we instead only allow one move directory at a time. toLock.add(MOVE_DIRECTORY_LOCK); break; case CDCMessageKind::HARD_UNLINK_DIRECTORY: toLock.add(req.getHardUnlinkDirectory().dirId); break; case CDCMessageKind::CROSS_SHARD_HARD_UNLINK_FILE: toLock.add(req.getCrossShardHardUnlinkFile().ownerId); break; case CDCMessageKind::ERROR: throw TERN_EXCEPTION("bad req type error"); default: throw TERN_EXCEPTION("bad req type %s", (uint8_t)req.kind()); } return toLock; } struct StateMachineEnv { Env& env; rocksdb::ColumnFamilyHandle* defaultCf; rocksdb::ColumnFamilyHandle* parentCf; rocksdb::Transaction& dbTxn; CDCTxnId txnId; uint8_t txnStep; CDCStep& cdcStep; bool finished; StateMachineEnv( Env& env_, rocksdb::ColumnFamilyHandle* defaultCf_, rocksdb::ColumnFamilyHandle* parentCf_, rocksdb::Transaction& dbTxn_, CDCTxnId txnId_, uint8_t step_, CDCStep& cdcStep_ ): env(env_), defaultCf(defaultCf_), parentCf(parentCf_), dbTxn(dbTxn_), txnId(txnId_), txnStep(step_), cdcStep(cdcStep_), finished(false) {} InodeId nextDirectoryId(rocksdb::Transaction& dbTxn) { std::string v; ROCKS_DB_CHECKED(dbTxn.Get({}, defaultCf, cdcMetadataKey(&NEXT_DIRECTORY_ID_KEY), &v)); ExternalValue nextId(v); InodeId id = nextId().id(); nextId().setId(InodeId::FromU64(id.u64 + 1)); ROCKS_DB_CHECKED(dbTxn.Put(defaultCf, cdcMetadataKey(&NEXT_DIRECTORY_ID_KEY), nextId.toSlice())); return id; } ShardReqContainer& needsShard(uint8_t step, ShardId shid, bool repeated) { txnStep = step; auto& running = cdcStep.runningTxns.emplace_back(); running.first = txnId; running.second.shid = shid; running.second.repeated = repeated; return running.second.req; } CDCRespContainer& finish() { this->finished = true; auto& finished = cdcStep.finishedTxns.emplace_back(); finished.first = txnId; return finished.second; } void finishWithError(TernError err) { this->finished = true; ALWAYS_ASSERT(err != TernError::NO_ERROR); auto& errored = cdcStep.finishedTxns.emplace_back(); errored.first = txnId; errored.second.setError() = err; } }; constexpr uint8_t TXN_START = 0; enum MakeDirectoryStep : uint8_t { MAKE_DIRECTORY_LOOKUP = 1, MAKE_DIRECTORY_CREATE_DIR = 2, MAKE_DIRECTORY_LOOKUP_OLD_CREATION_TIME = 3, MAKE_DIRECTORY_CREATE_LOCKED_EDGE = 4, MAKE_DIRECTORY_UNLOCK_EDGE = 5, MAKE_DIRECTORY_ROLLBACK = 6, }; // Steps: // // 1. Lookup if an existing directory exists. If it does, immediately succeed. // 2. Allocate inode id here in the CDC // 3. Create directory in shard we get from the inode // 4. Lookup old creation time for the edge we're about to create // 5. Create locked edge from owner to newly created directory. If this fail because of bad creation time, go back to 4 // 6. Unlock the edge created in 3 // // If 4 or 5 fails, 3 must be rolled back. 6 does not fail. // // 1 is necessary rather than failing on attempted override because otherwise failures // due to repeated calls are indistinguishable from genuine failures. struct MakeDirectoryStateMachine { StateMachineEnv& env; const MakeDirectoryReq& req; MakeDirectoryState state; MakeDirectoryStateMachine(StateMachineEnv& env_, const MakeDirectoryReq& req_, MakeDirectoryState state_): env(env_), req(req_), state(state_) {} void resume(const ShardRespContainer* resp) { if (env.txnStep == TXN_START) { start(); return; } if (unlikely(resp == nullptr)) { // we're resuming with no response switch (env.txnStep) { case MAKE_DIRECTORY_LOOKUP: lookup(); break; case MAKE_DIRECTORY_CREATE_DIR: createDirectoryInode(); break; case MAKE_DIRECTORY_LOOKUP_OLD_CREATION_TIME: lookupOldCreationTime(); break; case MAKE_DIRECTORY_CREATE_LOCKED_EDGE: createLockedEdge(); break; case MAKE_DIRECTORY_UNLOCK_EDGE: unlockEdge(); break; case MAKE_DIRECTORY_ROLLBACK: rollback(); break; default: throw TERN_EXCEPTION("bad step %s", env.txnStep); } } else { switch (env.txnStep) { case MAKE_DIRECTORY_LOOKUP: afterLookup(*resp); break; case MAKE_DIRECTORY_CREATE_DIR: afterCreateDirectoryInode(*resp); break; case MAKE_DIRECTORY_LOOKUP_OLD_CREATION_TIME: afterLookupOldCreationTime(*resp); break; case MAKE_DIRECTORY_CREATE_LOCKED_EDGE: afterCreateLockedEdge(*resp); break; case MAKE_DIRECTORY_UNLOCK_EDGE: afterUnlockEdge(*resp); break; case MAKE_DIRECTORY_ROLLBACK: afterRollback(*resp); break; default: throw TERN_EXCEPTION("bad step %s", env.txnStep); } } } void start() { state.setDirId(env.nextDirectoryId(env.dbTxn)); lookup(); } void lookup(bool repeated = false) { auto& shardReq = env.needsShard(MAKE_DIRECTORY_LOOKUP, req.ownerId.shard(), repeated).setLookup(); shardReq.dirId = req.ownerId; shardReq.name = req.name; } void afterLookup(const ShardRespContainer& resp) { auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : TernError::NO_ERROR; if (err == TernError::TIMEOUT) { lookup(true); // retry } else if (err == TernError::DIRECTORY_NOT_FOUND) { env.finishWithError(err); } else if (err == TernError::NAME_NOT_FOUND) { // normal case, let's proceed createDirectoryInode(); } else { ALWAYS_ASSERT(err == TernError::NO_ERROR); const auto& lookupResp = resp.getLookup(); if (lookupResp.targetId.type() == InodeType::DIRECTORY) { // we're good already auto& cdcResp = env.finish().setMakeDirectory(); cdcResp.creationTime = lookupResp.creationTime; cdcResp.id = lookupResp.targetId; } else { env.finishWithError(TernError::CANNOT_OVERRIDE_NAME); } } } void createDirectoryInode(bool repeated = false) { auto& shardReq = env.needsShard(MAKE_DIRECTORY_CREATE_DIR, state.dirId().shard(), repeated).setCreateDirectoryInode(); shardReq.id = state.dirId(); shardReq.ownerId = req.ownerId; } void afterCreateDirectoryInode(const ShardRespContainer& resp) { auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : TernError::NO_ERROR; if (err == TernError::TIMEOUT) { // Try again -- note that the call to create directory inode is idempotent. createDirectoryInode(true); } else { ALWAYS_ASSERT(err == TernError::NO_ERROR); lookupOldCreationTime(); } } void lookupOldCreationTime(bool repeated = false) { auto& shardReq = env.needsShard(MAKE_DIRECTORY_LOOKUP_OLD_CREATION_TIME, req.ownerId.shard(), repeated).setFullReadDir(); shardReq.dirId = req.ownerId; shardReq.flags = FULL_READ_DIR_BACKWARDS | FULL_READ_DIR_SAME_NAME | FULL_READ_DIR_CURRENT; shardReq.limit = 1; shardReq.startName = req.name; shardReq.startTime = 0; // we have current set } void afterLookupOldCreationTime(const ShardRespContainer& resp) { auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : TernError::NO_ERROR; if (err == TernError::TIMEOUT) { lookupOldCreationTime(true); // retry } else if (err == TernError::DIRECTORY_NOT_FOUND) { // the directory we looked into doesn't even exist anymore -- // we've failed hard and we need to remove the inode. state.setExitError(err); rollback(); } else { ALWAYS_ASSERT(err == TernError::NO_ERROR); // there might be no existing edge const auto& fullReadDir = resp.getFullReadDir(); ALWAYS_ASSERT(fullReadDir.results.els.size() < 2); // we have limit=1 if (fullReadDir.results.els.size() == 0) { state.setOldCreationTime(0); // there was nothing present for this name } else { state.setOldCreationTime(fullReadDir.results.els[0].creationTime); } // keep going createLockedEdge(); } } void createLockedEdge(bool repeated = false) { auto& shardReq = env.needsShard(MAKE_DIRECTORY_CREATE_LOCKED_EDGE, req.ownerId.shard(), repeated).setCreateLockedCurrentEdge(); shardReq.dirId = req.ownerId; shardReq.targetId = state.dirId(); shardReq.name = req.name; shardReq.oldCreationTime = state.oldCreationTime(); } void afterCreateLockedEdge(const ShardRespContainer& resp) { auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : TernError::NO_ERROR; if (createCurrentLockedEdgeRetry(err)) { createLockedEdge(true); // try again } else if (err == TernError::CANNOT_OVERRIDE_NAME) { // this happens when a file gets created between when we looked // up whether there was something else and now. state.setExitError(err); rollback(); } else if (err == TernError::MISMATCHING_CREATION_TIME) { // lookup the old creation time again lookupOldCreationTime(); } else { // We know we cannot get directory not found because we managed to lookup // old creation time. // // We also cannot get MISMATCHING_TARGET since we are the only one // creating locked edges, and transactions execute serially. ALWAYS_ASSERT(err == TernError::NO_ERROR); state.setCreationTime(resp.getCreateLockedCurrentEdge().creationTime); unlockEdge(); } } void unlockEdge(bool repeated = false) { auto& shardReq = env.needsShard(MAKE_DIRECTORY_UNLOCK_EDGE, req.ownerId.shard(), repeated).setUnlockCurrentEdge(); shardReq.dirId = req.ownerId; shardReq.name = req.name; shardReq.targetId = state.dirId(); shardReq.wasMoved = false; shardReq.creationTime = state.creationTime(); } void afterUnlockEdge(const ShardRespContainer& resp) { auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : TernError::NO_ERROR; if (err == TernError::TIMEOUT || err == TernError::MTIME_IS_TOO_RECENT) { // retry unlockEdge(true); } else { ALWAYS_ASSERT(err == TernError::NO_ERROR); // We're done, record the parent relationship and finish { auto k = InodeIdKey::Static(state.dirId()); auto v = InodeIdValue::Static(req.ownerId); ROCKS_DB_CHECKED(env.dbTxn.Put(env.parentCf, k.toSlice(), v.toSlice())); } auto& resp = env.finish().setMakeDirectory(); resp.id = state.dirId(); resp.creationTime = state.creationTime(); } } void rollback(bool repeated = false) { // disown the child, it'll be collected by GC. auto& shardReq = env.needsShard(MAKE_DIRECTORY_ROLLBACK, state.dirId().shard(), repeated).setRemoveDirectoryOwner(); shardReq.dirId = state.dirId(); // we've just created this directory, it is empty, therefore the policy // is irrelevant. shardReq.info = defaultDirectoryInfo(); } void afterRollback(const ShardRespContainer& resp) { auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : TernError::NO_ERROR; if (err == TernError::TIMEOUT) { rollback(true); // retry } else { ALWAYS_ASSERT(err == TernError::NO_ERROR); env.finishWithError(state.exitError()); } } }; enum HardUnlinkDirectoryStep : uint8_t { HARD_UNLINK_DIRECTORY_REMOVE_INODE = 1, }; // The only reason we have this here is for possible conflicts with RemoveDirectoryOwner, // which might temporarily set the owner of a directory to NULL. Since in the current // implementation we only ever have one transaction in flight in the CDC, we can just // execute this. struct HardUnlinkDirectoryStateMachine { StateMachineEnv& env; const HardUnlinkDirectoryReq& req; HardUnlinkDirectoryState state; HardUnlinkDirectoryStateMachine(StateMachineEnv& env_, const HardUnlinkDirectoryReq& req_, HardUnlinkDirectoryState state_): env(env_), req(req_), state(state_) {} void resume(const ShardRespContainer* resp) { if (env.txnStep == TXN_START) { removeInode(); return; } if (unlikely(resp == nullptr)) { // we're resuming with no response switch (env.txnStep) { case HARD_UNLINK_DIRECTORY_REMOVE_INODE: removeInode(); break; default: throw TERN_EXCEPTION("bad step %s", env.txnStep); } } else { switch (env.txnStep) { case HARD_UNLINK_DIRECTORY_REMOVE_INODE: afterRemoveInode(*resp); break; default: throw TERN_EXCEPTION("bad step %s", env.txnStep); } } } void removeInode(bool repeated = false) { auto& shardReq = env.needsShard(HARD_UNLINK_DIRECTORY_REMOVE_INODE, req.dirId.shard(), repeated).setRemoveInode(); shardReq.id = req.dirId; } void afterRemoveInode(const ShardRespContainer& resp) { auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : TernError::NO_ERROR; if (err == TernError::TIMEOUT) { removeInode(true); // try again } else if ( err == TernError::DIRECTORY_NOT_FOUND || err == TernError::DIRECTORY_HAS_OWNER || err == TernError::DIRECTORY_NOT_EMPTY ) { env.finishWithError(err); } else { ALWAYS_ASSERT(err == TernError::NO_ERROR); env.finish().setHardUnlinkDirectory(); } } }; enum RenameFileStep : uint8_t { RENAME_FILE_LOCK_OLD_EDGE = 1, RENAME_FILE_LOOKUP_OLD_CREATION_TIME = 2, RENAME_FILE_CREATE_NEW_LOCKED_EDGE = 3, RENAME_FILE_UNLOCK_NEW_EDGE = 4, RENAME_FILE_UNLOCK_OLD_EDGE = 5, RENAME_FILE_ROLLBACK = 6, }; // Steps: // // 1. lock source current edge // 2. lookup prev creation time for current target edge // 3. create destination locked current target edge. if it fails because of bad creation time, go back to 2 // 4. unlock edge in step 3 // 5. unlock source target current edge, and soft unlink it // // If we fail at step 2 or 3, we need to roll back step 1. Steps 3 and 4 should never fail. struct RenameFileStateMachine { StateMachineEnv& env; const RenameFileReq& req; RenameFileState state; RenameFileStateMachine(StateMachineEnv& env_, const RenameFileReq& req_, RenameFileState state_): env(env_), req(req_), state(state_) {} void resume(const ShardRespContainer* resp) { if (env.txnStep == TXN_START) { start(); return; } if (unlikely(resp == nullptr)) { // we're resuming with no response switch (env.txnStep) { case RENAME_FILE_LOCK_OLD_EDGE: lockOldEdge(); break; case RENAME_FILE_LOOKUP_OLD_CREATION_TIME: lookupOldCreationTime(); break; case RENAME_FILE_CREATE_NEW_LOCKED_EDGE: createNewLockedEdge(); break; case RENAME_FILE_UNLOCK_NEW_EDGE: unlockNewEdge(); break; case RENAME_FILE_UNLOCK_OLD_EDGE: unlockOldEdge(); break; case RENAME_FILE_ROLLBACK: rollback(); break; default: throw TERN_EXCEPTION("bad step %s", env.txnStep); } } else { switch (env.txnStep) { case RENAME_FILE_LOCK_OLD_EDGE: afterLockOldEdge(*resp); break; case RENAME_FILE_LOOKUP_OLD_CREATION_TIME: afterLookupOldCreationTime(*resp); break; case RENAME_FILE_CREATE_NEW_LOCKED_EDGE: afterCreateNewLockedEdge(*resp); break; case RENAME_FILE_UNLOCK_NEW_EDGE: afterUnlockNewEdge(*resp); break; case RENAME_FILE_UNLOCK_OLD_EDGE: afterUnlockOldEdge(*resp); break; case RENAME_FILE_ROLLBACK: afterRollback(*resp); break; default: throw TERN_EXCEPTION("bad step %s", env.txnStep); } } } void start() { // We need this explicit check here because moving directories is more complicated, // and therefore we do it in another transaction type entirely. if (req.targetId.type() == InodeType::DIRECTORY) { env.finishWithError(TernError::TYPE_IS_NOT_DIRECTORY); } else if (req.oldOwnerId == req.newOwnerId) { env.finishWithError(TernError::SAME_DIRECTORIES); } else { lockOldEdge(); } } void lockOldEdge(bool repeated = false) { auto& shardReq = env.needsShard(RENAME_FILE_LOCK_OLD_EDGE, req.oldOwnerId.shard(), repeated).setLockCurrentEdge(); shardReq.dirId = req.oldOwnerId; shardReq.name = req.oldName; shardReq.targetId = req.targetId; shardReq.creationTime = req.oldCreationTime; } void afterLockOldEdge(const ShardRespContainer& resp) { auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : TernError::NO_ERROR; if (err == TernError::TIMEOUT) { lockOldEdge(true); // retry } else if ( err == TernError::EDGE_NOT_FOUND || err == TernError::MISMATCHING_CREATION_TIME || err == TernError::DIRECTORY_NOT_FOUND ) { // We failed hard and we have nothing to roll back if (err == TernError::DIRECTORY_NOT_FOUND) { err = TernError::OLD_DIRECTORY_NOT_FOUND; } env.finishWithError(err); } else { ALWAYS_ASSERT(err == TernError::NO_ERROR); lookupOldCreationTime(); } } void lookupOldCreationTime(bool repeated = false) { auto& shardReq = env.needsShard(RENAME_FILE_LOOKUP_OLD_CREATION_TIME, req.newOwnerId.shard(), repeated).setFullReadDir(); shardReq.dirId = req.newOwnerId; shardReq.flags = FULL_READ_DIR_BACKWARDS | FULL_READ_DIR_SAME_NAME | FULL_READ_DIR_CURRENT; shardReq.limit = 1; shardReq.startName = req.newName; shardReq.startTime = 0; // we have current set } void afterLookupOldCreationTime(const ShardRespContainer& resp) { auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : TernError::NO_ERROR; if (err == TernError::TIMEOUT) { lookupOldCreationTime(true); // retry } else if (err == TernError::DIRECTORY_NOT_FOUND) { // we've failed hard and we need to unlock the old edge. err = TernError::NEW_DIRECTORY_NOT_FOUND; state.setExitError(err); rollback(); } else { ALWAYS_ASSERT(err == TernError::NO_ERROR); // there might be no existing edge const auto& fullReadDir = resp.getFullReadDir(); ALWAYS_ASSERT(fullReadDir.results.els.size() < 2); // we have limit=1 if (fullReadDir.results.els.size() == 0) { state.setNewOldCreationTime(0); // there was nothing present for this name } else { state.setNewOldCreationTime(fullReadDir.results.els[0].creationTime); } // keep going createNewLockedEdge(); } } void createNewLockedEdge(bool repeated = false) { auto& shardReq = env.needsShard(RENAME_FILE_CREATE_NEW_LOCKED_EDGE, req.newOwnerId.shard(), repeated).setCreateLockedCurrentEdge(); shardReq.dirId = req.newOwnerId; shardReq.name = req.newName; shardReq.targetId = req.targetId; shardReq.oldCreationTime = state.newOldCreationTime(); } void afterCreateNewLockedEdge(const ShardRespContainer& resp) { auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : TernError::NO_ERROR; if (createCurrentLockedEdgeRetry(err)) { createNewLockedEdge(true); // retry } else if (err == TernError::MISMATCHING_CREATION_TIME) { // we need to lookup the creation time again. lookupOldCreationTime(); } else if (err == TernError::CANNOT_OVERRIDE_NAME) { // we failed hard and we need to rollback state.setExitError(err); rollback(); } else { state.setNewCreationTime(resp.getCreateLockedCurrentEdge().creationTime); unlockNewEdge(); } } void unlockNewEdge(bool repeated = false) { auto& shardReq = env.needsShard(RENAME_FILE_UNLOCK_NEW_EDGE, req.newOwnerId.shard(), repeated).setUnlockCurrentEdge(); shardReq.dirId = req.newOwnerId; shardReq.targetId = req.targetId; shardReq.name = req.newName; shardReq.wasMoved = false; shardReq.creationTime = state.newCreationTime(); } void afterUnlockNewEdge(const ShardRespContainer& resp) { auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : TernError::NO_ERROR; if (err == TernError::TIMEOUT) { unlockNewEdge(true); // retry } else { ALWAYS_ASSERT(err == TernError::NO_ERROR); unlockOldEdge(); } } void unlockOldEdge(bool repeated = false) { // We're done creating the destination edge, now unlock the source, marking it as moved auto& shardReq = env.needsShard(RENAME_FILE_UNLOCK_OLD_EDGE, req.oldOwnerId.shard(), repeated).setUnlockCurrentEdge(); shardReq.dirId = req.oldOwnerId; shardReq.targetId = req.targetId; shardReq.name = req.oldName; shardReq.wasMoved = true; shardReq.creationTime = req.oldCreationTime; } void afterUnlockOldEdge(const ShardRespContainer& resp) { auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : TernError::NO_ERROR; if (err == TernError::TIMEOUT) { unlockOldEdge(true); // retry } else { // This can only be because of repeated calls from here: we have the edge locked, // and only the CDC does changes. // TODO it would be cleaner to verify this with a lookup ALWAYS_ASSERT(err == TernError::NO_ERROR || err == TernError::EDGE_NOT_FOUND); // we're finally done auto& resp = env.finish().setRenameFile(); resp.creationTime = state.newCreationTime(); } } void rollback(bool repeated = false) { auto& shardReq = env.needsShard(RENAME_FILE_ROLLBACK, req.oldOwnerId.shard(), repeated).setUnlockCurrentEdge(); shardReq.dirId = req.oldOwnerId; shardReq.name = req.oldName; shardReq.targetId = req.targetId; shardReq.wasMoved = false; shardReq.creationTime = state.newCreationTime(); } void afterRollback(const ShardRespContainer& resp) { auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : TernError::NO_ERROR; if (err == TernError::TIMEOUT) { rollback(true); // retry } else { ALWAYS_ASSERT(err == TernError::NO_ERROR); env.finishWithError(state.exitError()); } } }; enum SoftUnlinkDirectoryStep : uint8_t { SOFT_UNLINK_DIRECTORY_LOCK_EDGE = 1, SOFT_UNLINK_DIRECTORY_STAT = 2, SOFT_UNLINK_DIRECTORY_REMOVE_OWNER = 3, SOFT_UNLINK_DIRECTORY_UNLOCK_EDGE = 4, SOFT_UNLINK_DIRECTORY_ROLLBACK = 5, }; // Steps: // // 1. Lock edge going into the directory to remove. This prevents things making // making it snapshot or similar in the meantime. // 2. Resolve the directory info, since we'll need to store it when we remove the directory owner. // 3. Remove directory owner from directory that we want to remove. This will fail if there still // are current edges there. // 4. Unlock edge going into the directory, making it snapshot. // // If 2 or 3 fail, we need to roll back the locking, without making the edge snapshot. struct SoftUnlinkDirectoryStateMachine { StateMachineEnv& env; const SoftUnlinkDirectoryReq& req; SoftUnlinkDirectoryState state; DirectoryInfo info; SoftUnlinkDirectoryStateMachine(StateMachineEnv& env_, const SoftUnlinkDirectoryReq& req_, SoftUnlinkDirectoryState state_): env(env_), req(req_), state(state_) {} void resume(const ShardRespContainer* resp) { if (env.txnStep == TXN_START) { start(); return; } if (unlikely(resp == nullptr)) { // we're resuming with no response switch (env.txnStep) { case SOFT_UNLINK_DIRECTORY_LOCK_EDGE: lockEdge(); break; case SOFT_UNLINK_DIRECTORY_STAT: stat(); break; // We don't persist the directory info, so we need to restart the stating when resuming case SOFT_UNLINK_DIRECTORY_REMOVE_OWNER: stat(); break; case SOFT_UNLINK_DIRECTORY_UNLOCK_EDGE: unlockEdge(); break; case SOFT_UNLINK_DIRECTORY_ROLLBACK: rollback(); break; default: throw TERN_EXCEPTION("bad step %s", env.txnStep); } } else { switch (env.txnStep) { case SOFT_UNLINK_DIRECTORY_LOCK_EDGE: afterLockEdge(*resp); break; case SOFT_UNLINK_DIRECTORY_STAT: afterStat(*resp); break; case SOFT_UNLINK_DIRECTORY_REMOVE_OWNER: afterRemoveOwner(*resp); break; case SOFT_UNLINK_DIRECTORY_UNLOCK_EDGE: afterUnlockEdge(*resp); break; case SOFT_UNLINK_DIRECTORY_ROLLBACK: afterRollback(*resp); break; default: throw TERN_EXCEPTION("bad step %s", env.txnStep); } } } void start() { if (req.targetId.type() != InodeType::DIRECTORY) { env.finishWithError(TernError::TYPE_IS_NOT_DIRECTORY); } else { lockEdge(); } } void lockEdge(bool repeated = false) { auto& shardReq = env.needsShard(SOFT_UNLINK_DIRECTORY_LOCK_EDGE, req.ownerId.shard(), repeated).setLockCurrentEdge(); shardReq.dirId = req.ownerId; shardReq.name = req.name; shardReq.targetId = req.targetId; shardReq.creationTime = req.creationTime; } void afterLockEdge(const ShardRespContainer& resp) { auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : TernError::NO_ERROR; if (err == TernError::TIMEOUT) { lockEdge(true); } else if (err == TernError::MISMATCHING_CREATION_TIME || err == TernError::EDGE_NOT_FOUND || err == TernError::DIRECTORY_NOT_FOUND) { LOG_INFO(env.env, "failed locking edge in soft unlink for req: %s with err: %s", req, err); env.finishWithError(err); // no rollback to be done } else { ALWAYS_ASSERT(err == TernError::NO_ERROR); state.setStatDirId(req.targetId); stat(); } } void stat(bool repeated = false) { auto& shardReq = env.needsShard(SOFT_UNLINK_DIRECTORY_STAT, state.statDirId().shard(), repeated).setStatDirectory(); shardReq.id = state.statDirId(); } void afterStat(const ShardRespContainer& resp) { auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : TernError::NO_ERROR; if (err == TernError::TIMEOUT) { stat(true); // retry } else { ALWAYS_ASSERT(err == TernError::NO_ERROR); const auto& statResp = resp.getStatDirectory(); // insert tags for (const auto& newEntry : statResp.info.entries.els) { bool found = false; for (const auto& entry: info.entries.els) { if (entry.tag == newEntry.tag) { found = true; break; } } if (!found) { info.entries.els.emplace_back(newEntry); } } if (info.entries.els.size() == REQUIRED_DIR_INFO_TAGS.size()) { // we've found everything auto& shardReq = env.needsShard(SOFT_UNLINK_DIRECTORY_REMOVE_OWNER, req.targetId.shard(), false).setRemoveDirectoryOwner(); shardReq.dirId = req.targetId; shardReq.info = info; } else { ALWAYS_ASSERT(statResp.owner != NULL_INODE_ID); // keep walking upwards state.setStatDirId(statResp.owner); stat(); } } } void afterRemoveOwner(const ShardRespContainer& resp) { auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : TernError::NO_ERROR; if (err == TernError::TIMEOUT) { // we don't want to keep the dir info around start again from the last stat stat(); } else if (err == TernError::DIRECTORY_NOT_EMPTY) { state.setExitError(err); rollback(); } else { ALWAYS_ASSERT(err == TernError::NO_ERROR, "Unexpected error when removing owner, ownerId=%s name=%s creationTime=%s targetId=%s: %s", req.ownerId, GoLangQuotedStringFmt(req.name.ref().data(), req.name.ref().size()), req.creationTime, req.targetId, err); unlockEdge(); } } void unlockEdge(bool repeated = false) { auto& shardReq = env.needsShard(SOFT_UNLINK_DIRECTORY_UNLOCK_EDGE, req.ownerId.shard(), repeated).setUnlockCurrentEdge(); shardReq.dirId = req.ownerId; shardReq.name = req.name; shardReq.targetId = req.targetId; // Note that here we used wasMoved even if the subsequent // snapshot edge will be non-owned, since we're dealing with // a directory, rather than a file. shardReq.wasMoved = true; shardReq.creationTime = req.creationTime; } void afterUnlockEdge(const ShardRespContainer& resp) { auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : TernError::NO_ERROR; if (err == TernError::TIMEOUT) { unlockEdge(true); } else { // This can only be because of repeated calls from here: we have the edge locked, // and only the CDC does changes. // TODO it would be cleaner to verify this with a lookup ALWAYS_ASSERT(err == TernError::NO_ERROR || err == TernError::EDGE_NOT_FOUND); auto& cdcResp = env.finish().setSoftUnlinkDirectory(); // Update parent map { auto k = InodeIdKey::Static(req.targetId); ROCKS_DB_CHECKED(env.dbTxn.Delete(env.parentCf, k.toSlice())); } } } void rollback(bool repeated = false) { auto& shardReq = env.needsShard(SOFT_UNLINK_DIRECTORY_ROLLBACK, req.ownerId.shard(), repeated).setUnlockCurrentEdge(); shardReq.dirId = req.ownerId; shardReq.name = req.name; shardReq.targetId = req.targetId; shardReq.wasMoved = false; shardReq.creationTime = req.creationTime; } void afterRollback(const ShardRespContainer& resp) { auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : TernError::NO_ERROR; if (err == TernError::TIMEOUT) { rollback(true); } else { // This can only be because of repeated calls from here: we have the edge locked, // and only the CDC does changes. // TODO it would be cleaner to verify this with a lookup ALWAYS_ASSERT(err == TernError::NO_ERROR || err == TernError::EDGE_NOT_FOUND); env.finishWithError(state.exitError()); } } }; enum RenameDirectoryStep : uint8_t { RENAME_DIRECTORY_LOCK_OLD_EDGE = 1, RENAME_DIRECTORY_LOOKUP_OLD_CREATION_TIME = 2, RENAME_DIRECTORY_CREATE_LOCKED_NEW_EDGE = 3, RENAME_DIRECTORY_UNLOCK_NEW_EDGE = 4, RENAME_DIRECTORY_UNLOCK_OLD_EDGE = 5, RENAME_DIRECTORY_SET_OWNER = 6, RENAME_DIRECTORY_ROLLBACK = 7, }; // Steps: // // 1. Make sure there's no loop by traversing the parents // 2. Lock old edge // 3. Lookup old creation time for the edge // 4. Create and lock the new edge // 5. Unlock the new edge // 6. Unlock and unlink the old edge // 7. Update the owner of the moved directory to the new directory // // If we fail at step 3 or 4, we need to unlock the edge we locked at step 2. Step 5 and 6 // should never fail. struct RenameDirectoryStateMachine { StateMachineEnv& env; const RenameDirectoryReq& req; RenameDirectoryState state; RenameDirectoryStateMachine(StateMachineEnv& env_, const RenameDirectoryReq& req_, RenameDirectoryState state_): env(env_), req(req_), state(state_) {} void resume(const ShardRespContainer* resp) { if (env.txnStep == TXN_START) { start(); return; } if (unlikely(resp == nullptr)) { // we're resuming with no response switch (env.txnStep) { case RENAME_DIRECTORY_LOCK_OLD_EDGE: lockOldEdge(); break; case RENAME_DIRECTORY_LOOKUP_OLD_CREATION_TIME: lookupOldCreationTime(); break; case RENAME_DIRECTORY_CREATE_LOCKED_NEW_EDGE: createLockedNewEdge(); break; case RENAME_DIRECTORY_UNLOCK_NEW_EDGE: unlockNewEdge(); break; case RENAME_DIRECTORY_UNLOCK_OLD_EDGE: unlockOldEdge(); break; case RENAME_DIRECTORY_SET_OWNER: setOwner(); break; case RENAME_DIRECTORY_ROLLBACK: rollback(); break; default: throw TERN_EXCEPTION("bad step %s", env.txnStep); } } else { switch (env.txnStep) { case RENAME_DIRECTORY_LOCK_OLD_EDGE: afterLockOldEdge(*resp); break; case RENAME_DIRECTORY_LOOKUP_OLD_CREATION_TIME: afterLookupOldCreationTime(*resp); break; case RENAME_DIRECTORY_CREATE_LOCKED_NEW_EDGE: afterCreateLockedEdge(*resp); break; case RENAME_DIRECTORY_UNLOCK_NEW_EDGE: afterUnlockNewEdge(*resp); break; case RENAME_DIRECTORY_UNLOCK_OLD_EDGE: afterUnlockOldEdge(*resp); break; case RENAME_DIRECTORY_SET_OWNER: afterSetOwner(*resp); break; case RENAME_DIRECTORY_ROLLBACK: afterRollback(*resp); break; default: throw TERN_EXCEPTION("bad step %s", env.txnStep); } } } // Check that changing this parent-child relationship wouldn't create // loops in directory structure. bool loopCheck() { std::unordered_set visited; InodeId cursor = req.targetId; for (;;) { if (visited.count(cursor) > 0) { LOG_INFO(env.env, "Re-encountered %s in loop check, will return false", cursor); return false; } LOG_DEBUG(env.env, "Performing loop check for %s", cursor); visited.insert(cursor); if (cursor == req.targetId) { cursor = req.newOwnerId; } else { auto k = InodeIdKey::Static(cursor); std::string v; ROCKS_DB_CHECKED(env.dbTxn.Get({}, env.parentCf, k.toSlice(), &v)); cursor = ExternalValue(v)().id(); } if (cursor == ROOT_DIR_INODE_ID) { break; } } return true; } void start() { if (req.targetId.type() != InodeType::DIRECTORY) { env.finishWithError(TernError::TYPE_IS_NOT_DIRECTORY); } else if (req.oldOwnerId == req.newOwnerId) { env.finishWithError(TernError::SAME_DIRECTORIES); } else if (!loopCheck()) { // First, check if we'd create a loop env.finishWithError(TernError::LOOP_IN_DIRECTORY_RENAME); } else { // Now, actually start by locking the old edge lockOldEdge(); } } void lockOldEdge(bool repeated = false) { auto& shardReq = env.needsShard(RENAME_DIRECTORY_LOCK_OLD_EDGE, req.oldOwnerId.shard(), repeated).setLockCurrentEdge(); shardReq.dirId = req.oldOwnerId; shardReq.name = req.oldName; shardReq.targetId = req.targetId; shardReq.creationTime = req.oldCreationTime; } void afterLockOldEdge(const ShardRespContainer& resp) { auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : TernError::NO_ERROR; if (err == TernError::TIMEOUT) { lockOldEdge(true); // retry } else if ( err == TernError::DIRECTORY_NOT_FOUND || err == TernError::EDGE_NOT_FOUND || err == TernError::MISMATCHING_CREATION_TIME ) { if (err == TernError::DIRECTORY_NOT_FOUND) { err = TernError::OLD_DIRECTORY_NOT_FOUND; } env.finishWithError(err); } else { ALWAYS_ASSERT(err == TernError::NO_ERROR); lookupOldCreationTime(); } } void lookupOldCreationTime(bool repeated = false) { auto& shardReq = env.needsShard(RENAME_FILE_LOOKUP_OLD_CREATION_TIME, req.newOwnerId.shard(), repeated).setFullReadDir(); shardReq.dirId = req.newOwnerId; shardReq.flags = FULL_READ_DIR_BACKWARDS | FULL_READ_DIR_SAME_NAME | FULL_READ_DIR_CURRENT; shardReq.limit = 1; shardReq.startName = req.newName; shardReq.startTime = 0; // we have current set } void afterLookupOldCreationTime(const ShardRespContainer& resp) { auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : TernError::NO_ERROR; if (err == TernError::TIMEOUT) { lookupOldCreationTime(true); // retry } else if (err == TernError::DIRECTORY_NOT_FOUND) { // we've failed hard and we need to unlock the old edge. state.setExitError(err); rollback(); } else { ALWAYS_ASSERT(err == TernError::NO_ERROR); // there might be no existing edge const auto& fullReadDir = resp.getFullReadDir(); ALWAYS_ASSERT(fullReadDir.results.els.size() < 2); // we have limit=1 if (fullReadDir.results.els.size() == 0) { state.setNewOldCreationTime(0); // there was nothing present for this name } else { state.setNewOldCreationTime(fullReadDir.results.els[0].creationTime); } // keep going createLockedNewEdge(); } } void createLockedNewEdge(bool repeated = false) { auto& shardReq = env.needsShard(RENAME_DIRECTORY_CREATE_LOCKED_NEW_EDGE, req.newOwnerId.shard(), repeated).setCreateLockedCurrentEdge(); shardReq.dirId = req.newOwnerId; shardReq.name = req.newName; shardReq.targetId = req.targetId; shardReq.oldCreationTime = state.newOldCreationTime(); } void afterCreateLockedEdge(const ShardRespContainer& resp) { auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : TernError::NO_ERROR; if (createCurrentLockedEdgeRetry(err)) { createLockedNewEdge(true); } else if (err == TernError::MISMATCHING_CREATION_TIME) { // we need to lookup the creation time again. lookupOldCreationTime(); } else if (err == TernError::CANNOT_OVERRIDE_NAME) { state.setExitError(err); rollback(); } else { ALWAYS_ASSERT(err == TernError::NO_ERROR); state.setNewCreationTime(resp.getCreateLockedCurrentEdge().creationTime); unlockNewEdge(); } } void unlockNewEdge(bool repeated = false) { auto& shardReq = env.needsShard(RENAME_DIRECTORY_UNLOCK_NEW_EDGE, req.newOwnerId.shard(), repeated).setUnlockCurrentEdge(); shardReq.dirId = req.newOwnerId; shardReq.name = req.newName; shardReq.targetId = req.targetId; shardReq.wasMoved = false; shardReq.creationTime = state.newCreationTime(); } void afterUnlockNewEdge(const ShardRespContainer& resp) { auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : TernError::NO_ERROR; if (err == TernError::TIMEOUT) { unlockNewEdge(true); } else if (err == TernError::EDGE_NOT_FOUND) { // This can only be because of repeated calls from here: we have the edge locked, // and only the CDC does changes. // TODO it would be cleaner to verify this with a lookup unlockOldEdge(); } else { ALWAYS_ASSERT(err == TernError::NO_ERROR); unlockOldEdge(); } } void unlockOldEdge(bool repeated = false) { auto& shardReq = env.needsShard(RENAME_DIRECTORY_UNLOCK_OLD_EDGE, req.oldOwnerId.shard(), repeated).setUnlockCurrentEdge(); shardReq.dirId = req.oldOwnerId; shardReq.name = req.oldName; shardReq.targetId = req.targetId; shardReq.wasMoved = true; shardReq.creationTime = req.oldCreationTime; } void afterUnlockOldEdge(const ShardRespContainer& resp) { auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : TernError::NO_ERROR; if (err == TernError::TIMEOUT) { unlockOldEdge(true); } else if (err == TernError::EDGE_NOT_FOUND) { // This can only be because of repeated calls from here: we have the edge locked, // and only the CDC does changes. // TODO it would be cleaner to verify this with a lookup setOwner(); } else { ALWAYS_ASSERT(err == TernError::NO_ERROR); setOwner(); } } void setOwner(bool repeated = false) { auto& shardReq = env.needsShard(RENAME_DIRECTORY_SET_OWNER, req.targetId.shard(), repeated).setSetDirectoryOwner(); shardReq.ownerId = req.newOwnerId; shardReq.dirId = req.targetId; } void afterSetOwner(const ShardRespContainer& resp) { auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : TernError::NO_ERROR; if (err == TernError::TIMEOUT) { setOwner(true); } else { ALWAYS_ASSERT(err == TernError::NO_ERROR); auto& resp = env.finish().setRenameDirectory(); resp.creationTime = state.newCreationTime(); // update cache { auto k = InodeIdKey::Static(req.targetId); auto v = InodeIdValue::Static(req.newOwnerId); ROCKS_DB_CHECKED(env.dbTxn.Put(env.parentCf, k.toSlice(), v.toSlice())); } } } void rollback(bool repeated = false) { auto& shardReq = env.needsShard(RENAME_DIRECTORY_ROLLBACK, req.oldOwnerId.shard(), repeated).setUnlockCurrentEdge(); shardReq.dirId = req.oldOwnerId; shardReq.name = req.oldName; shardReq.targetId = req.targetId; shardReq.wasMoved = false; shardReq.creationTime = state.newCreationTime(); } void afterRollback(const ShardRespContainer& resp) { auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : TernError::NO_ERROR; if (err == TernError::TIMEOUT) { rollback(true); } else { env.finishWithError(state.exitError()); } } }; enum CrossShardHardUnlinkFileStep : uint8_t { CROSS_SHARD_HARD_UNLINK_FILE_REMOVE_EDGE = 1, CROSS_SHARD_HARD_UNLINK_FILE_MAKE_TRANSIENT = 2, }; // Steps: // // 1. Remove owning edge in one shard // 2. Make file transient in other shard // // Step 2 cannot fail. struct CrossShardHardUnlinkFileStateMachine { StateMachineEnv& env; const CrossShardHardUnlinkFileReq& req; CrossShardHardUnlinkFileState state; CrossShardHardUnlinkFileStateMachine(StateMachineEnv& env_, const CrossShardHardUnlinkFileReq& req_, CrossShardHardUnlinkFileState state_): env(env_), req(req_), state(state_) {} void resume(const ShardRespContainer* resp) { if (env.txnStep == TXN_START) { start(); return; } if (unlikely(resp == nullptr)) { // we're resuming with no response switch (env.txnStep) { case CROSS_SHARD_HARD_UNLINK_FILE_REMOVE_EDGE: removeEdge(); break; case CROSS_SHARD_HARD_UNLINK_FILE_MAKE_TRANSIENT: makeTransient(); break; default: throw TERN_EXCEPTION("bad step %s", env.txnStep); } } else { switch (env.txnStep) { case CROSS_SHARD_HARD_UNLINK_FILE_REMOVE_EDGE: afterRemoveEdge(*resp); break; case CROSS_SHARD_HARD_UNLINK_FILE_MAKE_TRANSIENT: afterMakeTransient(*resp); break; default: throw TERN_EXCEPTION("bad step %s", env.txnStep); } } } void start() { if (req.ownerId.shard() == req.targetId.shard()) { env.finishWithError(TernError::SAME_SHARD); } else if (req.targetId.type() == InodeType::DIRECTORY) { env.finishWithError(TernError::TYPE_IS_DIRECTORY); } else { removeEdge(); } } void removeEdge(bool repeated = false) { auto& shardReq = env.needsShard(CROSS_SHARD_HARD_UNLINK_FILE_REMOVE_EDGE, req.ownerId.shard(), repeated).setRemoveOwnedSnapshotFileEdge(); shardReq.ownerId = req.ownerId; shardReq.targetId = req.targetId; shardReq.name = req.name; shardReq.creationTime = req.creationTime; } void afterRemoveEdge(const ShardRespContainer& resp) { auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : TernError::NO_ERROR; if (err == TernError::TIMEOUT || err == TernError::MTIME_IS_TOO_RECENT) { removeEdge(true); } else if (err == TernError::DIRECTORY_NOT_FOUND) { env.finishWithError(err); } else { ALWAYS_ASSERT(err == TernError::NO_ERROR); makeTransient(); } } void makeTransient(bool repeated = false) { auto& shardReq = env.needsShard(CROSS_SHARD_HARD_UNLINK_FILE_MAKE_TRANSIENT, req.targetId.shard(), repeated).setMakeFileTransient(); shardReq.id = req.targetId; shardReq.note = req.name; } void afterMakeTransient(const ShardRespContainer& resp) { auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : TernError::NO_ERROR; if (err == TernError::TIMEOUT) { makeTransient(true); } else { ALWAYS_ASSERT(err == TernError::NO_ERROR || err == TernError::FILE_NOT_FOUND); env.finish().setCrossShardHardUnlinkFile(); } } }; void CDCShardResp::pack(BincodeBuf& buf) const { buf.packScalar(txnId.x); checkPoint.pack(buf); resp.pack(buf); } void CDCShardResp::unpack(BincodeBuf& buf) { txnId.x = buf.unpackScalar(); checkPoint.unpack(buf); resp.unpack(buf); } size_t CDCShardResp::packedSize() const { return sizeof(uint64_t) + checkPoint.packedSize() + resp.packedSize(); } void CDCLogEntry::prepareLogEntries(std::vector& cdcReqs, std::vector& shardResps, size_t maxPackedSize, std::vector& entriesOut) { size_t usedSize = std::numeric_limits::max(); CDCLogEntry* curEntry{nullptr}; for (auto& shardResp : shardResps) { auto respSize = shardResp.packedSize(); ALWAYS_ASSERT(respSize < maxPackedSize); if (unlikely(maxPackedSize - respSize < usedSize)) { curEntry = &entriesOut.emplace_back(); usedSize = curEntry->packedSize(); } curEntry->_shardResps.emplace_back(std::move(shardResp)); usedSize += respSize; ALWAYS_ASSERT(usedSize <= maxPackedSize); } for (auto& cdcReq : cdcReqs) { auto reqSize = cdcReq.packedSize(); ALWAYS_ASSERT(reqSize < maxPackedSize); if (unlikely(maxPackedSize - reqSize < usedSize)) { curEntry = &entriesOut.emplace_back(); usedSize = curEntry->packedSize(); } curEntry->_cdcReqs.emplace_back(std::move(cdcReq)); usedSize += reqSize; ALWAYS_ASSERT(usedSize <= maxPackedSize); } cdcReqs.clear(); shardResps.clear(); } CDCLogEntry CDCLogEntry::prepareBootstrapEntry() { CDCLogEntry entry; entry._bootstrapEntry = true; return entry; } void CDCLogEntry::pack(BincodeBuf& buf) const { buf.packScalar(_bootstrapEntry); buf.packScalar(_cdcReqs.size()); for (auto& cdcReq : _cdcReqs) { cdcReq.pack(buf); } buf.packScalar(_shardResps.size()); for (auto& shardResp : _shardResps) { shardResp.pack(buf); } } void CDCLogEntry::unpack(BincodeBuf& buf) { _bootstrapEntry = buf.unpackScalar(); _cdcReqs.resize(buf.unpackScalar()); for (auto& cdcReq : _cdcReqs) { cdcReq.unpack(buf); } _shardResps.resize(buf.unpackScalar()); for (auto& shardResp : _shardResps) { shardResp.unpack(buf); } } size_t CDCLogEntry::packedSize() const { size_t size{1 + 2 * sizeof(uint32_t)}; for (auto& cdcReq : _cdcReqs) { size += cdcReq.packedSize(); } for (auto& shardResp : _shardResps) { size += shardResp.packedSize(); } return size; } struct CDCDBImpl { Env _env; // The reason why we insist in storing everything in RocksDB is that we can do // everything in a single transaction, so it's easier to reason about atomic // modifications. _dirsToTxnsCf for example would be much simpler as a // simple unordered_map. // // It also has the nice advantage that we don't need to reconstruct the state // when starting up, it's all already there. // The general pattern in this file is to use a txn for everything, // hence this naming. rocksdb::OptimisticTransactionDB* _dbDontUseDirectly; rocksdb::ColumnFamilyHandle* _defaultCf; rocksdb::ColumnFamilyHandle* _parentCf; rocksdb::ColumnFamilyHandle* _enqueuedCf; // V1, txnId -> CDC req, only for executing or waiting to be executed requests rocksdb::ColumnFamilyHandle* _executingCf; // V1, txnId -> CDC state machine, for requests that are executing // V1, data structure storing a dir to txn ids mapping: // InodeId -> txnId -- sentinel telling us what the first txn in line is. If none, zero. // we need the sentinel to skip over tombstones quickly. // InodeId, txnId set with the queue rocksdb::ColumnFamilyHandle* _dirsToTxnsCf; // legacy rocksdb::ColumnFamilyHandle* _reqQueueCfLegacy; // V0, txnId -> CDC req, for all the requests (including historical) AssertiveLock _processLock; std::shared_ptr _dbStatistics; std::string _dbStatisticsFile; // ---------------------------------------------------------------- // initialization CDCDBImpl() = delete; CDCDBImpl& operator=(const CDCDBImpl&) = delete; CDCDBImpl(Logger& logger, std::shared_ptr& xmon, SharedRocksDB& sharedDb) : _env(logger, xmon, "cdc_db") { _defaultCf = sharedDb.getCF(rocksdb::kDefaultColumnFamilyName); _reqQueueCfLegacy = sharedDb.getCF("reqQueue"); _parentCf = sharedDb.getCF("parent"); _enqueuedCf = sharedDb.getCF("enqueued"); _executingCf = sharedDb.getCF("executing"); _dirsToTxnsCf = sharedDb.getCF("dirsToTxns"); _dbDontUseDirectly = sharedDb.transactionDB(); _initDb(); } // Getting/setting txn ids from our txn ids keys #define TXN_ID_SETTER_GETTER(key, getterName, setterName) \ uint64_t getterName(rocksdb::Transaction& dbTxn) { \ std::string v; \ ROCKS_DB_CHECKED(dbTxn.Get({}, _defaultCf, cdcMetadataKey(&key), &v)); \ ExternalValue txnIdV(v); \ return txnIdV().u64(); \ } \ void setterName(rocksdb::Transaction& dbTxn, uint64_t x) { \ auto v = U64Value::Static(x); \ ROCKS_DB_CHECKED(dbTxn.Put(_defaultCf, cdcMetadataKey(&key), v.toSlice())); \ } TXN_ID_SETTER_GETTER(LAST_TXN_KEY, _lastTxn, _setLastTxn) TXN_ID_SETTER_GETTER(EXECUTING_TXN_KEY, _executingTxnLegacy, _setExecutingTxnLegacy) #undef TXN_ID_SETTER_GETTER // returns -1 if the version key was not set int64_t _version(rocksdb::Transaction& dbTxn) { std::string value; auto status = dbTxn.Get({}, _defaultCf, cdcMetadataKey(&VERSION_KEY), &value); if (status.IsNotFound()) { return -1; } else { ROCKS_DB_CHECKED(status); ExternalValue vV(value); return vV().u64(); } } void _setVersion(rocksdb::Transaction& dbTxn, uint64_t version) { auto v = U64Value::Static(version); ROCKS_DB_CHECKED(dbTxn.Put(_defaultCf, cdcMetadataKey(&VERSION_KEY), v.toSlice())); } void _initDbV0(rocksdb::Transaction& dbTxn) { LOG_INFO(_env, "initializing V0 db"); const auto keyExists = [&dbTxn](rocksdb::ColumnFamilyHandle* cf, const rocksdb::Slice& key) -> bool { std::string value; auto status = dbTxn.Get({}, cf, key, &value); if (status.IsNotFound()) { return false; } else { ROCKS_DB_CHECKED(status); return true; } }; if (!keyExists(_defaultCf, cdcMetadataKey(&NEXT_DIRECTORY_ID_KEY))) { LOG_INFO(_env, "initializing next directory id"); auto id = InodeIdValue::Static(InodeId::FromU64(ROOT_DIR_INODE_ID.u64 + 1)); ROCKS_DB_CHECKED(dbTxn.Put({}, cdcMetadataKey(&NEXT_DIRECTORY_ID_KEY), id.toSlice())); } const auto initZeroValue = [this, &keyExists, &dbTxn](const std::string& what, const CDCMetadataKey& key) { if (!keyExists(_defaultCf, cdcMetadataKey(&key))) { LOG_INFO(_env, "initializing %s", what); StaticValue v; v().setU64(0); ROCKS_DB_CHECKED(dbTxn.Put({}, cdcMetadataKey(&key), v.toSlice())); } }; initZeroValue("last txn", LAST_TXN_KEY); initZeroValue("first txn in queue", FIRST_TXN_IN_QUEUE_KEY); initZeroValue("last txn in queue", LAST_TXN_IN_QUEUE_KEY); initZeroValue("executing txn", EXECUTING_TXN_KEY); initZeroValue("last applied log index", LAST_APPLIED_LOG_ENTRY_KEY); } void _initDbV1(rocksdb::Transaction& dbTxn) { LOG_INFO(_env, "initializing V1 db"); // Pick up the executing txn, if any, and move it to the executing CF. // We preserve the executing txn to preserve integrity. { uint64_t executingTxn = _executingTxnLegacy(dbTxn); if (executingTxn != 0) { LOG_INFO(_env, "migrating txn %s", executingTxn); // _reqQueueCf -> _enqueuedCf auto txnK = CDCTxnIdKey::Static(CDCTxnId(executingTxn)); std::string reqV; ROCKS_DB_CHECKED(dbTxn.Get({}, _reqQueueCfLegacy, txnK.toSlice(), &reqV)); CDCReqContainer req; bincodeFromRocksValue(reqV, req); ROCKS_DB_CHECKED(dbTxn.Put(_enqueuedCf, txnK.toSlice(), reqV)); // EXECUTING_TXN_KEY -> _executingCf std::string txnStateV; ROCKS_DB_CHECKED(dbTxn.Get({}, _defaultCf, cdcMetadataKey(&EXECUTING_TXN_STATE_KEY), &txnStateV)); ROCKS_DB_CHECKED(dbTxn.Put(_executingCf, txnK.toSlice(), txnStateV)); // Add to _dirsToTxnsCf, will lock since things are empty _addToDirsToTxns(dbTxn, txnK().id(), req); } } // Throw away everything legacy. The clients will just retry. ROCKS_DB_CHECKED(dbTxn.Delete(cdcMetadataKey(&FIRST_TXN_IN_QUEUE_KEY))); ROCKS_DB_CHECKED(dbTxn.Delete(cdcMetadataKey(&LAST_TXN_IN_QUEUE_KEY))); ROCKS_DB_CHECKED(dbTxn.Delete(cdcMetadataKey(&EXECUTING_TXN_KEY))); ROCKS_DB_CHECKED(dbTxn.Delete(cdcMetadataKey(&EXECUTING_TXN_STATE_KEY))); // We delete the _reqQueueCfLegacy CF outside the transaction, because it's just too expensive // to delete here one-by-one. } void _initDb() { rocksdb::WriteOptions options; options.sync = true; std::unique_ptr dbTxn(_dbDontUseDirectly->BeginTransaction(options)); if (_version(*dbTxn) == -1) { _initDbV0(*dbTxn); _setVersion(*dbTxn, 0); } if (_version(*dbTxn) == 0) { _initDbV1(*dbTxn); _setVersion(*dbTxn, 1); } commitTransaction(*dbTxn); // This means that it'll be recreated and dropped each time, but that's OK. _dbDontUseDirectly->DropColumnFamily(_reqQueueCfLegacy); LOG_INFO(_env, "DB initialization done"); } // ---------------------------------------------------------------- // retrying txns void commitTransaction(rocksdb::Transaction& txn) { XmonNCAlert alert(10_sec); for (;;) { auto status = txn.Commit(); if (likely(status.ok())) { _env.clearAlert(alert); return; } if (likely(status.IsTryAgain())) { _env.updateAlert(alert, "got try again in CDC transaction, will sleep for a second and try again"); (1_sec).sleepRetry(); continue; } // We don't expect any other kind of error. The docs state: // // If this transaction was created by an OptimisticTransactionDB(), // Status::Busy() may be returned if the transaction could not guarantee // that there are no write conflicts. Status::TryAgain() may be returned // if the memtable history size is not large enough // (See max_write_buffer_size_to_maintain). // // However we never run transactions concurrently. So we should never get busy. // // This is just a way to throw the right exception. ROCKS_DB_CHECKED(status); } } // Processing // ---------------------------------------------------------------- uint64_t _lastAppliedLogEntryDB() { std::string value; ROCKS_DB_CHECKED(_dbDontUseDirectly->Get({}, cdcMetadataKey(&LAST_APPLIED_LOG_ENTRY_KEY), &value)); ExternalValue v(value); return v().u64(); } uint64_t _lastAppliedLogEntry(rocksdb::Transaction& dbTxn) { std::string value; ROCKS_DB_CHECKED(dbTxn.Get({}, cdcMetadataKey(&LAST_APPLIED_LOG_ENTRY_KEY), &value)); ExternalValue v(value); return v().u64(); } void _advanceLastAppliedLogEntry(rocksdb::Transaction& dbTxn, uint64_t index) { uint64_t oldIndex = _lastAppliedLogEntry(dbTxn); ALWAYS_ASSERT(oldIndex+1 == index, "old index is %s, expected %s, got %s", oldIndex, oldIndex+1, index); LOG_DEBUG(_env, "bumping log index from %s to %s", oldIndex, index); StaticValue v; v().setU64(index); ROCKS_DB_CHECKED(dbTxn.Put({}, cdcMetadataKey(&LAST_APPLIED_LOG_ENTRY_KEY), v.toSlice())); } void _addToDirsToTxns(rocksdb::Transaction& dbTxn, CDCTxnId txnId, const CDCReqContainer& req) { for (const auto dirId: directoriesNeedingLock(req)) { LOG_DEBUG(_env, "adding dir %s for txn %s", dirId, txnId); { // into the set StaticValue k; k().setDirId(dirId); k().setTxnId(txnId); ROCKS_DB_CHECKED(dbTxn.Put(_dirsToTxnsCf, k.toSlice(), "")); } { // sentinel, if necessary StaticValue k; k().setDirId(dirId); k().setSentinel(); std::string v; auto status = dbTxn.Get({}, _dirsToTxnsCf, k.toSlice(), &v); if (status.IsNotFound()) { // we're the first ones here, add the sentinel auto v = CDCTxnIdValue::Static(txnId); ROCKS_DB_CHECKED(dbTxn.Put(_dirsToTxnsCf, k.toSlice(), v.toSlice())); } else { ROCKS_DB_CHECKED(status); } } } } // Returns the txn ids that might be free to work now. Note that we don't // know that for sure because they might not hold locks for all dirs. This // function does not check that. void _removeFromDirsToTxns(rocksdb::Transaction& dbTxn, CDCTxnId txnId, const CDCReqContainer& req, std::vector& mightBeReady) { for (const auto dirId: directoriesNeedingLock(req)) { LOG_DEBUG(_env, "removing dir %s for txn %s", dirId, txnId); StaticValue k; k().setDirId(dirId); k().setTxnId(txnId); // Check if there's a next key in line. We know that we won't // have to step over many deleted keys here because we go through the // list in order, and we seek from the list element we've just deleted. // It is however important to set the iteration upper bound as of to // not spill over and possibly trip over deleted keys. StaticValue upperBoundK; upperBoundK().setDirId(dirId); upperBoundK().setTxnId(CDCTxnId(~(uint64_t)0)); rocksdb::Slice upperBoundSlice = upperBoundK.toSlice(); rocksdb::ReadOptions itOptions; itOptions.iterate_upper_bound = &upperBoundSlice; std::unique_ptr it(dbTxn.GetIterator(itOptions, _dirsToTxnsCf)); it->Seek(k.toSlice()); // we must find the key here -- we're removing it. ALWAYS_ASSERT(it->Valid()); { // Additional safety check: the key is what we expect. auto foundKey = ExternalValue::FromSlice(it->key()); ALWAYS_ASSERT(!foundKey().isSentinel() && foundKey().txnId() == txnId); } // now that we've done our checks, we can remove the key ROCKS_DB_CHECKED(dbTxn.Delete(_dirsToTxnsCf, k.toSlice())); // then we look for the next one, if there's anything, // and overwrite/delete the sentinel StaticValue sentinelK; sentinelK().setDirId(dirId); sentinelK().setSentinel(); it->Next(); if (it->Valid()) { // there's something, set the sentinel auto nextK = ExternalValue::FromSlice(it->key()); auto sentinelV = CDCTxnIdValue::Static(nextK().txnId()); LOG_DEBUG(_env, "selected %s as next in line after finishing %s", nextK().txnId(), txnId); mightBeReady.emplace_back(nextK().txnId()); ROCKS_DB_CHECKED(dbTxn.Put(_dirsToTxnsCf, sentinelK.toSlice(), sentinelV.toSlice())); } else { // we were the last ones here, remove sentinel ROCKS_DB_CHECKED(it->status()); ROCKS_DB_CHECKED(dbTxn.Delete(_dirsToTxnsCf, sentinelK.toSlice())); } } } // Check if we have a lock on all the directories that matter to the txn id. // It is assumed that the txnId in question is already in _dirsToTxnsCf. bool _isReadyToGo(rocksdb::Transaction& dbTxn, CDCTxnId txnId, const CDCReqContainer& req) { for (const auto dirId: directoriesNeedingLock(req)) { // the sentinel _must_ be present -- at the very least us! StaticValue k; k().setDirId(dirId); k().setSentinel(); std::string v; ROCKS_DB_CHECKED(dbTxn.Get({}, _dirsToTxnsCf, k.toSlice(), &v)); ExternalValue otherTxnId(v); if (otherTxnId().id() != txnId) { return false; } } return true; } // Adds a request to the enqueued requests. Also adds it to dirsToTxns, which will implicitly // acquire locks. void _addToEnqueued(rocksdb::Transaction& dbTxn, CDCTxnId txnId, const CDCReqContainer& req) { { auto k = CDCTxnIdKey::Static(txnId); std::string v = bincodeToRocksValue(req); ROCKS_DB_CHECKED(dbTxn.Put(_enqueuedCf, k.toSlice(), v)); } _addToDirsToTxns(dbTxn, txnId, req); } // Moves the state forward, filling in `step` appropriatedly, and writing // out the updated state. template typename V> void _advance( rocksdb::Transaction& dbTxn, CDCTxnId txnId, const CDCReqContainer& req, const ShardRespContainer* shardResp, V& state, CDCStep& step, // to collect things that might be able to start now because // we've finished doing other stuff std::vector& txnIds ) { LOG_DEBUG(_env, "advancing txn %s of kind %s with state", txnId, req.kind()); StateMachineEnv sm(_env, _defaultCf, _parentCf, dbTxn, txnId, state().step(), step); switch (req.kind()) { case CDCMessageKind::MAKE_DIRECTORY: MakeDirectoryStateMachine(sm, req.getMakeDirectory(), state().getMakeDirectory()).resume(shardResp); break; case CDCMessageKind::HARD_UNLINK_DIRECTORY: HardUnlinkDirectoryStateMachine(sm, req.getHardUnlinkDirectory(), state().getHardUnlinkDirectory()).resume(shardResp); break; case CDCMessageKind::RENAME_FILE: RenameFileStateMachine(sm, req.getRenameFile(), state().getRenameFile()).resume(shardResp); break; case CDCMessageKind::SOFT_UNLINK_DIRECTORY: SoftUnlinkDirectoryStateMachine(sm, req.getSoftUnlinkDirectory(), state().getSoftUnlinkDirectory()).resume(shardResp); break; case CDCMessageKind::RENAME_DIRECTORY: RenameDirectoryStateMachine(sm, req.getRenameDirectory(), state().getRenameDirectory()).resume(shardResp); break; case CDCMessageKind::CROSS_SHARD_HARD_UNLINK_FILE: CrossShardHardUnlinkFileStateMachine(sm, req.getCrossShardHardUnlinkFile(), state().getCrossShardHardUnlinkFile()).resume(shardResp); break; default: throw TERN_EXCEPTION("bad cdc message kind %s", req.kind()); } state().setStep(sm.txnStep); if (sm.finished) { // we finished immediately LOG_DEBUG(_env, "txn %s with req %s finished", txnId, req); _finishExecuting(dbTxn, txnId, req, txnIds); } else { // we still have something to do, persist _setExecuting(dbTxn, txnId, state); } } bool _isExecuting(rocksdb::Transaction& dbTxn, CDCTxnId txnId) { auto k = CDCTxnIdKey::Static(txnId); std::string v; auto status = dbTxn.Get({}, _executingCf, k.toSlice(), &v); if (status.IsNotFound()) { return false; } ROCKS_DB_CHECKED(status); return true; } template typename V> void _setExecuting(rocksdb::Transaction& dbTxn, CDCTxnId txnId, V& state) { auto k = CDCTxnIdKey::Static(txnId); ROCKS_DB_CHECKED(dbTxn.Put(_executingCf, k.toSlice(), state.toSlice())); } void _finishExecuting(rocksdb::Transaction& dbTxn, CDCTxnId txnId, const CDCReqContainer& req, std::vector& txnIds) { { // delete from _executingCf auto k = CDCTxnIdKey::Static(txnId); ROCKS_DB_CHECKED(dbTxn.Delete(_executingCf, k.toSlice())); } // delete from dirsToTxnIds _removeFromDirsToTxns(dbTxn, txnId, req, txnIds); } // Starts executing the given transactions, if possible. If it managed // to start something, it immediately advances it as well (no point delaying // that). // // It modifies `txnIds` with new transactions we looked at if we immediately // finish executing txns that we start here. void _startExecuting(rocksdb::Transaction& dbTxn, std::vector& txnIds, CDCStep& step) { CDCReqContainer req; for (int i = 0; i < txnIds.size(); i++) { CDCTxnId txnId = txnIds[i]; auto reqK = CDCTxnIdKey::Static(txnId); std::string reqV; ROCKS_DB_CHECKED(dbTxn.Get({}, _enqueuedCf, reqK.toSlice(), &reqV)); bincodeFromRocksValue(reqV, req); if (!_isExecuting(dbTxn, txnId)) { if (_isReadyToGo(dbTxn, txnId, req)) { LOG_DEBUG(_env, "starting to execute txn %s with req %s, since it is ready to go and not executing already", txnId, req); StaticValue txnState; txnState().start(req.kind()); _setExecuting(dbTxn, txnId, txnState); _advance(dbTxn, txnId, req, nullptr, txnState, step, txnIds); } else { LOG_DEBUG(_env, "waiting before executing txn %s with req %s, since it is not ready to go", txnId, req); } } } } void _enqueueCDCReqs( rocksdb::Transaction& dbTxn, const std::vector& reqs, CDCStep& step, // we need two lists because one (`cdcReqsTxnIds`) is specifically // to return to the user, while the other is used for other purposes, // too. std::vector& txnIdsToStart, std::vector& cdcReqsTxnIds ) { for (const auto& req: reqs) { uint64_t txnId; { // Generate new txn id txnId = _lastTxn(dbTxn) + 1; _setLastTxn(dbTxn, txnId); // Push to queue _addToEnqueued(dbTxn, txnId, req); LOG_DEBUG(_env, "enqueued CDC req %s with txn id %s", req.kind(), txnId); } txnIdsToStart.emplace_back(txnId); cdcReqsTxnIds.emplace_back(txnId); } } void _advanceWithResp( rocksdb::Transaction& dbTxn, CDCTxnId txnId, const ShardRespContainer* resp, CDCStep& step, std::vector& txnIdsToStart ) { auto txnIdK = CDCTxnIdKey::Static(txnId); // Get the req CDCReqContainer cdcReq; { std::string reqV; ROCKS_DB_CHECKED(dbTxn.Get({}, _enqueuedCf, txnIdK.toSlice(), &reqV)); bincodeFromRocksValue(reqV, cdcReq); } // Get the state std::string txnStateV; ROCKS_DB_CHECKED(dbTxn.Get({}, _executingCf, txnIdK.toSlice(), &txnStateV)); ExternalValue txnState(txnStateV); // Advance with response _advance(dbTxn, txnId, cdcReq, resp, txnState, step, txnIdsToStart); } void update( bool sync, uint64_t logIndex, const std::vector& cdcReqs, const std::vector& shardResps, CDCStep& step, std::vector& cdcReqsTxnIds ) { auto locked = _processLock.lock(); rocksdb::WriteOptions options; options.sync = sync; std::unique_ptr dbTxn(_dbDontUseDirectly->BeginTransaction(options)); _advanceLastAppliedLogEntry(*dbTxn, logIndex); step.clear(); cdcReqsTxnIds.clear(); { std::vector txnIdsToStart; _enqueueCDCReqs(*dbTxn, cdcReqs, step, txnIdsToStart, cdcReqsTxnIds); for (const auto& resp: shardResps) { _advanceWithResp(*dbTxn, resp.txnId, &resp.resp, step, txnIdsToStart); } _startExecuting(*dbTxn, txnIdsToStart, step); } commitTransaction(*dbTxn); } void bootstrap( bool sync, uint64_t logIndex, CDCStep& step ) { auto locked = _processLock.lock(); rocksdb::WriteOptions options; options.sync = sync; std::unique_ptr dbTxn(_dbDontUseDirectly->BeginTransaction(options)); step.clear(); _advanceLastAppliedLogEntry(*dbTxn, logIndex); std::vector txnIdsToStart; // Just collect all executing txns, and run them std::unique_ptr it(dbTxn->GetIterator({}, _executingCf)); for (it->SeekToFirst(); it->Valid(); it->Next()) { auto txnIdK = ExternalValue::FromSlice(it->key()); _advanceWithResp(*dbTxn, txnIdK().id(), nullptr, step, txnIdsToStart); } ROCKS_DB_CHECKED(it->status()); _startExecuting(*dbTxn, txnIdsToStart, step); commitTransaction(*dbTxn); } }; CDCDB::CDCDB(Logger& logger, std::shared_ptr& xmon, SharedRocksDB& sharedDb) { _impl = new CDCDBImpl(logger, xmon, sharedDb); } CDCDB::~CDCDB() { delete ((CDCDBImpl*)_impl); } void CDCDB::applyLogEntry(bool sync, const CDCLogEntry& entry, CDCStep& step, std::vector& cdcReqsTxnIds) { if (unlikely(entry.bootstrapEntry())) { ((CDCDBImpl*)_impl)->bootstrap(sync, entry.logIdx(), step); } else { ((CDCDBImpl*)_impl)->update(sync, entry.logIdx(), entry.cdcReqs(), entry.shardResps(), step, cdcReqsTxnIds); } } uint64_t CDCDB::lastAppliedLogEntry() { return ((CDCDBImpl*)_impl)->_lastAppliedLogEntryDB(); }