Fix CDC txn status tracking

This might be a resolution to #38, although I'm not sure yet.
This commit is contained in:
Francesco Mazzoli
2023-08-02 13:48:42 +00:00
parent 52351cd69f
commit acf4f129f6
2 changed files with 17 additions and 10 deletions
+1
View File
@@ -594,6 +594,7 @@ private:
_env.raiseAlert(alert, false, "we got %s/%s=%s when trying to send shard message, will wait and retry", err, translateErrno(err), safe_strerror(err));
sleepFor(100_ms);
} else {
_env.clearAlert(alert);
throw EXPLICIT_SYSCALL_EXCEPTION(err, "sendto");
}
}
+16 -10
View File
@@ -1397,9 +1397,9 @@ struct CDCDBImpl {
// out the updated state.
//
// This will _not_ start a new transaction automatically if the old one
// finishes.
// finishes. It does return whether the txn was finished though.
template<template<typename> typename V>
void _advance(
bool _advance(
EggsTime time,
rocksdb::Transaction& dbTxn,
uint64_t txnId,
@@ -1448,9 +1448,11 @@ struct CDCDBImpl {
ROCKS_DB_CHECKED(dbTxn.Delete(_defaultCf, cdcMetadataKey(&EXECUTING_TXN_STATE_KEY)));
// also, fill in whether we've got something next
step.nextTxn = _firstTxnInQueue(dbTxn);
return true;
} else {
// we're _not_ done, write out the state
ROCKS_DB_CHECKED(dbTxn.Put(_defaultCf, cdcMetadataKey(&EXECUTING_TXN_STATE_KEY), state.toSlice()));
return false;
}
}
@@ -1474,10 +1476,12 @@ struct CDCDBImpl {
LOG_DEBUG(_env, "starting to execute txn %s with req %s", txnToExecute, _cdcReq);
StaticValue<TxnState> txnState;
txnState().start(_cdcReq.kind());
_advance(time, dbTxn, txnToExecute, _cdcReq, NO_ERROR, nullptr, txnState, step);
bool stillRunning = _advance(time, dbTxn, txnToExecute, _cdcReq, NO_ERROR, nullptr, txnState, step);
status.runningTxn = txnToExecute;
status.runningTxnKind = _cdcReq.kind();
if (stillRunning) {
status.runningTxn = txnToExecute;
status.runningTxnKind = _cdcReq.kind();
}
return txnToExecute;
}
@@ -1556,17 +1560,19 @@ struct CDCDBImpl {
bincodeFromRocksValue(reqV, ureq);
}
// Store status
status.runningTxn = txnId;
status.runningTxnKind = _cdcReq.kind();
// Get the state
std::string txnStateV;
ROCKS_DB_CHECKED(dbTxn->Get({}, _defaultCf, cdcMetadataKey(&EXECUTING_TXN_STATE_KEY), &txnStateV));
ExternalValue<TxnState> txnState(txnStateV);
// Advance
_advance(time, *dbTxn, txnId, _cdcReq, respError, resp, txnState, step);
bool stillRunning = _advance(time, *dbTxn, txnId, _cdcReq, respError, resp, txnState, step);
if (stillRunning) {
// Store status
status.runningTxn = txnId;
status.runningTxnKind = _cdcReq.kind();
}
dbTxn->Commit();
}