registry: flag for better dynamic port handling

In case of a full shutdown/startup with dynamic ports in use, it is
possible for replicas to end up with out-of-date address information.
Since they think there are enough replicas to form a quorum, they will
not try to refetch address information immediately, but only after some delay.
The -using-dynamic-ports flag tells the registry to wipe ports on startup.
Since empty ports are not considered valid addresses, replicas will refetch
the information quickly.

Note that in case of production rolling restart this is not a problem
as replica information changes slowly.
This commit is contained in:
Miroslav Crnic
2025-09-23 09:19:23 +00:00
committed by Miroslav Crnic
parent f1c7b19ce1
commit 1a66e1d2e1
8 changed files with 109 additions and 17 deletions

View File

@@ -89,7 +89,7 @@ bool Registerer::_updateReplicas(const std::vector<FullRegistryInfo> &allReplica
size_t knownReplicas{0};
for (auto &replica : localReplicas) {
if (replica.addrs[0].port != 0) {
if (replica.addrs[0].port != 0 || replica.addrs[1].port) {
++knownReplicas;
}
}

View File

@@ -508,7 +508,7 @@ private:
auto &allRegiResp = resp.resp.setAllRegistryReplicas();
for (uint8_t i = 0; i < _replicas.size(); ++i) {
auto &addrs = _replicas[i];
if (addrs.addrs[0].ip.data[0] == 0) {
if (addrs.addrs[0].port == 0 && addrs.addrs[1].port == 0) {
continue;
}
auto &replica = allRegiResp.replicas.els.emplace_back();

View File

@@ -27,6 +27,7 @@ struct RegistryOptions {
uint8_t alertAfterUnavailableFailureDomains = 3;
uint32_t maxFailureDomainsPerShard = 28;
Duration writableBlockServiceUpdateInterval = 30_mins;
bool usingDynamicPorts = false;
};
struct RegistryState;

View File

@@ -12,6 +12,7 @@
#include "RegistryDBData.hpp"
#include "RocksDBUtils.hpp"
#include "Time.hpp"
#include <rocksdb/db.h>
#include <rocksdb/write_batch.h>
static constexpr auto REGISTRY_CF_NAME = "registry";
@@ -84,6 +85,80 @@ static bool addressesIntersect(const AddrsInfo& currentAddr, const AddrsInfo& ne
return false;
}
// Queues, into `batch`, a rewrite of every record in column family `cf` with
// the port of each of its addresses set to 0.
//
// Each entry is decoded with readRegistryInfo and re-encoded with
// writeRegistryInfo. Nothing is written to `db` here: the function only reads
// through an iterator and appends to `batch`; committing the batch is left to
// the caller.
//
// A non-OK iterator status is reported through ROCKS_DB_CHECKED.
static void wipeRegistryPorts(rocksdb::WriteBatch& batch, rocksdb::DB* db, rocksdb::ColumnFamilyHandle* cf) {
    auto* it = db->NewIterator(rocksdb::ReadOptions(), cf);
    for (it->SeekToFirst(); it->Valid(); it->Next()) {
        FullRegistryInfo info;
        readRegistryInfo(it->key(), it->value(), info);
        for (auto& addr : info.addrs.addrs) {
            addr.port = 0;
        }
        writeRegistryInfo(batch, cf, info);
    }
    // Copy the status out and release the iterator before checking it, so the
    // iterator is not leaked if the check below does not return normally.
    rocksdb::Status status = it->status();
    delete it;
    ROCKS_DB_CHECKED(status);
}
// Queues, into `batch`, a rewrite of every record in column family `cf` with
// the port of each of its addresses set to 0.
//
// Each entry is decoded with readShardInfo and re-encoded with writeShardInfo.
// Nothing is written to `db` here: the function only reads through an iterator
// and appends to `batch`; committing the batch is left to the caller.
//
// A non-OK iterator status is reported through ROCKS_DB_CHECKED.
static void wipeShardPorts(rocksdb::WriteBatch& batch, rocksdb::DB* db, rocksdb::ColumnFamilyHandle* cf) {
    auto* it = db->NewIterator(rocksdb::ReadOptions(), cf);
    for (it->SeekToFirst(); it->Valid(); it->Next()) {
        FullShardInfo info;
        readShardInfo(it->key(), it->value(), info);
        for (auto& addr : info.addrs.addrs) {
            addr.port = 0;
        }
        writeShardInfo(batch, cf, info);
    }
    // Copy the status out and release the iterator before checking it, so the
    // iterator is not leaked if the check below does not return normally.
    rocksdb::Status status = it->status();
    delete it;
    ROCKS_DB_CHECKED(status);
}
// Queues, into `batch`, a rewrite of every record in column family `cf` with
// the port of each of its addresses set to 0.
//
// Each entry is decoded with readCdcInfo and re-encoded with writeCdcInfo.
// Nothing is written to `db` here: the function only reads through an iterator
// and appends to `batch`; committing the batch is left to the caller.
//
// A non-OK iterator status is reported through ROCKS_DB_CHECKED.
static void wipeCdcPorts(rocksdb::WriteBatch& batch, rocksdb::DB* db, rocksdb::ColumnFamilyHandle* cf) {
    auto* it = db->NewIterator(rocksdb::ReadOptions(), cf);
    for (it->SeekToFirst(); it->Valid(); it->Next()) {
        CdcInfo info;
        readCdcInfo(it->key(), it->value(), info);
        for (auto& addr : info.addrs.addrs) {
            addr.port = 0;
        }
        writeCdcInfo(batch, cf, info);
    }
    // Copy the status out and release the iterator before checking it, so the
    // iterator is not leaked if the check below does not return normally.
    rocksdb::Status status = it->status();
    delete it;
    ROCKS_DB_CHECKED(status);
}
// Queues, into `batch`, the updates that reset address ports for block
// services stored in `blockServiceCf`.
//
// For every entry whose flags are not exactly DECOMMISSIONED:
//   - the port of each address is set to 0;
//   - if the service is writable (per isWritable) and has availableBytes > 0,
//     its key in `writableBlockServiceCf` is deleted;
//   - its key in `lastHeartBeatCf` is deleted;
//   - the STALE flag is added and lastInfoChange is set to the time sampled
//     once at the start of this function;
//   - the updated record is written back to `blockServiceCf`.
//
// Nothing is written to `db` here: the function only reads through an iterator
// and appends to `batch`; committing the batch is left to the caller.
//
// A non-OK iterator status is reported through ROCKS_DB_CHECKED.
static void wipeBlockServicePorts(rocksdb::WriteBatch& batch, rocksdb::DB* db,
    rocksdb::ColumnFamilyHandle* blockServiceCf, rocksdb::ColumnFamilyHandle* writableBlockServiceCf, rocksdb::ColumnFamilyHandle* lastHeartBeatCf)
{
    auto* it = db->NewIterator(rocksdb::ReadOptions(), blockServiceCf);
    // Single timestamp shared by all records touched in this pass.
    auto now = ternNow();
    for (it->SeekToFirst(); it->Valid(); it->Next()) {
        FullBlockServiceInfo info;
        readBlockServiceInfo(it->key(), it->value(), info);
        // NOTE(review): this is an exact comparison, so an entry carrying
        // DECOMMISSIONED together with any other flag is NOT skipped —
        // confirm that this is intended.
        if (info.flags == BlockServiceFlags::DECOMMISSIONED) {
            continue;
        }
        for (auto& addr : info.addrs.addrs) {
            addr.port = 0;
        }
        // Both index keys are derived from `info` before its flags and
        // lastInfoChange are modified below.
        if (isWritable(info.flags) && info.availableBytes > 0) {
            StaticValue<WritableBlockServiceKey> writableKey;
            blockServiceToWritableBlockServiceKey(info, writableKey());
            batch.Delete(writableBlockServiceCf, writableKey.toSlice());
        }
        StaticValue<LastHeartBeatKey> lastHeartBeat;
        blockServiceToLastHeartBeat(info, lastHeartBeat());
        batch.Delete(lastHeartBeatCf, lastHeartBeat.toSlice());
        info.flags = info.flags | BlockServiceFlags::STALE;
        info.lastInfoChange = now;
        writeBlockServiceInfo(batch, blockServiceCf, info);
    }
    // Copy the status out and release the iterator before checking it, so the
    // iterator is not leaked if the check below does not return normally.
    rocksdb::Status status = it->status();
    delete it;
    ROCKS_DB_CHECKED(status);
}
void RegistryDB::processLogEntries(std::vector<LogsDBLogEntry>& logEntries, std::vector<RegistryDBWriteResult>& writeResults) {
auto expectedLogEntry = lastAppliedLogEntry();
std::unordered_map<uint64_t, FullBlockServiceInfo> updatedBlocks;
@@ -552,6 +627,15 @@ void RegistryDB::_initDb() {
ROCKS_DB_CHECKED(_db->Write({}, &batch));
LOG_INFO(_env, "initialized Registry RocksDB");
}
if (_options.usingDynamicPorts) {
LOG_INFO(_env, "wiping dynamic service ports to speed up bootstrap");
rocksdb::WriteBatch batch;
wipeRegistryPorts(batch, _db, _registryCf);
wipeShardPorts(batch, _db, _shardsCf);
wipeCdcPorts(batch, _db, _cdcCf);
wipeBlockServicePorts(batch, _db, _blockServicesCf, _writableBlockServicesCf, _lastHeartBeatCf);
ROCKS_DB_CHECKED(_db->Write({}, &batch));
}
}
void RegistryDB::_recalcualteShardBlockServices(bool writableChanged) {

View File

@@ -61,6 +61,11 @@ static bool parseRegistryOptions(CommandLineArgs& args, RegistryOptions& options
options.writableBlockServiceUpdateInterval = parseDuration(args.next());
continue;
}
if (arg == "-using-dynamic-ports") {
args.next();
options.usingDynamicPorts = true;
continue;
}
fprintf(stderr, "unknown argument %s\n", args.peekArg().c_str());
return false;
}
@@ -93,6 +98,8 @@ static void printRegistryOptionsUsage() {
fprintf(stderr, " Maximum number of block services to assign to a shard for writting at any given time. Default is 28\n");
fprintf(stderr, " -writable-block-service-update-interval\n");
fprintf(stderr, " Maximum interval at which to change writable services assigned to shards. Default 30 min\n");
fprintf(stderr, " -using-dynamic-ports\n");
fprintf(stderr, " Inform registry services are using dynamic ports. Registry will wipe port information on startup to speed up bootstrap\n");
}
static bool validateRegistryOptions(const RegistryOptions& options) {

View File

@@ -432,6 +432,7 @@ type RegistryOpts struct {
Addr1 string
Addr2 string
LogsDBFlags []string
UsingDynamicPorts bool
}
func (procs *ManagedProcesses) StartRegistry(ll *log.Logger, opts *RegistryOpts) {
@@ -461,6 +462,9 @@ func (procs *ManagedProcesses) StartRegistry(ll *log.Logger, opts *RegistryOpts)
if opts.LogsDBFlags != nil {
args = append(args, opts.LogsDBFlags...)
}
if opts.UsingDynamicPorts {
args = append(args, "-using-dynamic-ports")
}
procs.Start(ll, &ManagedProcessArgs{
Name: "registry",
Exe: opts.Exe,

View File

@@ -155,12 +155,6 @@ func main() {
if r == 0 {
dir = path.Join(*dataDir, "registry")
}
if _, err := os.Stat(dir); !os.IsNotExist(err) {
fmt.Printf("Wiping registry replica %d to speed up bootstrap\n", r)
if err := os.RemoveAll(dir); err != nil {
panic(fmt.Errorf("failed to remove directory %s: %v", dir, err))
}
}
opts := managedprocess.RegistryOpts{
Exe: cppExes.RegistryExe,
@@ -170,6 +164,7 @@ func main() {
Replica: msgs.ReplicaId(r),
Xmon: *xmon,
Addr1: "127.0.0.1:0",
UsingDynamicPorts: true,
}
if r == 0 {
if *leaderOnly {

View File

@@ -783,7 +783,7 @@ func killBlockServices(
rand := wyhash.New(uint64(time.Now().UnixNano()))
go func() {
defer func() { lrecover.HandleRecoverChan(log, terminateChan, recover()) }()
lc := net.ListenConfig{
lc := net.ListenConfig{
Control: func(network, address string, c syscall.RawConn) error {
var operr error
err := c.Control(func(fd uintptr) {
@@ -795,7 +795,7 @@ func killBlockServices(
return operr
},
}
var reservedPorts [2]net.Listener
var reservedPorts [2]net.Listener
for {
// pick and kill the victim
pause.Lock()
@@ -815,12 +815,12 @@ func killBlockServices(
procs.Kill(procId, syscall.SIGKILL)
if ports._1 != 0 {
if port1Listener, err := lc.Listen(context.Background(), "tcp4", fmt.Sprintf("127.0.0.1:%d", ports._1)); err == nil {
reservedPorts[0] = port1Listener
reservedPorts[0] = port1Listener
}
}
if ports._2 != 0 {
if port2Listener, err := lc.Listen(context.Background(), "tcp4", fmt.Sprintf("127.0.0.1:%d", ports._2)); err == nil {
reservedPorts[1] = port2Listener
reservedPorts[1] = port2Listener
}
}
break
@@ -1126,11 +1126,12 @@ func main() {
dir = path.Join(*dataDir, "registry")
}
opts := managedprocess.RegistryOpts{
Exe: cppExes.RegistryExe,
LogLevel: level,
Dir: dir,
RegistryAddress: registryAddress,
Replica: msgs.ReplicaId(r),
Exe: cppExes.RegistryExe,
LogLevel: level,
Dir: dir,
RegistryAddress: registryAddress,
Replica: msgs.ReplicaId(r),
UsingDynamicPorts: true,
}
if r == 0 {
if *leaderOnly {