cdc: error part of shard response

This commit is contained in:
Miroslav Crnic
2024-06-13 13:00:43 +01:00
committed by GitHub Enterprise
parent 2cd15fc0be
commit 9d06deeedc
3 changed files with 135 additions and 120 deletions
+9 -9
View File
@@ -543,19 +543,19 @@ private:
}
void _recordCDCShardResp(uint64_t requestId, CDCShardResp& resp) {
resp.err = resp.resp.kind() != ShardMessageKind::ERROR ? EggsError::NO_ERROR : resp.resp.getError();
_shared.shardErrors.add(resp.err);
if (resp.err == EggsError::NO_ERROR) {
auto err = resp.resp.kind() != ShardMessageKind::ERROR ? EggsError::NO_ERROR : resp.resp.getError();
_shared.shardErrors.add(err);
if (err == EggsError::NO_ERROR) {
LOG_DEBUG(_env, "successfully parsed shard response %s with kind %s, process soon", requestId, resp.resp.kind());
return;
} else if (resp.err == EggsError::TIMEOUT) {
} else if (err == EggsError::TIMEOUT) {
LOG_DEBUG(_env, "txn %s shard req %s, timed out", resp.txnId, requestId);
} else if (innocuousShardError(resp.err)) {
LOG_DEBUG(_env, "txn %s shard req %s, finished with innocuous error %s", resp.txnId, requestId, resp.err);
} else if (rareInnocuousShardError(resp.err)) {
LOG_INFO(_env, "txn %s shard req %s, finished with rare innocuous error %s", resp.txnId, requestId, resp.err);
} else if (innocuousShardError(err)) {
LOG_DEBUG(_env, "txn %s shard req %s, finished with innocuous error %s", resp.txnId, requestId, err);
} else if (rareInnocuousShardError(err)) {
LOG_INFO(_env, "txn %s shard req %s, finished with rare innocuous error %s", resp.txnId, requestId, err);
} else {
RAISE_ALERT(_env, "txn %s, req id %s, finished with error %s", resp.txnId, requestId, resp.err);
RAISE_ALERT(_env, "txn %s, req id %s, finished with error %s", resp.txnId, requestId, err);
}
}
+125 -107
View File
@@ -1,5 +1,6 @@
#include "CDCDB.hpp"
#include <cstdint>
#include <limits>
#include <memory>
#include <rocksdb/iterator.h>
@@ -15,6 +16,7 @@
#include "CDCDBData.hpp"
#include "Common.hpp"
#include "Exception.hpp"
#include "MsgsGen.hpp"
#include "RocksDBUtils.hpp"
#include "ShardDB.hpp"
#include "Time.hpp"
@@ -102,14 +104,7 @@ std::ostream& operator<<(std::ostream& out, CDCTxnId id) {
}
std::ostream& operator<<(std::ostream& out, const CDCShardResp& x) {
out << "CDCShardResp(txnId=" << x.txnId << ", ";
if (x.err == EggsError::NO_ERROR) {
out << "resp=" << x.resp;
} else {
out << "err=" << x.err;
}
out << ")";
return out;
return out << "CDCShardResp(txnId=" << x.txnId << ", resp=" << x.resp << ")";
}
std::ostream& operator<<(std::ostream& out, const CDCLogEntry& x) {
@@ -296,12 +291,12 @@ struct MakeDirectoryStateMachine {
env(env_), req(req_), state(state_)
{}
void resume(EggsError err, const ShardRespContainer* resp) {
void resume(const ShardRespContainer* resp) {
if (env.txnStep == TXN_START) {
start();
return;
}
if (unlikely(err == EggsError::NO_ERROR && resp == nullptr)) { // we're resuming with no response
if (unlikely(resp == nullptr)) { // we're resuming with no response
switch (env.txnStep) {
case MAKE_DIRECTORY_LOOKUP: lookup(); break;
case MAKE_DIRECTORY_CREATE_DIR: createDirectoryInode(); break;
@@ -313,12 +308,12 @@ struct MakeDirectoryStateMachine {
}
} else {
switch (env.txnStep) {
case MAKE_DIRECTORY_LOOKUP: afterLookup(err, resp); break;
case MAKE_DIRECTORY_CREATE_DIR: afterCreateDirectoryInode(err, resp); break;
case MAKE_DIRECTORY_LOOKUP_OLD_CREATION_TIME: afterLookupOldCreationTime(err, resp); break;
case MAKE_DIRECTORY_CREATE_LOCKED_EDGE: afterCreateLockedEdge(err, resp); break;
case MAKE_DIRECTORY_UNLOCK_EDGE: afterUnlockEdge(err, resp); break;
case MAKE_DIRECTORY_ROLLBACK: afterRollback(err, resp); break;
case MAKE_DIRECTORY_LOOKUP: afterLookup(*resp); break;
case MAKE_DIRECTORY_CREATE_DIR: afterCreateDirectoryInode(*resp); break;
case MAKE_DIRECTORY_LOOKUP_OLD_CREATION_TIME: afterLookupOldCreationTime(*resp); break;
case MAKE_DIRECTORY_CREATE_LOCKED_EDGE: afterCreateLockedEdge(*resp); break;
case MAKE_DIRECTORY_UNLOCK_EDGE: afterUnlockEdge(*resp); break;
case MAKE_DIRECTORY_ROLLBACK: afterRollback(*resp); break;
default: throw EGGS_EXCEPTION("bad step %s", env.txnStep);
}
}
@@ -335,7 +330,8 @@ struct MakeDirectoryStateMachine {
shardReq.name = req.name;
}
void afterLookup(EggsError err, const ShardRespContainer* resp) {
void afterLookup(const ShardRespContainer& resp) {
auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : EggsError::NO_ERROR;
if (err == EggsError::TIMEOUT) {
lookup(true); // retry
} else if (err == EggsError::DIRECTORY_NOT_FOUND) {
@@ -345,7 +341,7 @@ struct MakeDirectoryStateMachine {
createDirectoryInode();
} else {
ALWAYS_ASSERT(err == EggsError::NO_ERROR);
const auto& lookupResp = resp->getLookup();
const auto& lookupResp = resp.getLookup();
if (lookupResp.targetId.type() == InodeType::DIRECTORY) {
// we're good already
auto& cdcResp = env.finish().setMakeDirectory();
@@ -363,12 +359,13 @@ struct MakeDirectoryStateMachine {
shardReq.ownerId = req.ownerId;
}
void afterCreateDirectoryInode(EggsError shardRespError, const ShardRespContainer* shardResp) {
if (shardRespError == EggsError::TIMEOUT) {
void afterCreateDirectoryInode(const ShardRespContainer& resp) {
auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : EggsError::NO_ERROR;
if (err == EggsError::TIMEOUT) {
// Try again -- note that the call to create directory inode is idempotent.
createDirectoryInode(true);
} else {
ALWAYS_ASSERT(shardRespError == EggsError::NO_ERROR);
ALWAYS_ASSERT(err == EggsError::NO_ERROR);
lookupOldCreationTime();
}
}
@@ -382,7 +379,8 @@ struct MakeDirectoryStateMachine {
shardReq.startTime = 0; // we have current set
}
void afterLookupOldCreationTime(EggsError err, const ShardRespContainer* resp) {
void afterLookupOldCreationTime(const ShardRespContainer& resp) {
auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : EggsError::NO_ERROR;
if (err == EggsError::TIMEOUT) {
lookupOldCreationTime(true); // retry
} else if (err == EggsError::DIRECTORY_NOT_FOUND) {
@@ -393,7 +391,7 @@ struct MakeDirectoryStateMachine {
} else {
ALWAYS_ASSERT(err == EggsError::NO_ERROR);
// there might be no existing edge
const auto& fullReadDir = resp->getFullReadDir();
const auto& fullReadDir = resp.getFullReadDir();
ALWAYS_ASSERT(fullReadDir.results.els.size() < 2); // we have limit=1
if (fullReadDir.results.els.size() == 0) {
state.setOldCreationTime(0); // there was nothing present for this name
@@ -413,7 +411,8 @@ struct MakeDirectoryStateMachine {
shardReq.oldCreationTime = state.oldCreationTime();
}
void afterCreateLockedEdge(EggsError err, const ShardRespContainer* resp) {
void afterCreateLockedEdge(const ShardRespContainer& resp) {
auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : EggsError::NO_ERROR;
if (createCurrentLockedEdgeRetry(err)) {
createLockedEdge(true); // try again
} else if (err == EggsError::CANNOT_OVERRIDE_NAME) {
@@ -431,7 +430,7 @@ struct MakeDirectoryStateMachine {
// We also cannot get MISMATCHING_TARGET since we are the only one
// creating locked edges, and transactions execute serially.
ALWAYS_ASSERT(err == EggsError::NO_ERROR);
state.setCreationTime(resp->getCreateLockedCurrentEdge().creationTime);
state.setCreationTime(resp.getCreateLockedCurrentEdge().creationTime);
unlockEdge();
}
}
@@ -445,7 +444,8 @@ struct MakeDirectoryStateMachine {
shardReq.creationTime = state.creationTime();
}
void afterUnlockEdge(EggsError err, const ShardRespContainer* resp) {
void afterUnlockEdge(const ShardRespContainer& resp) {
auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : EggsError::NO_ERROR;
if (err == EggsError::TIMEOUT) {
// retry
unlockEdge(true);
@@ -472,7 +472,8 @@ struct MakeDirectoryStateMachine {
shardReq.info = defaultDirectoryInfo();
}
void afterRollback(EggsError err, const ShardRespContainer* resp) {
void afterRollback(const ShardRespContainer& resp) {
auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : EggsError::NO_ERROR;
if (err == EggsError::TIMEOUT) {
rollback(true); // retry
} else {
@@ -499,19 +500,19 @@ struct HardUnlinkDirectoryStateMachine {
env(env_), req(req_), state(state_)
{}
void resume(EggsError err, const ShardRespContainer* resp) {
void resume(const ShardRespContainer* resp) {
if (env.txnStep == TXN_START) {
removeInode();
return;
}
if (unlikely(err == EggsError::NO_ERROR && resp == nullptr)) { // we're resuming with no response
if (unlikely(resp == nullptr)) { // we're resuming with no response
switch (env.txnStep) {
case HARD_UNLINK_DIRECTORY_REMOVE_INODE: removeInode(); break;
default: throw EGGS_EXCEPTION("bad step %s", env.txnStep);
}
} else {
switch (env.txnStep) {
case HARD_UNLINK_DIRECTORY_REMOVE_INODE: afterRemoveInode(err, resp); break;
case HARD_UNLINK_DIRECTORY_REMOVE_INODE: afterRemoveInode(*resp); break;
default: throw EGGS_EXCEPTION("bad step %s", env.txnStep);
}
}
@@ -522,7 +523,8 @@ struct HardUnlinkDirectoryStateMachine {
shardReq.id = req.dirId;
}
void afterRemoveInode(EggsError err, const ShardRespContainer* resp) {
void afterRemoveInode(const ShardRespContainer& resp) {
auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : EggsError::NO_ERROR;
if (err == EggsError::TIMEOUT) {
removeInode(true); // try again
} else if (
@@ -563,12 +565,12 @@ struct RenameFileStateMachine {
env(env_), req(req_), state(state_)
{}
void resume(EggsError err, const ShardRespContainer* resp) {
void resume(const ShardRespContainer* resp) {
if (env.txnStep == TXN_START) {
start();
return;
}
if (unlikely(err == EggsError::NO_ERROR && resp == nullptr)) { // we're resuming with no response
if (unlikely(resp == nullptr)) { // we're resuming with no response
switch (env.txnStep) {
case RENAME_FILE_LOCK_OLD_EDGE: lockOldEdge(); break;
case RENAME_FILE_LOOKUP_OLD_CREATION_TIME: lookupOldCreationTime(); break;
@@ -580,12 +582,12 @@ struct RenameFileStateMachine {
}
} else {
switch (env.txnStep) {
case RENAME_FILE_LOCK_OLD_EDGE: afterLockOldEdge(err, resp); break;
case RENAME_FILE_LOOKUP_OLD_CREATION_TIME: afterLookupOldCreationTime(err, resp); break;
case RENAME_FILE_CREATE_NEW_LOCKED_EDGE: afterCreateNewLockedEdge(err, resp); break;
case RENAME_FILE_UNLOCK_NEW_EDGE: afterUnlockNewEdge(err, resp); break;
case RENAME_FILE_UNLOCK_OLD_EDGE: afterUnlockOldEdge(err, resp); break;
case RENAME_FILE_ROLLBACK: afterRollback(err, resp); break;
case RENAME_FILE_LOCK_OLD_EDGE: afterLockOldEdge(*resp); break;
case RENAME_FILE_LOOKUP_OLD_CREATION_TIME: afterLookupOldCreationTime(*resp); break;
case RENAME_FILE_CREATE_NEW_LOCKED_EDGE: afterCreateNewLockedEdge(*resp); break;
case RENAME_FILE_UNLOCK_NEW_EDGE: afterUnlockNewEdge(*resp); break;
case RENAME_FILE_UNLOCK_OLD_EDGE: afterUnlockOldEdge(*resp); break;
case RENAME_FILE_ROLLBACK: afterRollback(*resp); break;
default: throw EGGS_EXCEPTION("bad step %s", env.txnStep);
}
}
@@ -611,7 +613,8 @@ struct RenameFileStateMachine {
shardReq.creationTime = req.oldCreationTime;
}
void afterLockOldEdge(EggsError err, const ShardRespContainer* resp) {
void afterLockOldEdge(const ShardRespContainer& resp) {
auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : EggsError::NO_ERROR;
if (err == EggsError::TIMEOUT) {
lockOldEdge(true); // retry
} else if (
@@ -637,7 +640,8 @@ struct RenameFileStateMachine {
shardReq.startTime = 0; // we have current set
}
void afterLookupOldCreationTime(EggsError err, const ShardRespContainer* resp) {
void afterLookupOldCreationTime(const ShardRespContainer& resp) {
auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : EggsError::NO_ERROR;
if (err == EggsError::TIMEOUT) {
lookupOldCreationTime(true); // retry
} else if (err == EggsError::DIRECTORY_NOT_FOUND) {
@@ -648,7 +652,7 @@ struct RenameFileStateMachine {
} else {
ALWAYS_ASSERT(err == EggsError::NO_ERROR);
// there might be no existing edge
const auto& fullReadDir = resp->getFullReadDir();
const auto& fullReadDir = resp.getFullReadDir();
ALWAYS_ASSERT(fullReadDir.results.els.size() < 2); // we have limit=1
if (fullReadDir.results.els.size() == 0) {
state.setNewOldCreationTime(0); // there was nothing present for this name
@@ -668,7 +672,8 @@ struct RenameFileStateMachine {
shardReq.oldCreationTime = state.newOldCreationTime();
}
void afterCreateNewLockedEdge(EggsError err, const ShardRespContainer* resp) {
void afterCreateNewLockedEdge(const ShardRespContainer& resp) {
auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : EggsError::NO_ERROR;
if (createCurrentLockedEdgeRetry(err)) {
createNewLockedEdge(true); // retry
} else if (err == EggsError::MISMATCHING_CREATION_TIME) {
@@ -679,7 +684,7 @@ struct RenameFileStateMachine {
state.setExitError(err);
rollback();
} else {
state.setNewCreationTime(resp->getCreateLockedCurrentEdge().creationTime);
state.setNewCreationTime(resp.getCreateLockedCurrentEdge().creationTime);
unlockNewEdge();
}
}
@@ -693,7 +698,8 @@ struct RenameFileStateMachine {
shardReq.creationTime = state.newCreationTime();
}
void afterUnlockNewEdge(EggsError err, const ShardRespContainer* resp) {
void afterUnlockNewEdge(const ShardRespContainer& resp) {
auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : EggsError::NO_ERROR;
if (err == EggsError::TIMEOUT) {
unlockNewEdge(true); // retry
} else {
@@ -712,7 +718,8 @@ struct RenameFileStateMachine {
shardReq.creationTime = req.oldCreationTime;
}
void afterUnlockOldEdge(EggsError err, const ShardRespContainer* resp) {
void afterUnlockOldEdge(const ShardRespContainer& resp) {
auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : EggsError::NO_ERROR;
if (err == EggsError::TIMEOUT) {
unlockOldEdge(true); // retry
} else {
@@ -735,7 +742,8 @@ struct RenameFileStateMachine {
shardReq.creationTime = state.newCreationTime();
}
void afterRollback(EggsError err, const ShardRespContainer* resp) {
void afterRollback(const ShardRespContainer& resp) {
auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : EggsError::NO_ERROR;
if (err == EggsError::TIMEOUT) {
rollback(true); // retry
} else {
@@ -773,12 +781,12 @@ struct SoftUnlinkDirectoryStateMachine {
env(env_), req(req_), state(state_)
{}
void resume(EggsError err, const ShardRespContainer* resp) {
void resume(const ShardRespContainer* resp) {
if (env.txnStep == TXN_START) {
start();
return;
}
if (unlikely(err == EggsError::NO_ERROR && resp == nullptr)) { // we're resuming with no response
if (unlikely(resp == nullptr)) { // we're resuming with no response
switch (env.txnStep) {
case SOFT_UNLINK_DIRECTORY_LOCK_EDGE: lockEdge(); break;
case SOFT_UNLINK_DIRECTORY_STAT: stat(); break;
@@ -790,11 +798,11 @@ struct SoftUnlinkDirectoryStateMachine {
}
} else {
switch (env.txnStep) {
case SOFT_UNLINK_DIRECTORY_LOCK_EDGE: afterLockEdge(err, resp); break;
case SOFT_UNLINK_DIRECTORY_STAT: afterStat(err, resp); break;
case SOFT_UNLINK_DIRECTORY_REMOVE_OWNER: afterRemoveOwner(err, resp); break;
case SOFT_UNLINK_DIRECTORY_UNLOCK_EDGE: afterUnlockEdge(err, resp); break;
case SOFT_UNLINK_DIRECTORY_ROLLBACK: afterRollback(err, resp); break;
case SOFT_UNLINK_DIRECTORY_LOCK_EDGE: afterLockEdge(*resp); break;
case SOFT_UNLINK_DIRECTORY_STAT: afterStat(*resp); break;
case SOFT_UNLINK_DIRECTORY_REMOVE_OWNER: afterRemoveOwner(*resp); break;
case SOFT_UNLINK_DIRECTORY_UNLOCK_EDGE: afterUnlockEdge(*resp); break;
case SOFT_UNLINK_DIRECTORY_ROLLBACK: afterRollback(*resp); break;
default: throw EGGS_EXCEPTION("bad step %s", env.txnStep);
}
}
@@ -816,7 +824,8 @@ struct SoftUnlinkDirectoryStateMachine {
shardReq.creationTime = req.creationTime;
}
void afterLockEdge(EggsError err, const ShardRespContainer* resp) {
void afterLockEdge(const ShardRespContainer& resp) {
auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : EggsError::NO_ERROR;
if (err == EggsError::TIMEOUT) {
lockEdge(true);
} else if (err == EggsError::MISMATCHING_CREATION_TIME || err == EggsError::EDGE_NOT_FOUND || err == EggsError::DIRECTORY_NOT_FOUND) {
@@ -833,12 +842,13 @@ struct SoftUnlinkDirectoryStateMachine {
shardReq.id = state.statDirId();
}
void afterStat(EggsError err, const ShardRespContainer* resp) {
void afterStat(const ShardRespContainer& resp) {
auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : EggsError::NO_ERROR;
if (err == EggsError::TIMEOUT) {
stat(true); // retry
} else {
ALWAYS_ASSERT(err == EggsError::NO_ERROR);
const auto& statResp = resp->getStatDirectory();
const auto& statResp = resp.getStatDirectory();
// insert tags
for (const auto& newEntry : statResp.info.entries.els) {
bool found = false;
@@ -865,7 +875,8 @@ struct SoftUnlinkDirectoryStateMachine {
}
}
void afterRemoveOwner(EggsError err, const ShardRespContainer* resp) {
void afterRemoveOwner(const ShardRespContainer& resp) {
auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : EggsError::NO_ERROR;
if (err == EggsError::TIMEOUT) {
// we don't want to keep the dir info around start again from the last stat
stat();
@@ -890,7 +901,8 @@ struct SoftUnlinkDirectoryStateMachine {
shardReq.creationTime = req.creationTime;
}
void afterUnlockEdge(EggsError err, const ShardRespContainer* resp) {
void afterUnlockEdge(const ShardRespContainer& resp) {
auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : EggsError::NO_ERROR;
if (err == EggsError::TIMEOUT) {
unlockEdge(true);
} else {
@@ -916,7 +928,8 @@ struct SoftUnlinkDirectoryStateMachine {
shardReq.creationTime = req.creationTime;
}
void afterRollback(EggsError err, const ShardRespContainer* resp) {
void afterRollback(const ShardRespContainer& resp) {
auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : EggsError::NO_ERROR;
if (err == EggsError::TIMEOUT) {
rollback(true);
} else {
@@ -960,12 +973,12 @@ struct RenameDirectoryStateMachine {
env(env_), req(req_), state(state_)
{}
void resume(EggsError err, const ShardRespContainer* resp) {
void resume(const ShardRespContainer* resp) {
if (env.txnStep == TXN_START) {
start();
return;
}
if (unlikely(err == EggsError::NO_ERROR && resp == nullptr)) { // we're resuming with no response
if (unlikely(resp == nullptr)) { // we're resuming with no response
switch (env.txnStep) {
case RENAME_DIRECTORY_LOCK_OLD_EDGE: lockOldEdge(); break;
case RENAME_DIRECTORY_LOOKUP_OLD_CREATION_TIME: lookupOldCreationTime(); break;
@@ -978,13 +991,13 @@ struct RenameDirectoryStateMachine {
}
} else {
switch (env.txnStep) {
case RENAME_DIRECTORY_LOCK_OLD_EDGE: afterLockOldEdge(err, resp); break;
case RENAME_DIRECTORY_LOOKUP_OLD_CREATION_TIME: afterLookupOldCreationTime(err, resp); break;
case RENAME_DIRECTORY_CREATE_LOCKED_NEW_EDGE: afterCreateLockedEdge(err, resp); break;
case RENAME_DIRECTORY_UNLOCK_NEW_EDGE: afterUnlockNewEdge(err, resp); break;
case RENAME_DIRECTORY_UNLOCK_OLD_EDGE: afterUnlockOldEdge(err, resp); break;
case RENAME_DIRECTORY_SET_OWNER: afterSetOwner(err, resp); break;
case RENAME_DIRECTORY_ROLLBACK: afterRollback(err, resp); break;
case RENAME_DIRECTORY_LOCK_OLD_EDGE: afterLockOldEdge(*resp); break;
case RENAME_DIRECTORY_LOOKUP_OLD_CREATION_TIME: afterLookupOldCreationTime(*resp); break;
case RENAME_DIRECTORY_CREATE_LOCKED_NEW_EDGE: afterCreateLockedEdge(*resp); break;
case RENAME_DIRECTORY_UNLOCK_NEW_EDGE: afterUnlockNewEdge(*resp); break;
case RENAME_DIRECTORY_UNLOCK_OLD_EDGE: afterUnlockOldEdge(*resp); break;
case RENAME_DIRECTORY_SET_OWNER: afterSetOwner(*resp); break;
case RENAME_DIRECTORY_ROLLBACK: afterRollback(*resp); break;
default: throw EGGS_EXCEPTION("bad step %s", env.txnStep);
}
}
@@ -1039,7 +1052,8 @@ struct RenameDirectoryStateMachine {
shardReq.creationTime = req.oldCreationTime;
}
void afterLockOldEdge(EggsError err, const ShardRespContainer* resp) {
void afterLockOldEdge(const ShardRespContainer& resp) {
auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : EggsError::NO_ERROR;
if (err == EggsError::TIMEOUT) {
lockOldEdge(true); // retry
} else if (
@@ -1064,7 +1078,8 @@ struct RenameDirectoryStateMachine {
shardReq.startTime = 0; // we have current set
}
void afterLookupOldCreationTime(EggsError err, const ShardRespContainer* resp) {
void afterLookupOldCreationTime(const ShardRespContainer& resp) {
auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : EggsError::NO_ERROR;
if (err == EggsError::TIMEOUT) {
lookupOldCreationTime(true); // retry
} else if (err == EggsError::DIRECTORY_NOT_FOUND) {
@@ -1074,7 +1089,7 @@ struct RenameDirectoryStateMachine {
} else {
ALWAYS_ASSERT(err == EggsError::NO_ERROR);
// there might be no existing edge
const auto& fullReadDir = resp->getFullReadDir();
const auto& fullReadDir = resp.getFullReadDir();
ALWAYS_ASSERT(fullReadDir.results.els.size() < 2); // we have limit=1
if (fullReadDir.results.els.size() == 0) {
state.setNewOldCreationTime(0); // there was nothing present for this name
@@ -1094,7 +1109,8 @@ struct RenameDirectoryStateMachine {
shardReq.oldCreationTime = state.newOldCreationTime();
}
void afterCreateLockedEdge(EggsError err, const ShardRespContainer* resp) {
void afterCreateLockedEdge(const ShardRespContainer& resp) {
auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : EggsError::NO_ERROR;
if (createCurrentLockedEdgeRetry(err)) {
createLockedNewEdge(true);
} else if (err == EggsError::MISMATCHING_CREATION_TIME) {
@@ -1105,7 +1121,7 @@ struct RenameDirectoryStateMachine {
rollback();
} else {
ALWAYS_ASSERT(err == EggsError::NO_ERROR);
state.setNewCreationTime(resp->getCreateLockedCurrentEdge().creationTime);
state.setNewCreationTime(resp.getCreateLockedCurrentEdge().creationTime);
unlockNewEdge();
}
}
@@ -1119,7 +1135,8 @@ struct RenameDirectoryStateMachine {
shardReq.creationTime = state.newCreationTime();
}
void afterUnlockNewEdge(EggsError err, const ShardRespContainer* resp) {
void afterUnlockNewEdge(const ShardRespContainer& resp) {
auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : EggsError::NO_ERROR;
if (err == EggsError::TIMEOUT) {
unlockNewEdge(true);
} else if (err == EggsError::EDGE_NOT_FOUND) {
@@ -1142,7 +1159,8 @@ struct RenameDirectoryStateMachine {
shardReq.creationTime = req.oldCreationTime;
}
void afterUnlockOldEdge(EggsError err, const ShardRespContainer* resp) {
void afterUnlockOldEdge(const ShardRespContainer& resp) {
auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : EggsError::NO_ERROR;
if (err == EggsError::TIMEOUT) {
unlockOldEdge(true);
} else if (err == EggsError::EDGE_NOT_FOUND) {
@@ -1162,7 +1180,8 @@ struct RenameDirectoryStateMachine {
shardReq.dirId = req.targetId;
}
void afterSetOwner(EggsError err, const ShardRespContainer* resp) {
void afterSetOwner(const ShardRespContainer& resp) {
auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : EggsError::NO_ERROR;
if (err == EggsError::TIMEOUT) {
setOwner(true);
} else {
@@ -1187,7 +1206,8 @@ struct RenameDirectoryStateMachine {
shardReq.creationTime = state.newCreationTime();
}
void afterRollback(EggsError err, const ShardRespContainer* resp) {
void afterRollback(const ShardRespContainer& resp) {
auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : EggsError::NO_ERROR;
if (err == EggsError::TIMEOUT) {
rollback(true);
} else {
@@ -1216,12 +1236,12 @@ struct CrossShardHardUnlinkFileStateMachine {
env(env_), req(req_), state(state_)
{}
void resume(EggsError err, const ShardRespContainer* resp) {
void resume(const ShardRespContainer* resp) {
if (env.txnStep == TXN_START) {
start();
return;
}
if (unlikely(err == EggsError::NO_ERROR && resp == nullptr)) { // we're resuming with no response
if (unlikely(resp == nullptr)) { // we're resuming with no response
switch (env.txnStep) {
case CROSS_SHARD_HARD_UNLINK_FILE_REMOVE_EDGE: removeEdge(); break;
case CROSS_SHARD_HARD_UNLINK_FILE_MAKE_TRANSIENT: makeTransient(); break;
@@ -1229,8 +1249,8 @@ struct CrossShardHardUnlinkFileStateMachine {
}
} else {
switch (env.txnStep) {
case CROSS_SHARD_HARD_UNLINK_FILE_REMOVE_EDGE: afterRemoveEdge(err, resp); break;
case CROSS_SHARD_HARD_UNLINK_FILE_MAKE_TRANSIENT: afterMakeTransient(err, resp); break;
case CROSS_SHARD_HARD_UNLINK_FILE_REMOVE_EDGE: afterRemoveEdge(*resp); break;
case CROSS_SHARD_HARD_UNLINK_FILE_MAKE_TRANSIENT: afterMakeTransient(*resp); break;
default: throw EGGS_EXCEPTION("bad step %s", env.txnStep);
}
}
@@ -1254,7 +1274,8 @@ struct CrossShardHardUnlinkFileStateMachine {
shardReq.creationTime = req.creationTime;
}
void afterRemoveEdge(EggsError err, const ShardRespContainer* resp) {
void afterRemoveEdge(const ShardRespContainer& resp) {
auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : EggsError::NO_ERROR;
if (err == EggsError::TIMEOUT || err == EggsError::MTIME_IS_TOO_RECENT) {
removeEdge(true);
} else if (err == EggsError::DIRECTORY_NOT_FOUND) {
@@ -1271,7 +1292,8 @@ struct CrossShardHardUnlinkFileStateMachine {
shardReq.note = req.name;
}
void afterMakeTransient(EggsError err, const ShardRespContainer* resp) {
void afterMakeTransient(const ShardRespContainer& resp) {
auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : EggsError::NO_ERROR;
if (err == EggsError::TIMEOUT) {
makeTransient(true);
} else {
@@ -1283,26 +1305,26 @@ struct CrossShardHardUnlinkFileStateMachine {
void CDCShardResp::pack(BincodeBuf& buf) const {
buf.packScalar(txnId.x);
auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : EggsError::NO_ERROR;
buf.packScalar(err);
if (err != EggsError::NO_ERROR) {
return;
if (err == EggsError::NO_ERROR) {
resp.pack(buf);
}
resp.pack(buf);
}
void CDCShardResp::unpack(BincodeBuf& buf) {
txnId.x = buf.unpackScalar<uint64_t>();
err = (EggsError)buf.unpackScalar<uint16_t>();
if (err != EggsError::NO_ERROR) {
auto err = buf.unpackScalar<EggsError>();
if (err == EggsError::NO_ERROR) {
resp.unpack(buf);
} else {
resp.setError() = err;
return;
}
resp.unpack(buf);
}
size_t CDCShardResp::packedSize() const {
size_t size{10};
if (err == EggsError::NO_ERROR) {
size_t size = sizeof(uint64_t) + sizeof(EggsError);
if (resp.kind() != ShardMessageKind::ERROR) {
size += resp.packedSize();
}
return size;
@@ -1725,9 +1747,6 @@ struct CDCDBImpl {
rocksdb::Transaction& dbTxn,
CDCTxnId txnId,
const CDCReqContainer& req,
// If `shardRespError` and `shardResp` are null, we're starting to execute.
// Otherwise, (err == EggsError::NO_ERROR) == (req != nullptr).
EggsError shardRespError,
const ShardRespContainer* shardResp,
V<TxnState>& state,
CDCStep& step,
@@ -1739,22 +1758,22 @@ struct CDCDBImpl {
StateMachineEnv sm(_env, _defaultCf, _parentCf, dbTxn, txnId, state().step(), step);
switch (req.kind()) {
case CDCMessageKind::MAKE_DIRECTORY:
MakeDirectoryStateMachine(sm, req.getMakeDirectory(), state().getMakeDirectory()).resume(shardRespError, shardResp);
MakeDirectoryStateMachine(sm, req.getMakeDirectory(), state().getMakeDirectory()).resume(shardResp);
break;
case CDCMessageKind::HARD_UNLINK_DIRECTORY:
HardUnlinkDirectoryStateMachine(sm, req.getHardUnlinkDirectory(), state().getHardUnlinkDirectory()).resume(shardRespError, shardResp);
HardUnlinkDirectoryStateMachine(sm, req.getHardUnlinkDirectory(), state().getHardUnlinkDirectory()).resume(shardResp);
break;
case CDCMessageKind::RENAME_FILE:
RenameFileStateMachine(sm, req.getRenameFile(), state().getRenameFile()).resume(shardRespError, shardResp);
RenameFileStateMachine(sm, req.getRenameFile(), state().getRenameFile()).resume(shardResp);
break;
case CDCMessageKind::SOFT_UNLINK_DIRECTORY:
SoftUnlinkDirectoryStateMachine(sm, req.getSoftUnlinkDirectory(), state().getSoftUnlinkDirectory()).resume(shardRespError, shardResp);
SoftUnlinkDirectoryStateMachine(sm, req.getSoftUnlinkDirectory(), state().getSoftUnlinkDirectory()).resume(shardResp);
break;
case CDCMessageKind::RENAME_DIRECTORY:
RenameDirectoryStateMachine(sm, req.getRenameDirectory(), state().getRenameDirectory()).resume(shardRespError, shardResp);
RenameDirectoryStateMachine(sm, req.getRenameDirectory(), state().getRenameDirectory()).resume(shardResp);
break;
case CDCMessageKind::CROSS_SHARD_HARD_UNLINK_FILE:
CrossShardHardUnlinkFileStateMachine(sm, req.getCrossShardHardUnlinkFile(), state().getCrossShardHardUnlinkFile()).resume(shardRespError, shardResp);
CrossShardHardUnlinkFileStateMachine(sm, req.getCrossShardHardUnlinkFile(), state().getCrossShardHardUnlinkFile()).resume(shardResp);
break;
default:
throw EGGS_EXCEPTION("bad cdc message kind %s", req.kind());
@@ -1818,7 +1837,7 @@ struct CDCDBImpl {
StaticValue<TxnState> txnState;
txnState().start(req.kind());
_setExecuting(dbTxn, txnId, txnState);
_advance(dbTxn, txnId, req, EggsError::NO_ERROR, nullptr, txnState, step, txnIds);
_advance(dbTxn, txnId, req, nullptr, txnState, step, txnIds);
} else {
LOG_DEBUG(_env, "waiting before executing txn %s with req %s, since it is not ready to go", txnId, req);
}
@@ -1855,7 +1874,6 @@ struct CDCDBImpl {
void _advanceWithResp(
rocksdb::Transaction& dbTxn,
CDCTxnId txnId,
EggsError err,
const ShardRespContainer* resp,
CDCStep& step,
std::vector<CDCTxnId>& txnIdsToStart
@@ -1876,7 +1894,7 @@ struct CDCDBImpl {
ExternalValue<TxnState> txnState(txnStateV);
// Advance with response
_advance(dbTxn, txnId, cdcReq, err, resp, txnState, step, txnIdsToStart);
_advance(dbTxn, txnId, cdcReq, resp, txnState, step, txnIdsToStart);
}
void update(
@@ -1902,7 +1920,7 @@ struct CDCDBImpl {
std::vector<CDCTxnId> txnIdsToStart;
_enqueueCDCReqs(*dbTxn, cdcReqs, step, txnIdsToStart, cdcReqsTxnIds);
for (const auto& resp: shardResps) {
_advanceWithResp(*dbTxn, resp.txnId, resp.err, &resp.resp, step, txnIdsToStart);
_advanceWithResp(*dbTxn, resp.txnId, &resp.resp, step, txnIdsToStart);
}
_startExecuting(*dbTxn, txnIdsToStart, step);
}
@@ -1930,7 +1948,7 @@ struct CDCDBImpl {
std::unique_ptr<rocksdb::Iterator> it(dbTxn->GetIterator({}, _executingCf));
for (it->Seek(""); it->Valid(); it->Next()) {
auto txnIdK = ExternalValue<CDCTxnIdKey>::FromSlice(it->key());
_advanceWithResp(*dbTxn, txnIdK().id(), EggsError::NO_ERROR, nullptr, step, txnIdsToStart);
_advanceWithResp(*dbTxn, txnIdK().id(), nullptr, step, txnIdsToStart);
}
ROCKS_DB_CHECKED(it->status());
+1 -4
View File
@@ -64,15 +64,12 @@ std::ostream& operator<<(std::ostream& out, const CDCStep& x);
struct CDCShardResp {
CDCTxnId txnId; // the transaction id we're getting a response for
// if err != NO_ERROR, resp is not filled in
EggsError err;
ShardRespContainer resp;
void pack(BincodeBuf& buf) const;
void unpack(BincodeBuf& buf);
size_t packedSize() const;
bool operator==(const CDCShardResp& rhs) const {
return txnId == rhs.txnId && err == rhs.err &&
resp == rhs.resp;
return txnId == rhs.txnId && resp == rhs.resp;
}
};