More assorted improvements

This commit is contained in:
Francesco Mazzoli
2023-02-15 14:03:53 +00:00
parent 51860fac3a
commit e1b8de02dc
22 changed files with 635 additions and 504 deletions

View File

@@ -19,7 +19,6 @@
#include "Env.hpp"
#include "Exception.hpp"
#include "Msgs.hpp"
#include "MsgsGen.hpp"
#include "Time.hpp"
#include "Undertaker.hpp"
#include "CDCDB.hpp"
@@ -585,7 +584,9 @@ public:
continue;
}
LOG_INFO(_env, "Registering ourselves (CDC, port %s) with shuckle", port);
std::string err = registerCDC(_shuckleHost, _shucklePort, 100_ms, _ownIp, port);
CDCStatus status;
_shared.db.status(status);
std::string err = registerCDC(_shuckleHost, _shucklePort, 100_ms, _ownIp, port, status);
if (!err.empty()) {
RAISE_ALERT(_env, "Couldn't register ourselves with shuckle: %s", err);
EggsTime successfulIterationAt = 0;

View File

@@ -12,7 +12,6 @@
#include "Env.hpp"
#include "Exception.hpp"
#include "Msgs.hpp"
#include "MsgsGen.hpp"
#include "RocksDBUtils.hpp"
#include "ShardDB.hpp"
@@ -204,15 +203,28 @@ struct MakeDirectoryStateMachine {
{}
// Advances the make-directory state machine by dispatching on env.txnStep.
// NOTE(review): this text looks like a diff rendered without +/- markers. Judging
// by the hunk header's old/new line counts, the ALWAYS_ASSERT and the first switch
// are the version being removed, and everything from the `if` onwards is the
// version being added -- confirm against the real file before editing.
void resume(EggsError err, const ShardRespContainer* resp) {
// Older body: "no error and no response" is required to coincide exactly with
// TXN_START, and every other step goes straight to its after* handler.
ALWAYS_ASSERT((err == NO_ERROR && resp == nullptr) == (env.txnStep == TXN_START));
switch (env.txnStep) {
case TXN_START: start(); break;
case MAKE_DIRECTORY_LOOKUP: afterLookup(err, resp); break;
case MAKE_DIRECTORY_CREATE_DIR: afterCreateDirectoryInode(err, resp); break;
case MAKE_DIRECTORY_CREATE_LOCKED_EDGE: afterCreateLockedEdge(err, resp); break;
case MAKE_DIRECTORY_UNLOCK_EDGE: afterUnlockEdge(err, resp); break;
case MAKE_DIRECTORY_ROLLBACK: afterRollback(err, resp); break;
default: throw EGGS_EXCEPTION("bad step %s", env.txnStep);
// Newer body: TXN_START still calls start(). For any other step, a call with
// err == NO_ERROR and resp == nullptr invokes the step's own function (lookup(),
// createDirectoryInode(), ...) rather than its after* handler; any other call
// hands err/resp to the after* handler. Unknown steps throw in both cases.
if (env.txnStep == TXN_START) {
start();
return;
}
if (unlikely(err == NO_ERROR && resp == nullptr)) { // we're resuming with no response
switch (env.txnStep) {
case MAKE_DIRECTORY_LOOKUP: lookup(); break;
case MAKE_DIRECTORY_CREATE_DIR: createDirectoryInode(); break;
case MAKE_DIRECTORY_CREATE_LOCKED_EDGE: createLockedEdge(); break;
case MAKE_DIRECTORY_UNLOCK_EDGE: unlockEdge(); break;
case MAKE_DIRECTORY_ROLLBACK: rollback(); break;
default: throw EGGS_EXCEPTION("bad step %s", env.txnStep);
}
} else {
switch (env.txnStep) {
case MAKE_DIRECTORY_LOOKUP: afterLookup(err, resp); break;
case MAKE_DIRECTORY_CREATE_DIR: afterCreateDirectoryInode(err, resp); break;
case MAKE_DIRECTORY_CREATE_LOCKED_EDGE: afterCreateLockedEdge(err, resp); break;
case MAKE_DIRECTORY_UNLOCK_EDGE: afterUnlockEdge(err, resp); break;
case MAKE_DIRECTORY_ROLLBACK: afterRollback(err, resp); break;
default: throw EGGS_EXCEPTION("bad step %s", env.txnStep);
}
}
}
@@ -351,11 +363,20 @@ struct HardUnlinkDirectoryStateMachine {
{}
// Advances the hard-unlink-directory state machine by dispatching on env.txnStep.
// NOTE(review): this text looks like a diff rendered without +/- markers. Judging
// by the hunk header's old/new line counts, the ALWAYS_ASSERT and the first switch
// are the version being removed, and everything from the `if` onwards is the
// version being added -- confirm against the real file before editing.
void resume(EggsError err, const ShardRespContainer* resp) {
// Older body: "no error and no response" is required to coincide exactly with
// TXN_START; the only other step goes to afterRemoveInode.
ALWAYS_ASSERT((err == NO_ERROR && resp == nullptr) == (env.txnStep == TXN_START));
switch (env.txnStep) {
case TXN_START: removeInode(); break;
case HARD_UNLINK_DIRECTORY_REMOVE_INODE: afterRemoveInode(err, resp); break;
default: throw EGGS_EXCEPTION("bad step %s", env.txnStep);
// Newer body: TXN_START calls removeInode(). At HARD_UNLINK_DIRECTORY_REMOVE_INODE,
// a call with err == NO_ERROR and resp == nullptr calls removeInode() again, while
// any other call hands err/resp to afterRemoveInode. Unknown steps throw.
if (env.txnStep == TXN_START) {
removeInode();
return;
}
if (unlikely(err == NO_ERROR && resp == nullptr)) { // we're resuming with no response
switch (env.txnStep) {
case HARD_UNLINK_DIRECTORY_REMOVE_INODE: removeInode(); break;
default: throw EGGS_EXCEPTION("bad step %s", env.txnStep);
}
} else {
switch (env.txnStep) {
case HARD_UNLINK_DIRECTORY_REMOVE_INODE: afterRemoveInode(err, resp); break;
default: throw EGGS_EXCEPTION("bad step %s", env.txnStep);
}
}
}
@@ -404,15 +425,28 @@ struct RenameFileStateMachine {
{}
// Advances the rename-file state machine by dispatching on env.txnStep.
// NOTE(review): this text looks like a diff rendered without +/- markers. Judging
// by the hunk header's old/new line counts, the ALWAYS_ASSERT and the first switch
// are the version being removed, and everything from the `if` onwards is the
// version being added -- confirm against the real file before editing.
void resume(EggsError err, const ShardRespContainer* resp) {
// Older body: "no error and no response" is required to coincide exactly with
// TXN_START, and every other step goes straight to its after* handler.
ALWAYS_ASSERT((err == NO_ERROR && resp == nullptr) == (env.txnStep == TXN_START));
switch (env.txnStep) {
case TXN_START: start(); break;
case RENAME_FILE_LOCK_OLD_EDGE: afterLockOldEdge(err, resp); break;
case RENAME_FILE_CREATE_NEW_LOCKED_EDGE: afterCreateNewLockedEdge(err, resp); break;
case RENAME_FILE_UNLOCK_NEW_EDGE: afterUnlockNewEdge(err, resp); break;
case RENAME_FILE_UNLOCK_OLD_EDGE: afterUnlockOldEdge(err, resp); break;
case RENAME_FILE_ROLLBACK: afterRollback(err, resp); break;
default: throw EGGS_EXCEPTION("bad step %s", env.txnStep);
// Newer body: TXN_START still calls start(). For any other step, a call with
// err == NO_ERROR and resp == nullptr invokes the step's own function
// (lockOldEdge(), createNewLockedEdge(), ...) rather than its after* handler; any
// other call hands err/resp to the after* handler. Unknown steps throw.
if (env.txnStep == TXN_START) {
start();
return;
}
if (unlikely(err == NO_ERROR && resp == nullptr)) { // we're resuming with no response
switch (env.txnStep) {
case RENAME_FILE_LOCK_OLD_EDGE: lockOldEdge(); break;
case RENAME_FILE_CREATE_NEW_LOCKED_EDGE: createNewLockedEdge(); break;
case RENAME_FILE_UNLOCK_NEW_EDGE: unlockNewEdge(); break;
case RENAME_FILE_UNLOCK_OLD_EDGE: unlockOldEdge(); break;
case RENAME_FILE_ROLLBACK: rollback(); break;
default: throw EGGS_EXCEPTION("bad step %s", env.txnStep);
}
} else {
switch (env.txnStep) {
case RENAME_FILE_LOCK_OLD_EDGE: afterLockOldEdge(err, resp); break;
case RENAME_FILE_CREATE_NEW_LOCKED_EDGE: afterCreateNewLockedEdge(err, resp); break;
case RENAME_FILE_UNLOCK_NEW_EDGE: afterUnlockNewEdge(err, resp); break;
case RENAME_FILE_UNLOCK_OLD_EDGE: afterUnlockOldEdge(err, resp); break;
case RENAME_FILE_ROLLBACK: afterRollback(err, resp); break;
default: throw EGGS_EXCEPTION("bad step %s", env.txnStep);
}
}
}
@@ -565,15 +599,29 @@ struct SoftUnlinkDirectoryStateMachine {
{}
// Advances the soft-unlink-directory state machine by dispatching on env.txnStep.
// NOTE(review): this text looks like a diff rendered without +/- markers. Judging
// by the hunk header's old/new line counts, the ALWAYS_ASSERT and the first switch
// are the version being removed, and everything from the `if` onwards is the
// version being added -- confirm against the real file before editing.
void resume(EggsError err, const ShardRespContainer* resp) {
// Older body: "no error and no response" is required to coincide exactly with
// TXN_START, and every other step goes straight to its after* handler.
ALWAYS_ASSERT((err == NO_ERROR && resp == nullptr) == (env.txnStep == TXN_START));
switch (env.txnStep) {
case TXN_START: start(); break;
case SOFT_UNLINK_DIRECTORY_LOCK_EDGE: afterLockEdge(err, resp); break;
case SOFT_UNLINK_DIRECTORY_STAT: afterStat(err, resp); break;
case SOFT_UNLINK_DIRECTORY_REMOVE_OWNER: afterRemoveOwner(err, resp); break;
case SOFT_UNLINK_DIRECTORY_UNLOCK_EDGE: afterUnlockEdge(err, resp); break;
case SOFT_UNLINK_DIRECTORY_ROLLBACK: afterRollback(err, resp); break;
default: throw EGGS_EXCEPTION("bad step %s", env.txnStep);
// Newer body: TXN_START still calls start(). For any other step, a call with
// err == NO_ERROR and resp == nullptr invokes a step function instead of the after*
// handler. Note that REMOVE_OWNER maps to stat(), not to a remove-owner function.
// Any other call hands err/resp to the after* handler. Unknown steps throw.
if (env.txnStep == TXN_START) {
start();
return;
}
if (unlikely(err == NO_ERROR && resp == nullptr)) { // we're resuming with no response
switch (env.txnStep) {
case SOFT_UNLINK_DIRECTORY_LOCK_EDGE: lockEdge(); break;
case SOFT_UNLINK_DIRECTORY_STAT: stat(); break;
// We don't store the directory info, so we need to restart the stating when resuming
case SOFT_UNLINK_DIRECTORY_REMOVE_OWNER: stat(); break;
case SOFT_UNLINK_DIRECTORY_UNLOCK_EDGE: unlockEdge(); break;
case SOFT_UNLINK_DIRECTORY_ROLLBACK: rollback(); break;
default: throw EGGS_EXCEPTION("bad step %s", env.txnStep);
}
} else {
switch (env.txnStep) {
case SOFT_UNLINK_DIRECTORY_LOCK_EDGE: afterLockEdge(err, resp); break;
case SOFT_UNLINK_DIRECTORY_STAT: afterStat(err, resp); break;
case SOFT_UNLINK_DIRECTORY_REMOVE_OWNER: afterRemoveOwner(err, resp); break;
case SOFT_UNLINK_DIRECTORY_UNLOCK_EDGE: afterUnlockEdge(err, resp); break;
case SOFT_UNLINK_DIRECTORY_ROLLBACK: afterRollback(err, resp); break;
default: throw EGGS_EXCEPTION("bad step %s", env.txnStep);
}
}
}
@@ -722,16 +770,30 @@ struct RenameDirectoryStateMachine {
{}
// Advances the rename-directory state machine by dispatching on env.txnStep.
// NOTE(review): this text looks like a diff rendered without +/- markers. Judging
// by the hunk header's old/new line counts, the ALWAYS_ASSERT and the first switch
// are the version being removed, and everything from the `if` onwards is the
// version being added -- confirm against the real file before editing.
void resume(EggsError err, const ShardRespContainer* resp) {
// Older body: "no error and no response" is required to coincide exactly with
// TXN_START, and every other step goes straight to its after* handler.
ALWAYS_ASSERT((err == NO_ERROR && resp == nullptr) == (env.txnStep == TXN_START));
switch (env.txnStep) {
case TXN_START: start(); break;
case RENAME_DIRECTORY_LOCK_OLD_EDGE: afterLockOldEdge(err, resp); break;
case RENAME_DIRECTORY_CREATE_LOCKED_NEW_EDGE: afterCreateLockedEdge(err, resp); break;
case RENAME_DIRECTORY_UNLOCK_NEW_EDGE: afterUnlockNewEdge(err, resp); break;
case RENAME_DIRECTORY_UNLOCK_OLD_EDGE: afterUnlockOldEdge(err, resp); break;
case RENAME_DIRECTORY_SET_OWNER: afterSetOwner(err, resp); break;
case RENAME_DIRECTORY_ROLLBACK: afterRollback(err, resp); break;
default: throw EGGS_EXCEPTION("bad step %s", env.txnStep);
// Newer body: TXN_START still calls start(). For any other step, a call with
// err == NO_ERROR and resp == nullptr invokes the step's own function
// (lockOldEdge(), createLockedNewEdge(), ...) rather than its after* handler; any
// other call hands err/resp to the after* handler. Unknown steps throw.
if (env.txnStep == TXN_START) {
start();
return;
}
if (unlikely(err == NO_ERROR && resp == nullptr)) { // we're resuming with no response
switch (env.txnStep) {
case RENAME_DIRECTORY_LOCK_OLD_EDGE: lockOldEdge(); break;
case RENAME_DIRECTORY_CREATE_LOCKED_NEW_EDGE: createLockedNewEdge(); break;
case RENAME_DIRECTORY_UNLOCK_NEW_EDGE: unlockNewEdge(); break;
case RENAME_DIRECTORY_UNLOCK_OLD_EDGE: unlockOldEdge(); break;
case RENAME_DIRECTORY_SET_OWNER: setOwner(); break;
case RENAME_DIRECTORY_ROLLBACK: rollback(); break;
default: throw EGGS_EXCEPTION("bad step %s", env.txnStep);
}
} else {
switch (env.txnStep) {
case RENAME_DIRECTORY_LOCK_OLD_EDGE: afterLockOldEdge(err, resp); break;
case RENAME_DIRECTORY_CREATE_LOCKED_NEW_EDGE: afterCreateLockedEdge(err, resp); break;
case RENAME_DIRECTORY_UNLOCK_NEW_EDGE: afterUnlockNewEdge(err, resp); break;
case RENAME_DIRECTORY_UNLOCK_OLD_EDGE: afterUnlockOldEdge(err, resp); break;
case RENAME_DIRECTORY_SET_OWNER: afterSetOwner(err, resp); break;
case RENAME_DIRECTORY_ROLLBACK: afterRollback(err, resp); break;
default: throw EGGS_EXCEPTION("bad step %s", env.txnStep);
}
}
}
@@ -911,6 +973,12 @@ enum CrossShardHardUnlinkFileStep : uint8_t {
CROSS_SHARD_HARD_UNLINK_FILE_MAKE_TRANSIENT = 2,
};
// Steps:
//
// 1. Remove owning edge in one shard
// 2. Make file transient in other shard
//
// Step 2 cannot fail.
struct CrossShardHardUnlinkFileStateMachine {
StateMachineEnv& env;
const CrossShardHardUnlinkFileReq& req;
@@ -921,12 +989,22 @@ struct CrossShardHardUnlinkFileStateMachine {
{}
// Advances the cross-shard hard-unlink-file state machine by dispatching on
// env.txnStep.
// NOTE(review): this text looks like a diff rendered without +/- markers. Judging
// by the hunk header's old/new line counts, the ALWAYS_ASSERT and the first switch
// are the version being removed, and everything from the `if` onwards is the
// version being added -- confirm against the real file before editing.
void resume(EggsError err, const ShardRespContainer* resp) {
// Older body: "no error and no response" is required to coincide exactly with
// TXN_START, and every other step goes straight to its after* handler.
ALWAYS_ASSERT((err == NO_ERROR && resp == nullptr) == (env.txnStep == TXN_START));
switch (env.txnStep) {
case TXN_START: start(); break;
case CROSS_SHARD_HARD_UNLINK_FILE_REMOVE_EDGE: afterRemoveEdge(err, resp); break;
case CROSS_SHARD_HARD_UNLINK_FILE_MAKE_TRANSIENT: afterMakeTransient(err, resp); break;
default: throw EGGS_EXCEPTION("bad step %s", env.txnStep);
// Newer body: TXN_START still calls start(). For any other step, a call with
// err == NO_ERROR and resp == nullptr invokes removeEdge()/makeTransient() rather
// than the after* handler; any other call hands err/resp to the after* handler.
// Unknown steps throw.
if (env.txnStep == TXN_START) {
start();
return;
}
if (unlikely(err == NO_ERROR && resp == nullptr)) { // we're resuming with no response
switch (env.txnStep) {
case CROSS_SHARD_HARD_UNLINK_FILE_REMOVE_EDGE: removeEdge(); break;
case CROSS_SHARD_HARD_UNLINK_FILE_MAKE_TRANSIENT: makeTransient(); break;
default: throw EGGS_EXCEPTION("bad step %s", env.txnStep);
}
} else {
switch (env.txnStep) {
case CROSS_SHARD_HARD_UNLINK_FILE_REMOVE_EDGE: afterRemoveEdge(err, resp); break;
case CROSS_SHARD_HARD_UNLINK_FILE_MAKE_TRANSIENT: afterMakeTransient(err, resp); break;
default: throw EGGS_EXCEPTION("bad step %s", env.txnStep);
}
}
}
@@ -973,14 +1051,6 @@ struct CrossShardHardUnlinkFileStateMachine {
}
};
// Steps:
//
// 1. Remove owning edge in one shard
// 2. Make file transient in other shard
//
// Step 2 cannot fail.
// Wrapper types to pack/unpack with kind
struct PackCDCReq {
const CDCReqContainer& req;
@@ -1260,18 +1330,18 @@ struct CDCDBImpl {
// Starts executing the next transaction in line, if possible. If it managed
// to start something, it immediately advances it as well (no point delaying
// that).
void _startExecuting(EggsTime time, rocksdb::Transaction& dbTxn, CDCStep& step) {
// that). Returns the txn id that was started, if any.
uint64_t _startExecuting(EggsTime time, rocksdb::Transaction& dbTxn, CDCStep& step) {
uint64_t executingTxn = _executingTxn(dbTxn);
if (executingTxn != 0) {
LOG_DEBUG(_env, "another transaction %s is already executing, can't start", executingTxn);
return;
return 0;
}
uint64_t txnToExecute = _reqQueuePeek(dbTxn, _cdcReq);
if (txnToExecute == 0) {
LOG_DEBUG(_env, "no transactions in queue found, can't start");
return;
return 0;
}
_setExecutingTxn(dbTxn, txnToExecute);
@@ -1279,6 +1349,7 @@ struct CDCDBImpl {
StaticValue<TxnState> txnState;
txnState().start(_cdcReq.kind());
_advance(time, dbTxn, txnToExecute, _cdcReq, NO_ERROR, nullptr, txnState, step);
return txnToExecute;
}
uint64_t processCDCReq(
@@ -1375,10 +1446,55 @@ struct CDCDBImpl {
step.clear();
_advanceLastAppliedLogEntry(*dbTxn, logIndex);
_startExecuting(time, *dbTxn, step);
uint64_t txnId = _startExecuting(time, *dbTxn, step);
if (txnId == 0) {
// no txn could be started, see if one is executing already to fill in the `step`
txnId = _executingTxn(*dbTxn);
if (txnId != 0) {
LOG_DEBUG(_env, "transaction %s is already executing, will resume it", txnId);
// Get the req
{
auto k = U64Key::Static(txnId);
std::string reqV;
ROCKS_DB_CHECKED(dbTxn->Get({}, _reqQueueCf, k.toSlice(), &reqV));
UnpackCDCReq ureq(_cdcReq);
bincodeFromRocksValue(reqV, ureq);
}
// Get the state
std::string txnStateV;
ROCKS_DB_CHECKED(dbTxn->Get({}, _defaultCf, cdcMetadataKey(&EXECUTING_TXN_STATE_KEY), &txnStateV));
ExternalValue<TxnState> txnState(txnStateV);
// Advance
_advance(time, *dbTxn, txnId, _cdcReq, NO_ERROR, nullptr, txnState, step);
}
}
dbTxn->Commit();
}
// Fills `resp` with the current state of the CDC.
//
// `resp` is zeroed first. If a transaction is currently executing, its request
// kind (the leading byte of the stored request) and its current step are filled
// in; otherwise those fields stay zero. `queuedTxns` is always set, as the
// difference between the last and the first transaction in the queue.
void status(CDCStatus& resp) {
    memset(&resp, 0, sizeof(resp));

    std::unique_ptr<rocksdb::Transaction> dbTxn(_db->BeginTransaction({}));

    const uint64_t executingId = _executingTxn(*dbTxn);
    if (executingId != 0) {
        // Request kind: only the first byte of the serialized request is decoded.
        auto k = U64Key::Static(executingId);
        std::string reqV;
        ROCKS_DB_CHECKED(dbTxn->Get({}, _reqQueueCf, k.toSlice(), &reqV));
        BincodeBuf reqBuf(reqV);
        resp.executingTxnKind = static_cast<CDCMessageKind>(reqBuf.unpackScalar<uint8_t>());

        // Step: read from the stored state of the executing transaction.
        std::string txnStateV;
        ROCKS_DB_CHECKED(dbTxn->Get({}, _defaultCf, cdcMetadataKey(&EXECUTING_TXN_STATE_KEY), &txnStateV));
        ExternalValue<TxnState> executingState(txnStateV);
        resp.executingTxnStep = executingState().step();
    }

    resp.queuedTxns = _lastTxnInQueue(*dbTxn) - _firstTxnInQueue(*dbTxn);
}
};
CDCDB::CDCDB(Logger& logger, const std::string& path) {
@@ -1408,3 +1524,7 @@ void CDCDB::startNextTransaction(bool sync, EggsTime time, uint64_t logIndex, CD
// Public entry point: forwards to the implementation object held in `_impl`
// and returns whatever its `_lastAppliedLogEntry()` reports.
uint64_t CDCDB::lastAppliedLogEntry() {
    auto* impl = (CDCDBImpl*)_impl;
    return impl->_lastAppliedLogEntry();
}
// Public entry point: forwards to the implementation object held in `_impl`,
// which fills in `resp`.
void CDCDB::status(CDCStatus& resp) {
    auto* impl = (CDCDBImpl*)_impl;
    impl->status(resp);
}

View File

@@ -1,12 +1,9 @@
#pragma once
#include <cstdint>
#include <variant>
#include "Bincode.hpp"
#include "Msgs.hpp"
#include "Env.hpp"
#include "MsgsGen.hpp"
#include "Shuckle.hpp"
struct CDCShardReq {
ShardId shid;
@@ -86,7 +83,8 @@ public:
// Advances the CDC state using the given shard response.
//
// This function crashes hard if the caller passes it a response it's not expecting.
// This function crashes hard if the caller passes it a response it's not expecting. So the caller
// should track responses and make sure only the correct one is passed in.
void processShardResp(
bool sync, // Whether to persist synchronously. Unneeded if log entries are persisted already.
EggsTime time,
@@ -97,7 +95,9 @@ public:
CDCStep& step
);
// Starts the next transaction in line (if any).
// Does what it can to advance the state of the system, by starting the next
// transaction in line (if any). In any case, it returns what there is to do next
// in `step`.
//
// It's fine to call this function even if there's nothing to do -- and in fact
// you should do that when starting up the CDC, to make sure to finish
@@ -111,4 +111,6 @@ public:
// The index of the last log entry persisted to the DB
uint64_t lastAppliedLogEntry();
// Fills `resp` with the current CDC status: the kind and step of the transaction
// currently executing (left zeroed if there is none), and the number of queued
// transactions.
void status(CDCStatus& resp);
};