diff --git a/cpp/cdc/CDCDB.cpp b/cpp/cdc/CDCDB.cpp index 789863f3..3a8854d7 100644 --- a/cpp/cdc/CDCDB.cpp +++ b/cpp/cdc/CDCDB.cpp @@ -1698,14 +1698,22 @@ struct CDCDBImpl { sentinelK().setDirId(dirId); sentinelK().setSentinel(); it->Next(); + bool removeSentinel = true; if (it->Valid()) { // there's something, set the sentinel auto nextK = ExternalValue::FromSlice(it->key()); - auto sentinelV = CDCTxnIdValue::Static(nextK().txnId()); - LOG_DEBUG(_env, "selected %s as next in line after finishing %s", nextK().txnId(), txnId); - mightBeReady.emplace_back(nextK().txnId()); - ROCKS_DB_CHECKED(dbTxn.Put(_dirsToTxnsCf, sentinelK.toSlice(), sentinelV.toSlice())); - } else { // we were the last ones here, remove sentinel + if (nextK().dirId() == dirId) { + removeSentinel = false; + auto sentinelV = CDCTxnIdValue::Static(nextK().txnId()); + LOG_DEBUG(_env, "selected %s as next in line after finishing %s", nextK().txnId(), txnId); + mightBeReady.emplace_back(nextK().txnId()); + ROCKS_DB_CHECKED(dbTxn.Put(_dirsToTxnsCf, sentinelK.toSlice(), sentinelV.toSlice())); + } else { + RAISE_ALERT_APP_TYPE(_env, XmonAppType::DAYTIME, "Unexpectedly stepped from %s to %s, is RocksDB not respecting our upper bound?", dirId, nextK().dirId()); + } + } else { ROCKS_DB_CHECKED(it->status()); + } + if (removeSentinel) { // we were the last ones here, remove sentinel ROCKS_DB_CHECKED(dbTxn.Delete(_dirsToTxnsCf, sentinelK.toSlice())); } }