This commit is contained in:
Miroslav Crnic
2025-09-05 13:52:19 +00:00
committed by Miroslav Crnic
parent 74d99c92d2
commit 6edd7bdd6a
23 changed files with 3214 additions and 2542 deletions

View File

@@ -34,16 +34,21 @@ Finally, we want to have the option to replicate TernFS to multiple regions, to
TODO decorate list below with links drilling down on specific concepts.
* **servers**
* **shuckle**
* **web**
* 1 logical instance
* `ternshuckle`, Go binary
* state currently persisted through SQLite (1 physical instance), should move to a Galera cluster soon (see #41)
* TCP -- both bincode and HTTP
* `ternweb`, go binary
* TCP http server
* stateless
* serves web UI
* **registry**
* 1 logical instance
* `ternregistry`, C++ binary
* TCP bincode req/resp
* UDP replication
* stores metadata about a specific TernFS deployment
* shard/cdc addresses
* block services addressea and storage statistics
* latency histograms
* serves web UI
* state persisted through RocksDB with 5-node distributed consensus through LogsDB
* **filesystem data**
* **metadata**
* **shard**
@@ -57,7 +62,7 @@ TODO decorate list below with links drilling down on specific concepts.
* block service to file mapping
* UDP bincode req/resp
* state persisted through RocksDB with 5-node distributed consensus through LogsDB
* communicates with shuckle to fetch block services, register itself, insert statistics
* communicates with registry to fetch block services, register itself, insert statistics
* **CDC**
* 1 logical instance
* `terncdc`, C++ binary
@@ -72,7 +77,7 @@ TODO decorate list below with links drilling down on specific concepts.
* directory -> parent directory mapping to perform "no loops" checks
* state persisted through RocksDB with 5-node distributed consensus through LogsDB
* communicates with the shards to perform the cross-directory actions
* communicates with shuckle to register itself, fetch shards, insert statistics
* communicates with registry to register itself, fetch shards, insert statistics
* **block service**
* up to 1 million logical instances
* 1 logical instance = 1 disk
@@ -84,7 +89,7 @@ TODO decorate list below with links drilling down on specific concepts.
* TCP bincode req/resp
* extremely dumb, the only state is the blobs themselves
* its entire job is efficiently streaming blobs of data from disks into TCP connections
* communicates with shuckle to register itself and to update information about free space, number of blocks, etc.
* communicates with registry to register itself and to update information about free space, number of blocks, etc.
* **clients**, these all talk to all of the servers
* **cli**
* `terncli`, Go binary
@@ -169,7 +174,7 @@ The above will run all the processes needed to run TernFS. This includes:
* 256 metadata shards;
* 1 cross directory coordinator (CDC)
* A bunch of block services (this is tunable with the `-flash-block-services`, `-hdd-block-services`, and `-failure-domains` flags)
* 1 shuckle instance
* 1 registry instance
A multitude of directories to persist the whole thing will appear in `<data-dir>`. The filesystem will also be mounted using FUSE under `<data-dir>/fuse/mnt`.

View File

@@ -19,6 +19,7 @@ ${PWD}/go/build.py
# copy binaries
binaries=(
cpp/build/$build_variant/registry/ternregistry
cpp/build/$build_variant/shard/ternshard
cpp/build/$build_variant/dbtools/terndbtools
cpp/build/$build_variant/cdc/terncdc

View File

@@ -58,6 +58,7 @@ add_subdirectory(rs)
add_subdirectory(crc32c)
add_subdirectory(core)
add_subdirectory(dbtools)
add_subdirectory(registry)
add_subdirectory(shard)
add_subdirectory(cdc)
add_subdirectory(tests)

View File

@@ -0,0 +1,8 @@
include_directories(${ternfs_SOURCE_DIR}/core ${ternfs_SOURCE_DIR}/wyhash)
add_library(registry Registry.hpp Registry.cpp RegistryDB.hpp RegistryDB.cpp
RegistryDBData.hpp Registerer.hpp Registerer.cpp RegistryServer.hpp RegistryServer.cpp)
target_link_libraries(registry PRIVATE core)
add_executable(ternregistry ternregistry.cpp)
target_link_libraries(ternregistry PRIVATE core registry crc32c ${TERNFS_JEMALLOC_LIBS})

105
cpp/registry/Registerer.cpp Normal file
View File

@@ -0,0 +1,105 @@
#include "Registerer.hpp"
#include "LogsDB.hpp"
#include <atomic>
static std::ostream &operator<<(std::ostream &o, const std::vector<FullRegistryInfo> &replicas) {
o << "[";
for (const auto &rep : replicas) {
o << rep << ',';
}
return o << "]";
}
void Registerer::_init(const std::vector<FullRegistryInfo> &cachedReplicas) {
_env.updateAlert(_alert, "Waiting to register ourselves for the first time");
_updateReplicas(cachedReplicas);
}
bool Registerer::periodicStep() {
{
LOG_DEBUG(
_env,
"Registering ourselves (replicaId %s, location %s, %s) with registry",
_logsDBOptions.replicaId, (int)_logsDBOptions.location,
_boundAddresses);
const auto [err, errStr] =
registerRegistry(_clientOptions.host, _clientOptions.port, 10_sec,
_logsDBOptions.replicaId, _logsDBOptions.location,
!_logsDBOptions.avoidBeingLeader, _boundAddresses,
!_hasEnoughReplicas.load(std::memory_order_relaxed));
if (err == EINTR) {
return false;
}
if (err) {
_env.updateAlert(_alert, "Couldn't register ourselves with registry: %s", errStr);
return false;
}
}
{
std::vector<FullRegistryInfo> allReplicas;
LOG_DEBUG(_env, "Fetching replicas from registry");
const auto [err, errStr] = fetchRegistryReplicas(
_clientOptions.host, _clientOptions.port, 10_sec, allReplicas);
if (err == EINTR) {
return false;
}
if (err) {
_env.updateAlert(
_alert, "Failed getting registry replicas from registry: %s", errStr);
return false;
}
LOG_DEBUG(_env, "fetched all replicas %s", allReplicas);
if (!_updateReplicas(allReplicas)) {
return false;
}
}
_env.clearAlert(_alert);
return true;
}
bool Registerer::_updateReplicas(const std::vector<FullRegistryInfo> &allReplicas) {
std::array<AddrsInfo, LogsDB::REPLICA_COUNT> localReplicas;
bool foundLeader = false;
for (auto &replica : allReplicas) {
if (replica.locationId != _logsDBOptions.location) {
continue;
}
localReplicas[replica.id.u8] = replica.addrs;
if (replica.isLeader) {
foundLeader = true;
}
}
if (!foundLeader) {
_env.updateAlert(_alert, "Didn't get leader with known addresses from registry");
return false;
}
if (_boundAddresses != localReplicas[_logsDBOptions.replicaId]) {
_env.updateAlert(_alert, "AddrsInfo in registry: %s , not matching local AddrsInfo: %s",
localReplicas[_logsDBOptions.replicaId], _boundAddresses);
return false;
}
size_t knownReplicas{0};
for (auto &replica : localReplicas) {
if (replica.addrs[0].port != 0) {
++knownReplicas;
}
}
size_t quorumSize = _logsDBOptions.noReplication ? 1 : LogsDB::REPLICA_COUNT / 2 + 1;
if (knownReplicas < quorumSize ) {
_env.updateAlert(_alert, "Didn't get enough replicas with known addresses from registry");
return false;
}
if (unlikely(*_replicas != localReplicas)) {
LOG_DEBUG(_env, "Updating replicas to %s %s %s %s %s", localReplicas[0],
localReplicas[1], localReplicas[2], localReplicas[3],
localReplicas[4]);
std::atomic_exchange(&_replicas, std::make_shared<std::array<AddrsInfo, LogsDB::REPLICA_COUNT>>(localReplicas));
}
return _hasEnoughReplicas.exchange(true, std::memory_order_release);
}

View File

@@ -0,0 +1,48 @@
#pragma once
#include <atomic>
#include "Msgs.hpp"
#include "PeriodicLoop.hpp"
#include "RegistryCommon.hpp"
class Registerer : public PeriodicLoop {
public:
Registerer(Logger &logger, std::shared_ptr<XmonAgent> &xmon,
const RegistryOptions &options, const AddrsInfo &boundAddresses,
const std::vector<FullRegistryInfo>& cachedReplicas)
: PeriodicLoop(logger, xmon, "registerer", {1_sec, 1, 2_mins, 1}),
_logsDBOptions(options.logsDBOptions),
_clientOptions(options.registryClientOptions),
_boundAddresses(boundAddresses),
_hasEnoughReplicas(false),
_replicas(std::make_shared<std::array<AddrsInfo, LogsDB::REPLICA_COUNT>>()) {
_init(cachedReplicas);
}
virtual ~Registerer() = default;
inline bool hasEnoughReplicas() const {
return _hasEnoughReplicas.load(std::memory_order_relaxed);
}
inline std::shared_ptr<std::array<AddrsInfo, LogsDB::REPLICA_COUNT>> replicas() {
return _replicas;
}
virtual bool periodicStep() override;
private:
const LogsDBOptions _logsDBOptions;
const RegistryClientOptions _clientOptions;
const AddrsInfo _boundAddresses;
XmonNCAlert _alert;
std::atomic<bool> _hasEnoughReplicas;
std::shared_ptr<std::array<AddrsInfo, LogsDB::REPLICA_COUNT>> _replicas;
void _init(const std::vector<FullRegistryInfo> &cachedReplicas);
bool _updateReplicas(const std::vector<FullRegistryInfo> &allReplicas);
};

779
cpp/registry/Registry.cpp Normal file
View File

@@ -0,0 +1,779 @@
#include <atomic>
#include <cstring>
#include <fcntl.h>
#include <memory>
#include <netinet/in.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>
#include <unordered_map>
#include "Assert.hpp"
#include "Bincode.hpp"
#include "Crypto.hpp"
#include "Env.hpp"
#include "ErrorCount.hpp"
#include "LogsDB.hpp"
#include "Loop.hpp"
#include "Msgs.hpp"
#include "MsgsGen.hpp"
#include "Registry.hpp"
#include "Registerer.hpp"
#include "RegistryDB.hpp"
#include "RegistryServer.hpp"
#include "SharedRocksDB.hpp"
#include "Time.hpp"
#include "Timings.hpp"
#include "XmonAgent.hpp"
struct RegistryState {
explicit RegistryState() :
logEntriesQueueSize(0), readingRequests(0), writingRequests(0),
activeConnections(0), requestsInProgress(0)
{
for (RegistryMessageKind kind : allRegistryMessageKind) {
timings[(int)kind] = Timings::Standard();
}
for (auto &x : receivedRequests) {
x = 0;
}
}
std::unique_ptr<RegistryServer> server;
// databases
std::unique_ptr<SharedRocksDB> sharedDB;
std::unique_ptr<LogsDB> logsDB;
std::unique_ptr<RegistryDB> registryDB;
// statistics
std::array<Timings, maxRegistryMessageKind + 1> timings;
std::array<ErrorCount, maxRegistryMessageKind + 1> errors;
std::atomic<double> logEntriesQueueSize;
std::array<std::atomic<double>, 2> receivedRequests; // how many requests we got at once from each socket
std::atomic<double> readingRequests; // how many requests we got from write queue
std::atomic<double> writingRequests; // how many requests we got from read queue
std::atomic<double> activeConnections;
std::atomic<double> requestsInProgress;
};
class RegistryLoop : public Loop {
public:
RegistryLoop(Logger &logger, std::shared_ptr<XmonAgent> xmon, const RegistryOptions& options, Registerer& registerer, RegistryServer& server, LogsDB& logsDB, RegistryDB& registryDB) :
Loop(logger, xmon, "server"),
_options(options),
_registerer(registerer),
_server(server),
_logsDB(logsDB),
_registryDB(registryDB),
_replicas({}),
_replicaFinishedBootstrap({}),
_boostrapFinished(false)
{}
virtual ~RegistryLoop() {}
virtual void step() override {
_server.setReplicas(_registerer.replicas());
if (!_server.receiveMessages(_logsDB.getNextTimeout())){
return;
}
auto now = ternNow();
_logsDB.processIncomingMessages(_server.receivedLogsDBRequests(), _server.receivedLogsDBResponses());
auto& receivedRequests = _server.receivedRegistryRequests();
for (auto &req : receivedRequests) {
if (unlikely(!_boostrapFinished)) {
_processBootstrapRequest(req);
continue;
}
auto& resp = _registryResponses.emplace_back();
resp.requestId = req.requestId;
if (unlikely(!_logsDB.isLeader())) {
LOG_DEBUG(_env, "not leader. dropping request from client %s", req.requestId);
continue;
}
switch (req.req.kind()) {
case RegistryMessageKind::LOCAL_SHARDS: {
auto& registryResp = resp.resp.setLocalShards();
registryResp.shards.els = _shardsAtLocation(_options.logsDBOptions.location);
break;
}
case RegistryMessageKind::LOCAL_CDC: {
auto& registryResp = resp.resp.setLocalCdc();
auto cdcInfo = _cdcAtLocation(_options.logsDBOptions.location);
registryResp.addrs = cdcInfo.addrs;
registryResp.lastSeen = cdcInfo.lastSeen;
break;
}
case RegistryMessageKind::INFO: {
auto& registryResp = resp.resp.setInfo();
auto cachedInfo = _info();
registryResp.capacity = cachedInfo.capacity;
registryResp.available = cachedInfo.available;
registryResp.blocks = cachedInfo.blocks;
registryResp.numBlockServices = cachedInfo.numBlockServices;
registryResp.numFailureDomains = cachedInfo.numFailureDomains;
break;
}
case RegistryMessageKind::REGISTRY: {
auto &registryResp = resp.resp.setRegistry();
registryResp.addrs = _server.boundAddresses();
break;
}
case RegistryMessageKind::LOCATIONS: {
auto& registryResp = resp.resp.setLocations();
_registryDB.locations(registryResp.locations.els);
break;
}
case RegistryMessageKind::LOCAL_CHANGED_BLOCK_SERVICES: {
auto& registryResp = resp.resp.setLocalChangedBlockServices();
auto& changedReq = req.req.getLocalChangedBlockServices();
registryResp.blockServices.els = _changedBlockServices(_options.logsDBOptions.location, changedReq.changedSince);
break;
}
case RegistryMessageKind::CHANGED_BLOCK_SERVICES_AT_LOCATION: {
auto& registryResp = resp.resp.setChangedBlockServicesAtLocation();
auto& changedReq = req.req.getChangedBlockServicesAtLocation();
registryResp.blockServices.els = _changedBlockServices(changedReq.locationId, changedReq.changedSince);
break;
}
case RegistryMessageKind::SHARDS_AT_LOCATION: {
auto& registryResp = resp.resp.setShardsAtLocation();
registryResp.shards.els = _shardsAtLocation(req.req.getShardsAtLocation().locationId);
break;
}
case RegistryMessageKind::CDC_AT_LOCATION: {
auto& registryResp = resp.resp.setCdcAtLocation();
auto cdcInfo = _cdcAtLocation(req.req.getCdcAtLocation().locationId);
registryResp.addrs = cdcInfo.addrs;
registryResp.lastSeen = cdcInfo.lastSeen;
break;
}
case RegistryMessageKind::ALL_REGISTRY_REPLICAS: {
auto &allRegiResp = resp.resp.setAllRegistryReplicas();
allRegiResp.replicas.els = _registries();
break;
}
case RegistryMessageKind::SHARD_BLOCK_SERVICES_DE_PR_EC_AT_ED: {
auto& registryResp = resp.resp.setShardBlockServicesDEPRECATED();
auto& bsReq = req.req.getShardBlockServicesDEPRECATED();
std::vector<BlockServiceInfoShort> blockservices;
_registryDB.shardBlockServices(bsReq.shardId, blockservices);
for (auto& bs : blockservices) {
registryResp.blockServices.els.push_back(bs.id);
}
break;
}
case RegistryMessageKind::CDC_REPLICAS_DE_PR_EC_AT_ED: {
auto& registryResp = resp.resp.setCdcReplicasDEPRECATED();
registryResp.replicas.els = _cdcReplicas();
break;
}
case RegistryMessageKind::ALL_SHARDS: {
auto& registryResp = resp.resp.setAllShards();
_populateShardCache();
registryResp.shards.els = _cachedShardInfo;
break;
}
case RegistryMessageKind::SHARD_BLOCK_SERVICES: {
auto& registryResp = resp.resp.setShardBlockServices();
auto& bsReq = req.req.getShardBlockServices();
_registryDB.shardBlockServices(bsReq.shardId, registryResp.blockServices.els);
break;
}
case RegistryMessageKind::ALL_CDC: {
auto& registryResp = resp.resp.setAllCdc();
_populateCdcCache();
registryResp.replicas.els = _cachedCdc;
break;
}
case RegistryMessageKind::ERASE_DECOMMISSIONED_BLOCK: {
auto& eraseReq = req.req.getEraseDecommissionedBlock();
BincodeFixedBytes<8> proof;
if (_eraseBlock(eraseReq, proof)) {
auto& registryResp = resp.resp.setEraseDecommissionedBlock();
registryResp.proof = proof;
} else {
resp.resp.setError() = TernError::INTERNAL_ERROR;
}
break;
}
case RegistryMessageKind::ALL_BLOCK_SERVICES_DEPRECATED: {
auto &registryResp = resp.resp.setAllBlockServicesDeprecated();
registryResp.blockServices.els = _allBlockServices();
break;
}
case RegistryMessageKind::REGISTER_BLOCK_SERVICES: {
if (_logEntries.size() >= LogsDB::IN_FLIGHT_APPEND_WINDOW) {
break;
}
bool failed = true;
//happy path. request fits in udp package and we don't want to copy
if (sizeof(now) + req.req.packedSize() <= LogsDB::DEFAULT_UDP_ENTRY_SIZE) {
try {
auto &entry = _logEntries.emplace_back();
entry.value.resize(sizeof(now) + req.req.packedSize());
ALWAYS_ASSERT(entry.value.size() <= LogsDB::DEFAULT_UDP_ENTRY_SIZE);
BincodeBuf buf((char *)(&entry.value[0]), entry.value.size());
buf.packScalar(now.ns);
req.req.pack(buf);
buf.ensureFinished();
_entriesRequestIds.emplace_back(req.requestId);
failed = false;
} catch (BincodeException e) {
LOG_ERROR(_env,"failed packing log entry request %s", e.what());
_logEntries.pop_back();
}
} else {
std::vector<RegisterBlockServiceInfo> blockservices;
blockservices = req.req.getRegisterBlockServices().blockServices.els;
RegistryReqContainer tmpReqContainer;
auto& tmpBsReq = tmpReqContainer.setRegisterBlockServices();
while (!blockservices.empty()) {
size_t currentSize = sizeof(now) + RegistryReqContainer::STATIC_SIZE;
size_t toCopy = 0;
for (auto bsIt = blockservices.rbegin(); bsIt != blockservices.rend(); ++bsIt) {
currentSize += bsIt->packedSize();
if (currentSize > LogsDB::DEFAULT_UDP_ENTRY_SIZE) {
break;
}
++toCopy;
}
tmpBsReq.blockServices.els.clear();
tmpBsReq.blockServices.els.insert(
tmpBsReq.blockServices.els.end(),blockservices.end() - toCopy,blockservices.end());
blockservices.resize(blockservices.size() - toCopy);
try {
auto &entry = _logEntries.emplace_back();
entry.value.resize(sizeof(now) + tmpReqContainer.packedSize());
ALWAYS_ASSERT(entry.value.size() <= LogsDB::DEFAULT_UDP_ENTRY_SIZE);
BincodeBuf buf((char *)(&entry.value[0]), entry.value.size());
buf.packScalar(now.ns);
tmpReqContainer.pack(buf);
buf.ensureFinished();
_entriesRequestIds.emplace_back(req.requestId);
failed = false;
} catch (BincodeException e) {
LOG_ERROR(_env,"failed packing log entry request %s", e.what());
_logEntries.pop_back();
break;
}
}
}
if (!failed) {
// we are not sending response yet. we can't leave empty response otherwise
// server will drop connection
_registryResponses.pop_back();
} else {
LOG_ERROR(_env,"FAILED REGISTERING");
}
break;
}
case RegistryMessageKind::DECOMMISSION_BLOCK_SERVICE:
case RegistryMessageKind::CREATE_LOCATION:
case RegistryMessageKind::RENAME_LOCATION:
case RegistryMessageKind::REGISTER_SHARD:
case RegistryMessageKind::REGISTER_REGISTRY:
case RegistryMessageKind::REGISTER_CDC:
case RegistryMessageKind::SET_BLOCK_SERVICE_FLAGS:
case RegistryMessageKind::MOVE_SHARD_LEADER:
case RegistryMessageKind::CLEAR_SHARD_INFO:
case RegistryMessageKind::MOVE_CDC_LEADER:
case RegistryMessageKind::CLEAR_CDC_INFO:
case RegistryMessageKind::UPDATE_BLOCK_SERVICE_PATH: {
if (_logEntries.size() >= LogsDB::IN_FLIGHT_APPEND_WINDOW) {
break;
}
try {
auto &entry = _logEntries.emplace_back();
entry.value.resize(sizeof(now) + req.req.packedSize());
ALWAYS_ASSERT(entry.value.size() <= LogsDB::MAX_UDP_ENTRY_SIZE);
BincodeBuf buf((char *)(&entry.value[0]), entry.value.size());
buf.packScalar(now.ns);
req.req.pack(buf);
buf.ensureFinished();
_entriesRequestIds.emplace_back(req.requestId);
// we are not sending response yet. we can't leave empty response otherwise
// server will drop connection
_registryResponses.pop_back();
} catch (BincodeException e) {
LOG_ERROR(_env,"failed packing log entry request %s", e.what());
_logEntries.pop_back();
}
break;
}
case RegistryMessageKind::EMPTY:
case RegistryMessageKind::ERROR:
break;
}
}
receivedRequests.clear();
if (_logsDB.isLeader()) {
_logsDB.appendEntries(_logEntries);
for (int i = 0; i < _logEntries.size(); ++i) {
auto &entry = _logEntries[i];
auto requestId = _entriesRequestIds[i];
if (entry.idx == 0) {
LOG_DEBUG(_env, "could not append entry for request %s", requestId);
// empty response drops request
_registryResponses.emplace_back().requestId = requestId;
continue;
}
_logIdxToRequestId.emplace(entry.idx.u64, requestId);
}
_logEntries.clear();
_entriesRequestIds.clear();
}
ALWAYS_ASSERT(_logEntries.empty());
ALWAYS_ASSERT(_entriesRequestIds.empty());
do {
_logEntries.clear();
_logsDB.readEntries(_logEntries);
_registryDB.processLogEntries(_logEntries, _writeResults);
} while (!_logEntries.empty());
_logsDB.flush(true);
_logsDB.getOutgoingMessages(_logsDBOutRequests, _logsDBOutResponses);
LOG_TRACE(_env, "Sending %s log requests and %s log responses", _logsDBOutRequests.size(), _logsDBOutResponses.size());
_server.sendLogsDBMessages(_logsDBOutRequests, _logsDBOutResponses);
for (auto &writeResult : _writeResults) {
auto& regiResp = _registryResponses.emplace_back();
auto requestIdIt = _logIdxToRequestId.find(writeResult.idx.u64);
if (requestIdIt == _logIdxToRequestId.end()) {
_registryResponses.pop_back();
continue;
}
regiResp.requestId = requestIdIt->second;
RegistryRespContainer& resp = regiResp.resp;
if (writeResult.err != TernError::NO_ERROR) {
resp.setError() = writeResult.err;
continue;
}
switch (writeResult.kind) {
case RegistryMessageKind::ERROR:
resp.setError() = writeResult.err;
break;
case RegistryMessageKind::CREATE_LOCATION:
resp.setCreateLocation();
break;
case RegistryMessageKind::RENAME_LOCATION:
resp.setRenameLocation();
break;
case RegistryMessageKind::REGISTER_SHARD:
resp.setRegisterShard();
break;
case RegistryMessageKind::REGISTER_CDC:
resp.setRegisterCdc();
break;
case RegistryMessageKind::SET_BLOCK_SERVICE_FLAGS:
resp.setSetBlockServiceFlags();
break;
case RegistryMessageKind::REGISTER_BLOCK_SERVICES:
resp.setRegisterBlockServices();
break;
case RegistryMessageKind::REGISTER_REGISTRY:
resp.setRegisterRegistry();
break;
case RegistryMessageKind::DECOMMISSION_BLOCK_SERVICE:
resp.setDecommissionBlockService();
break;
case RegistryMessageKind::MOVE_SHARD_LEADER:
resp.setMoveShardLeader();
break;
case RegistryMessageKind::CLEAR_SHARD_INFO:
resp.setClearShardInfo();
break;
case RegistryMessageKind::MOVE_CDC_LEADER:
resp.setMoveCdcLeader();
break;
case RegistryMessageKind::CLEAR_CDC_INFO:
resp.setClearCdcInfo();
break;
case RegistryMessageKind::UPDATE_BLOCK_SERVICE_PATH:
resp.setUpdateBlockServicePath();
break;
default:
ALWAYS_ASSERT(false);
break;
}
}
_writeResults.clear();
_server.sendRegistryResponses(_registryResponses);
_registryResponses.clear();
_clearCaches();
}
private:
const RegistryOptions& _options;
Registerer& _registerer;
RegistryServer& _server;
LogsDB& _logsDB;
RegistryDB& _registryDB;
std::array<AddrsInfo, LogsDB::REPLICA_COUNT> _replicas;
std::array<bool, LogsDB::REPLICA_COUNT> _replicaFinishedBootstrap;
bool _boostrapFinished;
std::vector<RegistryResponse> _registryResponses;
// buffer for reading/writing log entries
std::vector<LogsDBLogEntry> _logEntries;
std::vector<uint64_t> _entriesRequestIds;
std::vector<LogsDBRequest*> _logsDBOutRequests;
std::vector<LogsDBResponse> _logsDBOutResponses;
// keeping track which log entry corresponds to which request
std::unordered_map<uint64_t, uint64_t> _logIdxToRequestId;
// buffer for RegistryDB processLogEntries result
std::vector<RegistryDBWriteResult> _writeResults;
// local cache for common data, read once per step
std::vector<FullRegistryInfo> _cachedRegistries;
std::vector<FullShardInfo> _cachedShardInfo;
std::unordered_map<LocationId, std::vector<ShardInfo>> _cachedShardInfoPerLocation;
std::vector<FullBlockServiceInfo> _cachedBlockServices;
std::vector<BlockServiceDeprecatedInfo> _cachedAllBlockServices;
std::unordered_map<BlockServiceId, AES128Key> _decommissionedServices;
std::vector<CdcInfo> _cachedCdc;
InfoResp _cachedInfo;
void _clearCaches() {
_cachedRegistries.clear();
_cachedShardInfo.clear();
for(auto& loc : _cachedShardInfoPerLocation) {
loc.second.clear();
loc.second.resize(256);
}
_cachedBlockServices.clear();
_cachedAllBlockServices.clear();
_cachedInfo.clear();
_cachedCdc.clear();
}
void _processBootstrapRequest(RegistryRequest &req) {
auto& resp = _registryResponses.emplace_back();
resp.requestId = req.requestId;
switch (req.req.kind()) {
case RegistryMessageKind::REGISTER_REGISTRY: {
const auto &regiReq = req.req.getRegisterRegistry();
LOG_TRACE(_env, "Received register request in bootstrap %s", regiReq);
if (regiReq.location != _options.logsDBOptions.location) {
LOG_TRACE(_env, "Dropping register request in bootstrap for other location");
break;
}
_replicas[regiReq.replicaId.u8] = regiReq.addrs;
_replicaFinishedBootstrap[regiReq.replicaId.u8] = !regiReq.bootstrap;
resp.resp.setRegisterRegistry();
size_t quorumSize = _replicaFinishedBootstrap.size() / 2 + 1;
if (_options.logsDBOptions.noReplication) {
quorumSize = 1;
}
_boostrapFinished = std::count(
std::begin(_replicaFinishedBootstrap), std::end(_replicaFinishedBootstrap), true) >= quorumSize;
if (_boostrapFinished) {
LOG_TRACE(_env, "Bootstrap finished!");
_server.setReplicas(_registerer.replicas());
}
break;
}
case RegistryMessageKind::ALL_REGISTRY_REPLICAS: {
auto &allRegiResp = resp.resp.setAllRegistryReplicas();
for (uint8_t i = 0; i < _replicas.size(); ++i) {
auto &addrs = _replicas[i];
if (addrs.addrs[0].ip.data[0] == 0) {
continue;
}
auto &replica = allRegiResp.replicas.els.emplace_back();
replica.id = i;
replica.locationId = _options.logsDBOptions.location;
replica.addrs = addrs;
replica.isLeader = !_options.logsDBOptions.avoidBeingLeader;
replica.lastSeen = ternNow();
}
break;
}
default:
// empty response removes client
break;
}
}
void _populateShardCache() {
if (!_cachedShardInfo.empty()) {
return;
}
_registryDB.shards(_cachedShardInfo);
for(auto& shard : _cachedShardInfo) {
if (!shard.isLeader) {
continue;
}
auto& locShards = _cachedShardInfoPerLocation[shard.locationId];
if (locShards.empty()) {
locShards.resize(256);
}
auto& infoShort = locShards[shard.id.shardId().u8];
infoShort.addrs = shard.addrs;
infoShort.lastSeen = shard.lastSeen;
}
if (_cachedShardInfoPerLocation.empty()) {
_cachedShardInfoPerLocation[_options.logsDBOptions.location].resize(256);
}
}
const std::vector<FullRegistryInfo>& _registries() {
if (_cachedRegistries.empty()) {
_registryDB.registries(_cachedRegistries);
}
return _cachedRegistries;
}
const std::vector<ShardInfo>& _shardsAtLocation(LocationId location) {
_populateShardCache();
return _cachedShardInfoPerLocation[location];
}
void _populateCdcCache() {
if (!_cachedCdc.empty()) {
return;
}
_registryDB.cdcs(_cachedCdc);
}
CdcInfo _cdcAtLocation(LocationId location) {
_populateCdcCache();
for(const auto& cdc : _cachedCdc) {
if (cdc.isLeader && cdc.locationId == location) {
return cdc;
}
}
return CdcInfo{};
}
std::vector<AddrsInfo> _cdcReplicas() {
_populateCdcCache();
std::vector<AddrsInfo> res;
res.resize(LogsDB::REPLICA_COUNT);
for(const auto& cdc : _cachedCdc) {
if (cdc.locationId != 0) {
continue;
}
res[cdc.replicaId.u8] = cdc.addrs;
}
return res;
}
void _populateBlockServiceCache() {
if (!_cachedBlockServices.empty()) {
return;
}
_registryDB.blockServices(_cachedBlockServices);
std::sort(_cachedBlockServices.begin(), _cachedBlockServices.end(),
[](const FullBlockServiceInfo& a, const FullBlockServiceInfo& b) {
return a.lastInfoChange > b.lastInfoChange;
});
for (auto& bs : _cachedBlockServices) {
auto& infoDeprecated = _cachedAllBlockServices.emplace_back();
infoDeprecated.id = bs.id;
infoDeprecated.addrs = bs.addrs;
infoDeprecated.storageClass = bs.storageClass;
infoDeprecated.failureDomain = bs.failureDomain;
infoDeprecated.secretKey = bs.secretKey;
infoDeprecated.flags = bs.flags;
infoDeprecated.capacityBytes = bs.capacityBytes;
infoDeprecated.availableBytes = bs.availableBytes;
infoDeprecated.blocks = bs.blocks;
infoDeprecated.path = bs.path;
infoDeprecated.lastSeen = bs.lastSeen;
infoDeprecated.hasFiles = bs.hasFiles;
infoDeprecated.flagsLastChanged = bs.lastInfoChange;
if (bs.flags == BlockServiceFlags::DECOMMISSIONED && !_decommissionedServices.contains(bs.id)) {
auto it = _decommissionedServices.emplace(bs.id, AES128Key{}).first;
expandKey(bs.secretKey.data, it->second);
}
if (bs.flags != BlockServiceFlags::DECOMMISSIONED) {
++_cachedInfo.numBlockServices;
_cachedInfo.blocks += bs.blocks;
_cachedInfo.available += bs.availableBytes;
_cachedInfo.capacity += bs.capacityBytes;
}
}
}
InfoResp _info() {
_populateBlockServiceCache();
return _cachedInfo;
}
std::vector<BlockServiceDeprecatedInfo> _allBlockServices() {
_populateBlockServiceCache();
return _cachedAllBlockServices;
}
std::vector<BlockService> _changedBlockServices(LocationId location, TernTime changedSince) {
_populateBlockServiceCache();
std::vector<BlockService> res;
for (auto& bs : _cachedBlockServices) {
if (bs.lastInfoChange < changedSince) {
break;
}
if (bs.locationId != location) {
continue;
}
auto& bsOut = res.emplace_back();
bsOut.id = bs.id;
bsOut.addrs = bs.addrs;
bsOut.flags = bsOut.flags;
}
return res;
}
bool _eraseBlock(const EraseDecommissionedBlockReq& req, BincodeFixedBytes<8>& proof) {
_populateBlockServiceCache();
auto bsIt = _decommissionedServices.find(req.blockServiceId);
if (bsIt == _decommissionedServices.end()) {
return false;
}
// validate certificate
{
char buf[32];
memset(buf, 0, sizeof(buf));
BincodeBuf bbuf(buf, sizeof(buf));
// struct.pack_into('<QcQ', b, 0, block['block_service_id'], b'e', block['block_id'])
bbuf.packScalar<uint64_t>(req.blockServiceId.u64);
bbuf.packScalar<char>('e');
bbuf.packScalar<uint64_t>(req.blockId);
if (cbcmac(bsIt->second, (uint8_t*)buf, sizeof(buf)) != req.certificate) {
return false;
}
}
// generate proof
{
char buf[32];
memset(buf, 0, sizeof(buf));
BincodeBuf bbuf(buf, sizeof(buf));
// struct.pack_into('<QcQ', b, 0, block['block_service_id'], b'E', block['block_id'])
bbuf.packScalar<uint64_t>(req.blockServiceId.u64);
bbuf.packScalar<char>('E');
bbuf.packScalar<uint64_t>(req.blockId);
proof.data = cbcmac(bsIt->second, (uint8_t*)buf, sizeof(buf));
}
return true;
}
};
Registry::Registry(Logger &logger, std::shared_ptr<XmonAgent> xmon)
: _env(logger, xmon, "registry") {
};
Registry::~Registry() {}
void Registry::start(const RegistryOptions& options, LoopThreads& threads) {
{
LOG_INFO(_env, "Running registry %s at location %s, base directory %s:", (int)options.logsDBOptions.replicaId, (int)options.logsDBOptions.location, options.logsDBOptions.dbDir);
LOG_INFO(_env, " Logging options:");
LOG_INFO(_env, " logFile = '%s'", options.logOptions.logFile);
LOG_INFO(_env, " xmon = %s", options.xmonOptions.addr);
LOG_INFO(_env, " metrics = '%s'", (int)!options.metricsOptions.origin.empty());
LOG_INFO(_env, " LogsDB options:");
LOG_INFO(_env, " noReplication = '%s'", (int)options.logsDBOptions.noReplication);
LOG_INFO(_env, " avoidBeingLeader = '%s'", (int)options.logsDBOptions.avoidBeingLeader);
LOG_INFO(_env, " Registry options:");
LOG_INFO(_env, " registryHost = '%s'", options.registryClientOptions.host);
LOG_INFO(_env, " registryPort = %s", options.registryClientOptions.port);
LOG_INFO(_env, " ownAddres = %s", options.serverOptions.addrs);
LOG_INFO(_env, " enforceStableIp = '%s'", (int)options.enforceStableIp);
LOG_INFO(_env, " enforceStableLeader = '%s'", (int)options.enforceStableLeader);
LOG_INFO(_env, " maxConnections = '%s'", options.maxConnections);
LOG_INFO(_env, " minAutoDecomInterval = '%s'", options.minDecomInterval);
LOG_INFO(_env, " alertAtUnavailableFailureDomains = '%s'", (int)options.alertAfterUnavailableFailureDomains);
LOG_INFO(_env, " stalenessDelay = '%s'", options.staleDelay);
LOG_INFO(_env, " blockServiceUseDelay = '%s'", options.blockServiceUsageDelay);
LOG_INFO(_env, " maxWritableBlockServicePerShard = '%s'", options.maxFailureDomainsPerShard);
LOG_INFO(_env, " writableBlockServiceUpdateInterval = '%s'", options.writableBlockServiceUpdateInterval);
}
_state = std::make_unique<RegistryState>();
std::string dbDir = options.logsDBOptions.dbDir;
XmonNCAlert dbInitAlert{1_mins};
_env.updateAlert(dbInitAlert, "initializing database %s", options.xmonOptions.appInstance);
auto xmon = _env.xmon();
auto& logger = _env.logger();
_state->sharedDB = std::make_unique<SharedRocksDB>(logger, xmon, dbDir + "/db", dbDir + "/db-statistics.txt");
_state->sharedDB->registerCFDescriptors(LogsDB::getColumnFamilyDescriptors());
_state->sharedDB->registerCFDescriptors(RegistryDB::getColumnFamilyDescriptors());
rocksdb::Options rocksDBOptions;
rocksDBOptions.create_if_missing = true;
rocksDBOptions.create_missing_column_families = true;
rocksDBOptions.compression = rocksdb::kLZ4Compression;
rocksDBOptions.bottommost_compression = rocksdb::kZSTD;
rocksDBOptions.max_open_files = 1000;
// We batch writes and flush manually.
rocksDBOptions.manual_wal_flush = true;
_state->sharedDB->open(rocksDBOptions);
_state->registryDB = std::make_unique<RegistryDB>(logger, xmon, options, *_state->sharedDB);
_state->logsDB = std::make_unique<LogsDB>(
logger, xmon, *_state->sharedDB, options.logsDBOptions.replicaId,
_state->registryDB->lastAppliedLogEntry(), options.logsDBOptions.noReplication,
options.logsDBOptions.avoidBeingLeader);
_env.clearAlert(dbInitAlert);
_state->server.reset(new RegistryServer(options, _env));
ALWAYS_ASSERT(_state->server->init());
std::vector<FullRegistryInfo> cachedRegistries;
_state->registryDB->registries(cachedRegistries);
auto registerer = std::make_unique<Registerer>(logger, xmon, options, _state->server->boundAddresses(), cachedRegistries);
auto registryLoop = std::make_unique<RegistryLoop>(logger, xmon, options, *registerer, *_state->server, *_state->logsDB, *_state->registryDB);
threads.emplace_back(LoopThread::Spawn(std::move((registerer))));
threads.emplace_back(LoopThread::Spawn(std::move((registryLoop))));
}
void Registry::close() {
if (_state) {
if (_state->registryDB) {
_state->registryDB->close();
_state->registryDB.reset();
}
if (_state->logsDB) {
_state->logsDB->close();
_state->logsDB.reset();
}
if (_state->sharedDB) {
_state->sharedDB->close();
_state->sharedDB.reset();
}
LOG_INFO(_env, "registry terminating gracefully, bye.");
}
_state.reset();
}

21
cpp/registry/Registry.hpp Normal file
View File

@@ -0,0 +1,21 @@
#pragma once
#include <memory>
#include "Env.hpp"
#include "Loop.hpp"
#include "RegistryCommon.hpp"
class Registry {
public:
explicit Registry(Logger& logger, std::shared_ptr<XmonAgent> xmon);
~Registry();
void start(const RegistryOptions& options, LoopThreads& threads);
void close();
private:
Env _env;
std::unique_ptr<RegistryState> _state;
};

View File

@@ -0,0 +1,28 @@
#pragma once
#include <cstdint>
#include "CommonOptions.hpp"
#include "Time.hpp"
struct RegistryOptions {
LogOptions logOptions;
XmonOptions xmonOptions;
MetricsOptions metricsOptions;
RegistryClientOptions registryClientOptions;
LogsDBOptions logsDBOptions;
ServerOptions serverOptions;
// Registry specific settings
bool enforceStableIp = false;
bool enforceStableLeader = false;
uint32_t maxConnections = 4000;
Duration staleDelay = 3_mins;
Duration blockServiceUsageDelay = 0_mins;
Duration minDecomInterval = 1_hours;
uint8_t alertAfterUnavailableFailureDomains = 3;
uint32_t maxFailureDomainsPerShard = 28;
Duration writableBlockServiceUpdateInterval = 30_mins;
};
struct RegistryState;

684
cpp/registry/RegistryDB.cpp Normal file
View File

@@ -0,0 +1,684 @@
#include "RegistryDB.hpp"
#include "Assert.hpp"
#include "Bincode.hpp"
#include "Env.hpp"
#include "LogsDB.hpp"
#include "Msgs.hpp"
#include "MsgsGen.hpp"
#include "RegistryDBData.hpp"
#include "RocksDBUtils.hpp"
#include "Time.hpp"
#include <rocksdb/write_batch.h>
static constexpr auto REGISTRY_CF_NAME = "registry";
static constexpr auto LOCATIONS_CF_NAME = "locations";
static constexpr auto SHARDS_CF_NAME = "shards";
static constexpr auto CDC_CF_NAME = "cdc";
static constexpr auto BLOCKSERVICES_CF_NAME = "blockservices";
static constexpr auto LAST_SERVICE_HEARTBEAT_CF_NAME = "last_heartbeat";
static constexpr auto WRITABLE_BLOCKSERVICES_CF_NAME = "writable_blockservices";
std::vector<rocksdb::ColumnFamilyDescriptor>
RegistryDB::getColumnFamilyDescriptors() {
return {
{rocksdb::kDefaultColumnFamilyName, {}},
{REGISTRY_CF_NAME, {}},
{LOCATIONS_CF_NAME, {}},
{SHARDS_CF_NAME, {}},
{CDC_CF_NAME, {}},
{BLOCKSERVICES_CF_NAME, {}},
{LAST_SERVICE_HEARTBEAT_CF_NAME, {}},
{WRITABLE_BLOCKSERVICES_CF_NAME, {}},
};
}
RegistryDB::RegistryDB(Logger &logger, std::shared_ptr<XmonAgent> &xmon, const RegistryOptions& options, const SharedRocksDB &sharedDB) :
_options(options), _env(logger, xmon, "RegistryDB"),
_lastCalculatedShardBlockServices(0),
_db(sharedDB.db()),
_defaultCf(sharedDB.getCF(rocksdb::kDefaultColumnFamilyName)),
_registryCf(sharedDB.getCF(REGISTRY_CF_NAME)),
_locationsCf(sharedDB.getCF(LOCATIONS_CF_NAME)),
_shardsCf(sharedDB.getCF(SHARDS_CF_NAME)),
_cdcCf(sharedDB.getCF(CDC_CF_NAME)),
_blockServicesCf(sharedDB.getCF(BLOCKSERVICES_CF_NAME)),
_lastHeartBeatCf(sharedDB.getCF(LAST_SERVICE_HEARTBEAT_CF_NAME)),
_writableBlockServicesCf(sharedDB.getCF(WRITABLE_BLOCKSERVICES_CF_NAME))
{
LOG_INFO(_env, "opening Registry RocksDB");
_initDb();
}
void RegistryDB::close() {
LOG_INFO(_env, "closing RegistryDB, lastAppliedLogEntry(%s)", lastAppliedLogEntry());
}
struct ReloadState {
bool locations;
bool registry;
bool shards;
bool cdc;
bool blockServices;
ReloadState() : locations(false), registry(false), shards(false), cdc(false), blockServices(false) {}
};
static bool addressesIntersect(const AddrsInfo& currentAddr, const AddrsInfo& newAddr) {
if (currentAddr.addrs[0].ip.data[0] == 0 && currentAddr.addrs[1].ip.data[0] == 0) {
// we allow address to be set if current is empty
return true;
}
for (auto& cur : currentAddr.addrs) {
if (cur.ip.data[0] == 0) {
continue;
}
for(auto& n : newAddr.addrs) {
if (cur.ip == n.ip) {
return true;
}
}
}
return false;
}
void RegistryDB::processLogEntries(std::vector<LogsDBLogEntry>& logEntries, std::vector<RegistryDBWriteResult>& writeResults) {
auto expectedLogEntry = lastAppliedLogEntry();
std::unordered_map<uint64_t, FullBlockServiceInfo> updatedBlocks;
bool writableChanged = false;
rocksdb::WriteBatch writeBatch;
ReloadState toReload{};
TernTime lastRequestTime{};
for (auto &entry : logEntries) {
ALWAYS_ASSERT(entry.idx == ++expectedLogEntry, "log entry index mismatch");
BincodeBuf buf((char*)entry.value.data(), entry.value.size());
TernTime requestTime = buf.unpackScalar<uint64_t>();
lastRequestTime = std::max(lastRequestTime, requestTime);
RegistryReqContainer reqContainer;
reqContainer.unpack(buf);
buf.ensureFinished();
auto& res = writeResults.emplace_back();
res.idx = entry.idx;
res.kind = reqContainer.kind();
res.err = TernError::NO_ERROR;
switch (res.kind) {
case RegistryMessageKind::CREATE_LOCATION: {
auto& req = reqContainer.getCreateLocation();
StaticValue<ShardInfoKey> key;
std::string value;
key().setLocationId(req.id);
auto status = _db->Get({}, _locationsCf, key.toSlice(), &value);
if (status != rocksdb::Status::NotFound()) {
ROCKS_DB_CHECKED(status);
res.err = TernError::LOCATION_EXISTS;
break;
}
LocationInfo newLocation;
newLocation.id = req.id;
newLocation.name = req.name;
writeLocationInfo(writeBatch, _locationsCf, newLocation);
toReload.locations = toReload.registry = toReload.shards = toReload.cdc = true;
break;
}
case RegistryMessageKind::RENAME_LOCATION: {
auto& req = reqContainer.getRenameLocation();
StaticValue<ShardInfoKey> key;
std::string value;
key().setLocationId(req.id);
auto status = _db->Get({}, _locationsCf, key.toSlice(), &value);
if (status == rocksdb::Status::NotFound()) {
LOG_ERROR(_env, "unknown location in req %s", req);
res.err = TernError::LOCATION_NOT_FOUND;
break;
}
ROCKS_DB_CHECKED(status);
LocationInfo newLocation;
newLocation.id = req.id;
newLocation.name = req.name;
writeLocationInfo(writeBatch, _locationsCf, newLocation);
toReload.locations = true;
break;
}
case RegistryMessageKind::REGISTER_SHARD: {
auto& req = reqContainer.getRegisterShard();
StaticValue<ShardInfoKey> key;
std::string value;
key().setLocationId(req.location);
key().setReplicaId(req.shrid.replicaId());
key().setShardId(req.shrid.shardId().u8);
auto status = _db->Get({}, _shardsCf, key.toSlice(), &value);
if (status == rocksdb::Status::NotFound()) {
LOG_ERROR(_env, "unknown shard replica in req %s", req);
res.err = TernError::INVALID_REPLICA;
break;
}
ROCKS_DB_CHECKED(status);
FullShardInfo info;
readShardInfo(key.toSlice(), value, info);
if (_options.enforceStableIp && (!addressesIntersect(info.addrs, req.addrs))) {
res.err = TernError::DIFFERENT_ADDRS_INFO;
break;
}
if (_options.enforceStableLeader && info.isLeader != req.isLeader) {
res.err = TernError::LEADER_PREEMPTED;
break;
}
StaticValue<LastHeartBeatKey> lastHeartBeat;
shardToLastHeartBeat(info, lastHeartBeat());
writeBatch.Delete(_lastHeartBeatCf, lastHeartBeat.toSlice());
info.isLeader = req.isLeader;
info.addrs = req.addrs;
info.lastSeen = requestTime;
shardToLastHeartBeat(info, lastHeartBeat());
writeBatch.Put(_lastHeartBeatCf, lastHeartBeat.toSlice(),{});
writeShardInfo(writeBatch, _shardsCf, info);
break;
}
case RegistryMessageKind::REGISTER_CDC: {
auto& req = reqContainer.getRegisterCdc();
StaticValue<CdcInfoKey> key;
std::string value;
key().setLocationId(req.location);
key().setReplicaId(req.replica);
auto status = _db->Get({}, _cdcCf, key.toSlice(), &value);
if (status == rocksdb::Status::NotFound()) {
LOG_ERROR(_env, "unknown cdc replica in req %s", req);
res.err = TernError::INVALID_REPLICA;
break;
}
ROCKS_DB_CHECKED(status);
CdcInfo info;
readCdcInfo(key.toSlice(), value, info);
if (_options.enforceStableIp && (!addressesIntersect(info.addrs, req.addrs))) {
res.err = TernError::DIFFERENT_ADDRS_INFO;
break;
}
if (_options.enforceStableLeader && info.isLeader != req.isLeader) {
res.err = TernError::LEADER_PREEMPTED;
break;
}
StaticValue<LastHeartBeatKey> lastHeartBeat;
cdcToLastHeartBeat(info, lastHeartBeat());
writeBatch.Delete(_lastHeartBeatCf, lastHeartBeat.toSlice());
info.isLeader = req.isLeader;
info.addrs = req.addrs;
info.lastSeen = requestTime;
cdcToLastHeartBeat(info, lastHeartBeat());
writeBatch.Put(_lastHeartBeatCf, lastHeartBeat.toSlice(),{});
writeCdcInfo(writeBatch, _cdcCf, info);
break;
}
case RegistryMessageKind::SET_BLOCK_SERVICE_FLAGS: {
auto& req = reqContainer.getSetBlockServiceFlags();
auto bsIt = updatedBlocks.find(req.id.u64);
FullBlockServiceInfo info;
StaticValue<BlockServiceInfoKey> key;
key().setId(req.id.u64);
if (bsIt == updatedBlocks.end()) {
std::string value;
auto status = _db->Get({}, _blockServicesCf, key.toSlice(), &value);
if (status == rocksdb::Status::NotFound()) {
LOG_ERROR(_env, "unknown block service in req %s", req);
res.err = TernError::BLOCK_SERVICE_NOT_FOUND;
break;
}
ROCKS_DB_CHECKED(status);
readBlockServiceInfo(key.toSlice(), value, info);
} else {
info = bsIt->second;
}
if ((info.flags & BlockServiceFlags::DECOMMISSIONED) != BlockServiceFlags::EMPTY) {
// no updates allowed to decomissioned services do nothing
break;
}
if ((req.flags & BlockServiceFlags::DECOMMISSIONED & (BlockServiceFlags) req.flagsMask) != BlockServiceFlags::EMPTY) {
// no longer track staleness
StaticValue<LastHeartBeatKey> lastHeartBeat;
blockServiceToLastHeartBeat(info, lastHeartBeat());
writeBatch.Delete(_lastHeartBeatCf, lastHeartBeat.toSlice());
// DECOMMISSIONED service looses other flags
info.flags = BlockServiceFlags::DECOMMISSIONED;
} else {
info.flags = (info.flags & ~(BlockServiceFlags)req.flagsMask) | (req.flags & (BlockServiceFlags) req.flagsMask);
}
info.lastInfoChange = requestTime;
writeBlockServiceInfo(writeBatch, _blockServicesCf, info);
updatedBlocks[info.id.u64] = info;
break;
}
case RegistryMessageKind::REGISTER_BLOCK_SERVICES: {
auto& req = reqContainer.getRegisterBlockServices();
for(auto& newInfo : req.blockServices.els) {
auto bsIt = updatedBlocks.find(newInfo.id.u64);
FullBlockServiceInfo info;
bool newService = false;
StaticValue<BlockServiceInfoKey> key;
key().setId(newInfo.id.u64);
if (bsIt == updatedBlocks.end()) {
std::string value;
auto status = _db->Get({}, _blockServicesCf, key.toSlice(), &value);
if (status == rocksdb::Status::NotFound()) {
info.id = newInfo.id;
info.locationId = newInfo.locationId;
info.addrs = newInfo.addrs;
info.storageClass = newInfo.storageClass;
info.failureDomain = newInfo.failureDomain;
info.secretKey = newInfo.secretKey;
info.flags = BlockServiceFlags::EMPTY;
info.firstSeen = info.lastInfoChange = requestTime;
info.hasFiles = false;
info.path = newInfo.path;
newService = true;
} else {
ROCKS_DB_CHECKED(status);
readBlockServiceInfo(key.toSlice(), value, info);
}
} else {
info = bsIt->second;
}
if (info.flags == BlockServiceFlags::DECOMMISSIONED) {
// no updates to decommissioned services
continue;
}
StaticValue<WritableBlockServiceKey> writableKey;
StaticValue<LastHeartBeatKey> lastHeartBeat;
bool wasWritable = false;
if (!newService) {
if (isWritable(info.flags) && info.availableBytes > 0) {
wasWritable = true;
blockServiceToWritableBlockServiceKey(info, writableKey());
writeBatch.Delete(_writableBlockServicesCf, writableKey.toSlice());
}
blockServiceToLastHeartBeat(info, lastHeartBeat());
writeBatch.Delete(_lastHeartBeatCf, lastHeartBeat.toSlice());
}
if(info.addrs != newInfo.addrs) {
info.addrs = newInfo.addrs;
info.lastInfoChange = requestTime;
}
if ((info.flags & BlockServiceFlags::STALE) != BlockServiceFlags::EMPTY ) {
info.flags = info.flags & ~BlockServiceFlags::STALE;
info.lastInfoChange = requestTime;
}
info.lastSeen = requestTime;
info.capacityBytes = newInfo.capacityBytes;
info.availableBytes = newInfo.availableBytes;
info.blocks = newInfo.blocks;
bool nowWritable = false;
if (isWritable(info.flags) && info.availableBytes > 0 && (requestTime - info.firstSeen >= _options.blockServiceUsageDelay)) {
nowWritable = true;
blockServiceToWritableBlockServiceKey(info, writableKey());
writeBatch.Put(_writableBlockServicesCf, writableKey.toSlice(), {});
}
if (wasWritable != nowWritable) {
info.lastInfoChange = requestTime;
writableChanged = true;
}
blockServiceToLastHeartBeat(info, lastHeartBeat());
writeBatch.Put(_lastHeartBeatCf, lastHeartBeat.toSlice(),{});
writeBlockServiceInfo(writeBatch, _blockServicesCf, info);
updatedBlocks[info.id.u64] = info;
}
break;
}
case RegistryMessageKind::REGISTER_REGISTRY: {
auto& req = reqContainer.getRegisterRegistry();
StaticValue<RegistryReplicaInfoKey> key;
std::string value;
key().setReplicaId(req.replicaId);
key().setLocationId(req.location);
auto status = _db->Get({}, _registryCf, key.toSlice(), &value);
if (status == rocksdb::Status::NotFound()) {
LOG_ERROR(_env, "unknown registry replica in req %s", req);
res.err = TernError::INVALID_REPLICA;
break;
}
ROCKS_DB_CHECKED(status);
FullRegistryInfo info;
readRegistryInfo(key.toSlice(), value, info);
if (_options.enforceStableIp && (!addressesIntersect(info.addrs, req.addrs))) {
res.err = TernError::DIFFERENT_ADDRS_INFO;
break;
}
if (_options.enforceStableLeader && info.isLeader != req.isLeader) {
res.err = TernError::LEADER_PREEMPTED;
break;
}
StaticValue<LastHeartBeatKey> lastHeartBeat;
registryToLastHeartBeat(info, lastHeartBeat());
writeBatch.Delete(_lastHeartBeatCf, lastHeartBeat.toSlice());
info.isLeader = req.isLeader;
info.addrs = req.addrs;
info.lastSeen = requestTime;
registryToLastHeartBeat(info, lastHeartBeat());
writeBatch.Put(_lastHeartBeatCf, lastHeartBeat.toSlice(),{});
writeRegistryInfo(writeBatch, _registryCf, info);
break;
}
case RegistryMessageKind::DECOMMISSION_BLOCK_SERVICE: {
auto& req = reqContainer.getDecommissionBlockService();
auto bsIt = updatedBlocks.find(req.id.u64);
FullBlockServiceInfo info;
StaticValue<BlockServiceInfoKey> key;
key().setId(req.id.u64);
if (bsIt == updatedBlocks.end()) {
std::string value;
auto status = _db->Get({}, _blockServicesCf, key.toSlice(), &value);
if (status == rocksdb::Status::NotFound()) {
LOG_ERROR(_env, "unknown block service in req %s", req);
res.err = TernError::BLOCK_SERVICE_NOT_FOUND;
break;
}
ROCKS_DB_CHECKED(status);
readBlockServiceInfo(key.toSlice(), value, info);
} else {
info = bsIt->second;
}
if ((info.flags & BlockServiceFlags::DECOMMISSIONED) != BlockServiceFlags::EMPTY) {
// no updates allowed to decomissioned services do nothing
break;
}
if (isWritable(info.flags) && info.availableBytes > 0) {
StaticValue<WritableBlockServiceKey> writableKey;
writableChanged = true;
blockServiceToWritableBlockServiceKey(info, writableKey());
writeBatch.Delete(_writableBlockServicesCf, writableKey.toSlice());
}
// no longer track staleness
StaticValue<LastHeartBeatKey> lastHeartBeat;
blockServiceToLastHeartBeat(info, lastHeartBeat());
writeBatch.Delete(_lastHeartBeatCf, lastHeartBeat.toSlice());
// DECOMMISSIONED service looses other flags
info.flags = BlockServiceFlags::DECOMMISSIONED;
info.lastInfoChange = requestTime;
writeBlockServiceInfo(writeBatch, _blockServicesCf, info);
updatedBlocks[info.id.u64] = info;
break;
}
case RegistryMessageKind::MOVE_SHARD_LEADER: {
auto& req = reqContainer.getMoveShardLeader();
StaticValue<ShardInfoKey> key;
std::string value;
key().setLocationId(req.location);
key().setReplicaId(req.shrid.replicaId());
key().setShardId(req.shrid.shardId().u8);
auto status = _db->Get({}, _shardsCf, key.toSlice(), &value);
if (status == rocksdb::Status::NotFound()) {
LOG_ERROR(_env, "unknown shard replica in req %s", req);
res.err = TernError::INVALID_REPLICA;
break;
}
ROCKS_DB_CHECKED(status);
FullShardInfo info;
readShardInfo(key.toSlice(), value, info);
info.isLeader = true;
writeShardInfo(writeBatch, _shardsCf, info);
break;
}
case RegistryMessageKind::CLEAR_SHARD_INFO: {
auto& req = reqContainer.getClearShardInfo();
StaticValue<ShardInfoKey> key;
std::string value;
key().setLocationId(req.location);
key().setReplicaId(req.shrid.replicaId());
key().setShardId(req.shrid.shardId().u8);
auto status = _db->Get({}, _shardsCf, key.toSlice(), &value);
if (status == rocksdb::Status::NotFound()) {
LOG_ERROR(_env, "unknown shard replica in req %s", req);
res.err = TernError::INVALID_REPLICA;
break;
}
ROCKS_DB_CHECKED(status);
FullShardInfo info;
readShardInfo(key.toSlice(), value, info);
info.isLeader = false;
info.addrs.clear();
writeShardInfo(writeBatch, _shardsCf, info);
break;
}
case RegistryMessageKind::MOVE_CDC_LEADER: {
auto& req = reqContainer.getMoveCdcLeader();
StaticValue<CdcInfoKey> key;
std::string value;
key().setLocationId(req.location);
key().setReplicaId(req.replica);
auto status = _db->Get({}, _cdcCf, key.toSlice(), &value);
if (status == rocksdb::Status::NotFound()) {
LOG_ERROR(_env, "unknown cdc replica in req %s", req);
res.err = TernError::INVALID_REPLICA;
break;
}
ROCKS_DB_CHECKED(status);
CdcInfo info;
readCdcInfo(key.toSlice(), value, info);
info.isLeader = true;
writeCdcInfo(writeBatch, _cdcCf, info);
break;
}
case RegistryMessageKind::CLEAR_CDC_INFO: {
auto& req = reqContainer.getClearCdcInfo();
StaticValue<CdcInfoKey> key;
std::string value;
key().setLocationId(req.location);
key().setReplicaId(req.replica);
auto status = _db->Get({}, _cdcCf, key.toSlice(), &value);
if (status == rocksdb::Status::NotFound()) {
LOG_ERROR(_env, "unknown cdc replica in req %s", req);
res.err = TernError::INVALID_REPLICA;
break;
}
ROCKS_DB_CHECKED(status);
CdcInfo info;
readCdcInfo(key.toSlice(), value, info);
info.isLeader = false;
info.addrs.clear();
writeCdcInfo(writeBatch, _cdcCf, info);
break;
}
case RegistryMessageKind::UPDATE_BLOCK_SERVICE_PATH: {
auto& req = reqContainer.getUpdateBlockServicePath();
auto bsIt = updatedBlocks.find(req.id.u64);
FullBlockServiceInfo info;
StaticValue<BlockServiceInfoKey> key;
key().setId(req.id.u64);
if (bsIt == updatedBlocks.end()) {
std::string value;
auto status = _db->Get({}, _blockServicesCf, key.toSlice(), &value);
if (status == rocksdb::Status::NotFound()) {
LOG_ERROR(_env, "unknown block service in req %s", req);
res.err = TernError::BLOCK_SERVICE_NOT_FOUND;
break;
}
ROCKS_DB_CHECKED(status);
readBlockServiceInfo(key.toSlice(), value, info);
} else {
info = bsIt->second;
}
info.path = req.newPath;
writeBlockServiceInfo(writeBatch, _blockServicesCf, info);
updatedBlocks[info.id.u64] = info;
break;
}
default:
ALWAYS_ASSERT(false);
}
}
writeLastAppliedLogEntry(writeBatch, _defaultCf, expectedLogEntry);
ROCKS_DB_CHECKED(_db->Write({}, &writeBatch));
writableChanged = _updateStaleBlockServices(lastRequestTime) || writableChanged;
_recalcualteShardBlockServices(writableChanged);
}
void RegistryDB::_initDb() {
const auto keyExists =
[this](rocksdb::ColumnFamilyHandle *cf, const rocksdb::Slice &key) -> bool {
std::string value;
auto status = _db->Get({}, cf, key, &value);
if (status.IsNotFound()) {
return false;
} else {
ROCKS_DB_CHECKED(status);
return true;
}
};
if (!keyExists(_defaultCf, registryMetadataKey(&LAST_APPLIED_LOG_ENTRY_KEY))) {
LOG_INFO(_env, "initializing Registry RocksDB");
rocksdb::WriteBatch batch;
LOG_INFO(_env, "initializing last applied log entry");
writeLastAppliedLogEntry(batch, _defaultCf, LogIdx(0));
LOG_INFO(_env, "initializing default location");
LocationInfo defaultLocation;
defaultLocation.id = DEFAULT_LOCATION;
defaultLocation.name = "default";
writeLocationInfo(batch, _locationsCf, defaultLocation);
LOG_INFO(_env, "initializing registries");
initializeRegistries(batch, _registryCf);
LOG_INFO(_env, "initializing shards for default location");
initializeShardsForLocation(batch, _shardsCf, defaultLocation.id);
LOG_INFO(_env, "initializing cdc for default location");
initializeCdcForLocation(batch, _cdcCf, defaultLocation.id);
ROCKS_DB_CHECKED(_db->Write({}, &batch));
LOG_INFO(_env, "initialized Registry RocksDB");
}
}
void RegistryDB::_recalcualteShardBlockServices(bool writableChanged) {
if (!writableChanged && (ternNow() - _lastCalculatedShardBlockServices < _options.writableBlockServiceUpdateInterval)) {
return;
}
_lastCalculatedShardBlockServices = ternNow();
for(int i = 0; i < 256; ++i) {
_shardBlockServices[(uint8_t) i].clear();
}
auto *it = _db->NewIterator(rocksdb::ReadOptions(), _writableBlockServicesCf);
BincodeFixedBytes<16> lastFd;
lastFd.clear();
uint8_t lastStorageClass = 0;
for (it->SeekToFirst(); it->Valid(); it->Next()) {
auto writableKey = ExternalValue<WritableBlockServiceKey>::FromSlice(it->key());
if (lastFd == writableKey().failureDomain() && lastStorageClass == writableKey().storageClass()) {
continue;
}
lastFd = writableKey().failureDomain();
lastStorageClass = writableKey().storageClass();
auto id = writableKey().id();
StaticValue<BlockServiceInfoKey> key;
key().setId(id);
std::string value;
auto status = _db->Get({}, _blockServicesCf, key.toSlice(), &value);
ROCKS_DB_CHECKED(status);
FullBlockServiceInfo info;
readBlockServiceInfo(key.toSlice(), value, info);
BlockServiceInfoShort bsShort;
bsShort.id = id;
bsShort.locationId = info.locationId;
bsShort.failureDomain = info.failureDomain;
bsShort.storageClass = info.storageClass;
for(int i = 0; i < 256; ++i) {
_shardBlockServices[(uint8_t) i].push_back(bsShort);
}
}
ROCKS_DB_CHECKED(it->status());
delete it;
}
bool RegistryDB::_updateStaleBlockServices(TernTime now) {
rocksdb::WriteBatch writeBatch;
bool writableChanged = false;
auto *it = _db->NewIterator(rocksdb::ReadOptions(), _lastHeartBeatCf);
for (it->SeekToFirst(); it->Valid(); it->Next()) {
auto lastHeartBeat = ExternalValue<LastHeartBeatKey>::FromSlice(it->key());
if (TernTime(lastHeartBeat().lastSeen()) + _options.staleDelay > now) {
break;
}
if (lastHeartBeat()._serviceType() != ServiceType::BLOCK_SERVICE) {
continue;
}
auto serviceBytes = lastHeartBeat()._serviceBytes();
auto bsKey = ExternalValue<BlockServiceInfoKey>::FromSlice({(char*)serviceBytes.data(), BlockServiceInfoKey::MAX_SIZE});
writeBatch.Delete(_lastHeartBeatCf, it->key());
std::string value;
auto status = _db->Get({}, _blockServicesCf, bsKey.toSlice(), &value);
ROCKS_DB_CHECKED(status);
FullBlockServiceInfo info;
readBlockServiceInfo(bsKey.toSlice(), value, info);
StaticValue<WritableBlockServiceKey> writableKey;
if (isWritable(info.flags) && info.availableBytes > 0) {
writableChanged = true;
blockServiceToWritableBlockServiceKey(info, writableKey());
writeBatch.Delete(_writableBlockServicesCf, writableKey.toSlice());
}
info.flags = info.flags | BlockServiceFlags::STALE;
writeBlockServiceInfo(writeBatch, _blockServicesCf, info);
}
ROCKS_DB_CHECKED(it->status());
delete it;
ROCKS_DB_CHECKED(_db->Write({}, &writeBatch));
return writableChanged;
}
LogIdx RegistryDB::lastAppliedLogEntry() const {
return readLastAppliedLogEntry(_db, _defaultCf);
}
void RegistryDB::registries(std::vector<FullRegistryInfo>& out) const {
out.clear();
auto *it = _db->NewIterator(rocksdb::ReadOptions(), _registryCf);
for (it->SeekToFirst(); it->Valid(); it->Next()) {
readRegistryInfo(it->key(), it->value(), out.emplace_back());
}
ROCKS_DB_CHECKED(it->status());
delete it;
ALWAYS_ASSERT(out.size() == LogsDB::REPLICA_COUNT);
}
void RegistryDB::locations(std::vector<LocationInfo>& out) const {
out.clear();
auto *it = _db->NewIterator(rocksdb::ReadOptions(), _locationsCf);
for (it->SeekToFirst(); it->Valid(); it->Next()) {
readLocationInfo(it->key(), it->value(), out.emplace_back());
}
ROCKS_DB_CHECKED(it->status());
delete it;
ALWAYS_ASSERT(!out.empty());
}
void RegistryDB::shards(std::vector<FullShardInfo>& out) const {
out.clear();
auto *it = _db->NewIterator(rocksdb::ReadOptions(), _shardsCf);
for (it->SeekToFirst(); it->Valid(); it->Next()) {
auto& shard = out.emplace_back();
readShardInfo(it->key(), it->value(), shard);
}
ROCKS_DB_CHECKED(it->status());
delete it;
ALWAYS_ASSERT((!out.empty()) && (out.size() % LogsDB::REPLICA_COUNT * ShardId::SHARD_COUNT == 0));
}
void RegistryDB::cdcs(std::vector<CdcInfo>& out) const {
out.clear();
auto *it = _db->NewIterator(rocksdb::ReadOptions(), _cdcCf);
for (it->SeekToFirst(); it->Valid(); it->Next()) {
readCdcInfo(it->key(), it->value(), out.emplace_back());
}
ROCKS_DB_CHECKED(it->status());
delete it;
ALWAYS_ASSERT((!out.empty()) && (out.size() % LogsDB::REPLICA_COUNT == 0));
}
void RegistryDB::blockServices(std::vector<FullBlockServiceInfo>& out) const{
out.clear();
auto *it = _db->NewIterator(rocksdb::ReadOptions(), _blockServicesCf);
for (it->SeekToFirst(); it->Valid(); it->Next()) {
readBlockServiceInfo(it->key(), it->value(), out.emplace_back());
}
ROCKS_DB_CHECKED(it->status());
delete it;
}

View File

@@ -0,0 +1,60 @@
#pragma once
#include <vector>
#include "LogsDB.hpp"
#include "Msgs.hpp"
#include "MsgsGen.hpp"
#include "RegistryCommon.hpp"
#include "SharedRocksDB.hpp"
struct RegistryDBWriteResult {
LogIdx idx;
RegistryMessageKind kind;
TernError err;
};
class RegistryDB {
public:
static std::vector<rocksdb::ColumnFamilyDescriptor> getColumnFamilyDescriptors();
RegistryDB(Logger& logger, std::shared_ptr<XmonAgent>& xmon, const RegistryOptions& options, const SharedRocksDB& sharedDB);
~RegistryDB() {}
void close();
LogIdx lastAppliedLogEntry() const;
void registries(std::vector<FullRegistryInfo>& out) const;
void locations(std::vector<LocationInfo>& out) const;
void shards(std::vector<FullShardInfo>& out) const;
void cdcs(std::vector<CdcInfo>& out) const;
void blockServices(std::vector<FullBlockServiceInfo>& out) const;
void shardBlockServices(ShardId shardId, std::vector<BlockServiceInfoShort>& out) const {
out = _shardBlockServices.at(shardId.u8);
}
void processLogEntries(std::vector<LogsDBLogEntry>& logEntries, std::vector<RegistryDBWriteResult>& writeResults);
private:
void _initDb();
bool _updateStaleBlockServices(TernTime now);
void _recalcualteShardBlockServices(bool writableChanged);
const RegistryOptions& _options;
Env _env;
TernTime _lastCalculatedShardBlockServices;
std::unordered_map<uint8_t, std::vector<BlockServiceInfoShort>> _shardBlockServices;
rocksdb::DB* _db;
rocksdb::ColumnFamilyHandle* _defaultCf;
rocksdb::ColumnFamilyHandle* _registryCf;
rocksdb::ColumnFamilyHandle* _locationsCf;
rocksdb::ColumnFamilyHandle* _shardsCf;
rocksdb::ColumnFamilyHandle* _cdcCf;
rocksdb::ColumnFamilyHandle* _blockServicesCf;
rocksdb::ColumnFamilyHandle* _lastHeartBeatCf;
rocksdb::ColumnFamilyHandle* _writableBlockServicesCf;
};

View File

@@ -0,0 +1,438 @@
#pragma once
#include <cstdint>
#include <rocksdb/slice.h>
#include "Assert.hpp"
#include "LogsDB.hpp"
#include "Msgs.hpp"
#include "MsgsGen.hpp"
#include "RocksDBUtils.hpp"
#include "Time.hpp"
enum class RegistryMetadataKey : uint8_t {
LAST_APPLIED_LOG_ENTRY = 0,
};
constexpr RegistryMetadataKey LAST_APPLIED_LOG_ENTRY_KEY = RegistryMetadataKey::LAST_APPLIED_LOG_ENTRY;
inline rocksdb::Slice registryMetadataKey(const RegistryMetadataKey* k) {
return rocksdb::Slice((const char*)k, sizeof(*k));
}
static inline LogIdx readLastAppliedLogEntry(rocksdb::DB* db, rocksdb::ColumnFamilyHandle* cf) {
std::string value;
ROCKS_DB_CHECKED(db->Get({}, cf,
registryMetadataKey(&LAST_APPLIED_LOG_ENTRY_KEY), &value));
return LogIdx(ExternalValue<U64Value>(value)().u64());
}
static inline void writeLastAppliedLogEntry(rocksdb::WriteBatch& batch, rocksdb::ColumnFamilyHandle* cf, LogIdx lastAppliedLogEntry) {
auto v = U64Value::Static(lastAppliedLogEntry.u64);
ROCKS_DB_CHECKED(
batch.Put(cf, registryMetadataKey(&LAST_APPLIED_LOG_ENTRY_KEY), v.toSlice())
);
}
struct LocationInfoKey {
FIELDS(
LE, uint8_t, locationId, setLocationId,
END_STATIC
)
};
static inline void readLocationInfo(rocksdb::Slice key, rocksdb::Slice value, LocationInfo& locationInfo) {
locationInfo.id = ExternalValue<LocationInfoKey>::FromSlice(key)().locationId();
locationInfo.name.copy(value.data(), value.size());
}
static inline void writeLocationInfo(rocksdb::WriteBatch& batch, rocksdb::ColumnFamilyHandle* cf, const LocationInfo& locationInfo) {
StaticValue<LocationInfoKey> key;
key().setLocationId(locationInfo.id);
ROCKS_DB_CHECKED(
batch.Put(cf, key.toSlice(), rocksdb::Slice(locationInfo.name.data(), locationInfo.name.size()))
);
}
struct RegistryReplicaInfoKey {
FIELDS(
LE, uint8_t, locationId, setLocationId,
LE, ReplicaId, replicaId, setReplicaId,
END_STATIC
)
};
struct RegistryReplicaInfoBody {
FIELDS(
LE, uint8_t, version, _setVersion,
LE, bool, isLeader, setIsLeader,
LE, TernTime, lastSeen, setLastSeen,
FBYTES, 4, ip1, setIp1,
BE, uint16_t, port1, setPort1,
FBYTES, 4, ip2, setIp2,
BE, uint16_t, port2, setPort2,
END_STATIC
)
};
static inline void readRegistryInfo(rocksdb::Slice key_, rocksdb::Slice value_, FullRegistryInfo& registryInfo) {
auto key = ExternalValue<RegistryReplicaInfoKey>::FromSlice(key_);
auto value = ExternalValue<RegistryReplicaInfoBody>::FromSlice(value_);
registryInfo.id = key().replicaId();
registryInfo.locationId = key().locationId();
registryInfo.isLeader = value().isLeader();
registryInfo.lastSeen = value().lastSeen();
registryInfo.addrs.addrs[0].ip = value().ip1();
registryInfo.addrs.addrs[0].port = value().port1();
registryInfo.addrs.addrs[1].ip = value().ip2();
registryInfo.addrs.addrs[1].port = value().port2();
}
static inline void writeRegistryInfo(rocksdb::WriteBatch& batch, rocksdb::ColumnFamilyHandle* cf, const FullRegistryInfo& registryInfo) {
StaticValue<RegistryReplicaInfoKey> key;
StaticValue<RegistryReplicaInfoBody> value;
key().setReplicaId(registryInfo.id);
key().setLocationId(registryInfo.locationId);
value()._setVersion(0);
value().setIsLeader(registryInfo.isLeader);
value().setLastSeen(registryInfo.lastSeen);
value().setIp1(registryInfo.addrs.addrs[0].ip.data);
value().setPort1(registryInfo.addrs.addrs[0].port);
value().setIp2(registryInfo.addrs.addrs[1].ip.data);
value().setPort2(registryInfo.addrs.addrs[1].port);
ROCKS_DB_CHECKED(
batch.Put(cf, key.toSlice(), value.toSlice())
);
}
static inline void initializeRegistries(rocksdb::WriteBatch& batch, rocksdb::ColumnFamilyHandle* cf) {
FullRegistryInfo registryInfo;
registryInfo.clear();
for (ReplicaId id = 0; id.u8 < LogsDB::REPLICA_COUNT; ++id.u8) {
registryInfo.id = id;
writeRegistryInfo(batch, cf, registryInfo);
}
}
struct ShardInfoKey {
FIELDS(
LE, uint8_t, locationId, setLocationId,
LE, uint8_t, shardId, setShardId,
LE, ReplicaId, replicaId, setReplicaId,
END_STATIC
)
};
struct ShardInfoValue {
FIELDS(
LE, uint8_t, version, _setVersion,
LE, bool, isLeader, setIsLeader,
LE, TernTime, lastSeen, setLastSeen,
FBYTES, 4, ip1, setIp1,
BE, uint16_t, port1, setPort1,
FBYTES, 4, ip2, setIp2,
BE, uint16_t, port2, setPort2,
END_STATIC
)
};
static inline void readShardInfo(rocksdb::Slice key_, rocksdb::Slice value_, FullShardInfo& shardInfo) {
auto key = ExternalValue<ShardInfoKey>::FromSlice(key_);
auto value = ExternalValue<ShardInfoValue>::FromSlice(value_);
shardInfo.id = ShardReplicaId(key().shardId(), key().replicaId());
shardInfo.locationId = key().locationId();
shardInfo.isLeader = value().isLeader();
shardInfo.lastSeen = value().lastSeen();
shardInfo.addrs.addrs[0].ip = value().ip1();
shardInfo.addrs.addrs[0].port = value().port1();
shardInfo.addrs.addrs[1].ip = value().ip2();
shardInfo.addrs.addrs[1].port = value().port2();
}
static inline void writeShardInfo(rocksdb::WriteBatch& batch, rocksdb::ColumnFamilyHandle* cf, const FullShardInfo& shardInfo) {
StaticValue<ShardInfoKey> key;
StaticValue<ShardInfoValue> value;
key().setLocationId(shardInfo.locationId);
key().setShardId(shardInfo.id.shardId().u8);
key().setReplicaId(shardInfo.id.replicaId().u8);
value()._setVersion(0);
value().setIsLeader(shardInfo.isLeader);
value().setLastSeen(shardInfo.lastSeen);
value().setIp1(shardInfo.addrs.addrs[0].ip.data);
value().setPort1(shardInfo.addrs.addrs[0].port);
value().setIp2(shardInfo.addrs.addrs[1].ip.data);
value().setPort2(shardInfo.addrs.addrs[1].port);
ROCKS_DB_CHECKED(
batch.Put(cf, key.toSlice(), value.toSlice())
);
}
static inline void initializeShardsForLocation(
rocksdb::WriteBatch& batch, rocksdb::ColumnFamilyHandle* cf, LocationId locationId)
{
FullShardInfo shardInfo;
shardInfo.clear();
shardInfo.locationId = locationId;
for(uint16_t id = 0; id < ShardId::SHARD_COUNT; ++id) {
for (ReplicaId replicaId = 0; replicaId.u8 < LogsDB::REPLICA_COUNT; ++replicaId.u8) {
shardInfo.id = ShardReplicaId(ShardId(id), replicaId);
writeShardInfo(batch, cf, shardInfo);
}
}
}
struct CdcInfoKey {
FIELDS(
LE, uint8_t, locationId, setLocationId,
LE, ReplicaId, replicaId, setReplicaId,
END_STATIC
)
};
struct CdcInfoValue {
FIELDS(
LE, uint8_t, version, _setVersion,
LE, bool, isLeader, setIsLeader,
LE, TernTime, lastSeen, setLastSeen,
FBYTES, 4, ip1, setIp1,
BE, uint16_t, port1, setPort1,
FBYTES, 4, ip2, setIp2,
BE, uint16_t, port2, setPort2,
END_STATIC
)
};
static inline void readCdcInfo(rocksdb::Slice key_, rocksdb::Slice value_, CdcInfo& cdcInfo) {
auto key = ExternalValue<CdcInfoKey>::FromSlice(key_);
auto value = ExternalValue<CdcInfoValue>::FromSlice(value_);
cdcInfo.replicaId = key().replicaId();
cdcInfo.locationId = key().locationId();
cdcInfo.isLeader = value().isLeader();
cdcInfo.lastSeen = value().lastSeen();
cdcInfo.addrs.addrs[0].ip = value().ip1();
cdcInfo.addrs.addrs[0].port = value().port1();
cdcInfo.addrs.addrs[1].ip = value().ip2();
cdcInfo.addrs.addrs[1].port = value().port2();
}
static inline void writeCdcInfo(rocksdb::WriteBatch& batch, rocksdb::ColumnFamilyHandle* cf, const CdcInfo& cdcInfo) {
StaticValue<CdcInfoKey> key;
StaticValue<CdcInfoValue> value;
key().setLocationId(cdcInfo.locationId);
key().setReplicaId(cdcInfo.replicaId.u8);
value()._setVersion(0);
value().setIsLeader(cdcInfo.isLeader);
value().setLastSeen(cdcInfo.lastSeen);
value().setIp1(cdcInfo.addrs.addrs[0].ip.data);
value().setPort1(cdcInfo.addrs.addrs[0].port);
value().setIp2(cdcInfo.addrs.addrs[1].ip.data);
value().setPort2(cdcInfo.addrs.addrs[1].port);
ROCKS_DB_CHECKED(
batch.Put(cf, key.toSlice(), value.toSlice())
);
}
static inline void initializeCdcForLocation(
rocksdb::WriteBatch& batch, rocksdb::ColumnFamilyHandle* cf, LocationId locationId)
{
CdcInfo cdcInfo;
cdcInfo.clear();
cdcInfo.locationId = locationId;
for (ReplicaId replicaId = 0; replicaId.u8 < LogsDB::REPLICA_COUNT; ++replicaId.u8) {
cdcInfo.replicaId = replicaId;
writeCdcInfo(batch, cf, cdcInfo);
}
}
struct BlockServiceInfoKey {
FIELDS(
LE, uint64_t, id, setId,
END_STATIC
)
};
struct BlockServiceInfoValue {
FIELDS(
LE, uint8_t, version, _setVersion,
LE, uint8_t, locationId, setLocationId,
LE, uint8_t, storageClass, setStorageClass,
FBYTES, 16, failureDomain, setFailureDomain,
FBYTES, 16, secretKey, setSecretKey,
LE, TernTime, firstSeen, setFirstSeen,
LE, TernTime, lastSeen, setLastSeen,
LE, TernTime, lastInfoChange, setLastInfoChange,
FBYTES, 4, ip1, setIp1,
BE, uint16_t, port1, setPort1,
FBYTES, 4, ip2, setIp2,
BE, uint16_t, port2, setPort2,
LE, BlockServiceFlags, flags, setFlags,
LE, uint64_t, capacityBytes, setCapacityBytes,
LE, uint64_t, availableBytes, setAvailableBytes,
LE, uint64_t, blocks, setBlocks,
LE, bool, hasFiles, setHasFiles,
EMIT_OFFSET, STATIC_SIZE,
BYTES, path, setPath,
END
)
static constexpr size_t MIN_SIZE =
STATIC_SIZE +
sizeof(uint8_t); // pathLen
static constexpr size_t MAX_SIZE = MIN_SIZE + 255;
size_t size() const {
return MIN_SIZE + path().size();
}
void checkSize(size_t sz) {
ALWAYS_ASSERT(sz >= MIN_SIZE, "expected %s >= %s", sz, MIN_SIZE);
ALWAYS_ASSERT(sz == size());
}
};
static inline void readBlockServiceInfo(rocksdb::Slice key_, rocksdb::Slice value_, FullBlockServiceInfo& blockServiceInfo) {
auto key = ExternalValue<BlockServiceInfoKey>::FromSlice(key_);
auto value = ExternalValue<BlockServiceInfoValue>::FromSlice(value_);
blockServiceInfo.id = key().id();
blockServiceInfo.locationId = value().locationId();
blockServiceInfo.storageClass = value().storageClass();
blockServiceInfo.failureDomain.name = value().failureDomain();
blockServiceInfo.secretKey = value().secretKey();
blockServiceInfo.firstSeen = value().firstSeen();
blockServiceInfo.lastSeen = value().lastSeen();
blockServiceInfo.lastInfoChange = value().lastInfoChange();
blockServiceInfo.addrs.addrs[0].ip = value().ip1();
blockServiceInfo.addrs.addrs[0].port = value().port1();
blockServiceInfo.addrs.addrs[1].ip = value().ip2();
blockServiceInfo.addrs.addrs[1].port = value().port2();
blockServiceInfo.flags = value().flags();
blockServiceInfo.capacityBytes = value().capacityBytes();
blockServiceInfo.availableBytes = value().availableBytes();
blockServiceInfo.blocks = value().blocks();
blockServiceInfo.hasFiles = value().hasFiles();
blockServiceInfo.path = value().path();
}
static inline void writeBlockServiceInfo(
rocksdb::WriteBatch& batch, rocksdb::ColumnFamilyHandle* cf, const FullBlockServiceInfo& blockServiceInfo)
{
StaticValue<BlockServiceInfoKey> key;
StaticValue<BlockServiceInfoValue> value;
key().setId(blockServiceInfo.id.u64);
value()._setVersion(0);
value().setLocationId(blockServiceInfo.locationId);
value().setStorageClass(blockServiceInfo.storageClass);
value().setFailureDomain(blockServiceInfo.failureDomain.name.data);
value().setSecretKey(blockServiceInfo.secretKey.data);
value().setFirstSeen(blockServiceInfo.firstSeen);
value().setLastSeen(blockServiceInfo.lastSeen);
value().setLastInfoChange(blockServiceInfo.lastInfoChange);
value().setIp1(blockServiceInfo.addrs.addrs[0].ip.data);
value().setPort1(blockServiceInfo.addrs.addrs[0].port);
value().setIp2(blockServiceInfo.addrs.addrs[1].ip.data);
value().setPort2(blockServiceInfo.addrs.addrs[1].port);
value().setFlags(blockServiceInfo.flags);
value().setCapacityBytes(blockServiceInfo.capacityBytes);
value().setAvailableBytes(blockServiceInfo.availableBytes);
value().setBlocks(blockServiceInfo.blocks);
value().setHasFiles(blockServiceInfo.hasFiles);
value().setPath(blockServiceInfo.path.ref());
ROCKS_DB_CHECKED(
batch.Put(cf, key.toSlice(), value.toSlice())
);
}
static constexpr auto MAX_SERVICE_KEY_SIZE =
std::max(std::max(std::max(
RegistryReplicaInfoKey::MAX_SIZE, ShardInfoKey::MAX_SIZE), CdcInfoKey::MAX_SIZE),BlockServiceInfoKey::MAX_SIZE);
enum class ServiceType : uint8_t {
REGISTRY = 0,
SHARD = 1,
CDC = 2,
BLOCK_SERVICE = 3
};
struct LastHeartBeatKey {
FIELDS(
BE, uint64_t, lastSeen, setLastSeen,
LE, ServiceType, _serviceType, setServiceType,
FBYTES, MAX_SERVICE_KEY_SIZE, _serviceBytes, _setServiceBytes,
END_STATIC
)
};
static inline const ExternalValue<LastHeartBeatKey> readLastHeartBeatInfo(rocksdb::Slice key) {
return ExternalValue<LastHeartBeatKey>::FromSlice(key);
}
static inline void registryToLastHeartBeat(const FullRegistryInfo& registry, LastHeartBeatKey key) {
key.setLastSeen(registry.lastSeen.ns);
key.setServiceType(ServiceType::REGISTRY);
FBytesArr<MAX_SERVICE_KEY_SIZE> serviceBytes;
auto registryKey = ExternalValue<RegistryReplicaInfoKey>::FromSlice({(char*)serviceBytes.data(), RegistryReplicaInfoKey::MAX_SIZE});
registryKey().setLocationId(registry.locationId);
registryKey().setReplicaId(registry.id.u8);
key._setServiceBytes(serviceBytes);
}
static inline void shardToLastHeartBeat(const FullShardInfo& shard, LastHeartBeatKey key) {
key.setLastSeen(shard.lastSeen.ns);
key.setServiceType(ServiceType::SHARD);
FBytesArr<MAX_SERVICE_KEY_SIZE> serviceBytes;
auto shardKey = ExternalValue<ShardInfoKey>::FromSlice({(char*)serviceBytes.data(), ShardInfoKey::MAX_SIZE});
shardKey().setLocationId(shard.locationId);
shardKey().setShardId(shard.id.shardId().u8);
shardKey().setReplicaId(shard.id.replicaId().u8);
key._setServiceBytes(serviceBytes);
}
static inline void cdcToLastHeartBeat(const CdcInfo& cdc, LastHeartBeatKey key) {
key.setLastSeen(cdc.lastSeen.ns);
key.setServiceType(ServiceType::CDC);
FBytesArr<MAX_SERVICE_KEY_SIZE> serviceBytes;
auto cdcKey = ExternalValue<CdcInfoKey>::FromSlice({(char*)serviceBytes.data(), CdcInfoKey::MAX_SIZE});
cdcKey().setLocationId(cdc.locationId);
cdcKey().setReplicaId(cdc.replicaId.u8);
key._setServiceBytes(serviceBytes);
}
static inline void blockServiceToLastHeartBeat(const FullBlockServiceInfo& bs, LastHeartBeatKey key) {
key.setLastSeen(bs.lastSeen.ns);
key.setServiceType(ServiceType::BLOCK_SERVICE);
FBytesArr<MAX_SERVICE_KEY_SIZE> serviceBytes;
auto bsKey = ExternalValue<BlockServiceInfoKey>::FromSlice({(char*)serviceBytes.data(), BlockServiceInfoKey::MAX_SIZE});
bsKey().setId(bs.id.u64);
key._setServiceBytes(serviceBytes);
}
struct WritableBlockServiceKey {
FIELDS(
LE, uint8_t, locationId, setLocationId,
LE, uint8_t, storageClass, setStorageClass,
FBYTES, 16, failureDomain, setFailureDomain,
BE, uint64_t, availableBytes, setAvailableBytes,
LE, uint64_t, id, setId,
END_STATIC
)
};
static inline void blockServiceToWritableBlockServiceKey(const FullBlockServiceInfo& bs, WritableBlockServiceKey key) {
ALWAYS_ASSERT(bs.availableBytes > 0);
ALWAYS_ASSERT(isWritable(bs.flags));
key.setLocationId(bs.locationId);
key.setStorageClass(bs.storageClass);
key.setFailureDomain(bs.failureDomain.name.data);
key.setAvailableBytes(bs.availableBytes);
key.setId(bs.id.u64);
}

View File

@@ -0,0 +1,6 @@
#pragma once
#include <array>
#include <cstdint>
static const std::array<uint8_t, 16> RegistryKey{0x62, 0xad, 0xd2, 0x0f, 0x2d, 0x44, 0xc8, 0x3b, 0xa2, 0x6d, 0xd6, 0xf9, 0x29, 0xaa, 0xe2, 0x0f};

View File

@@ -0,0 +1,439 @@
#include "RegistryServer.hpp"
#include "Assert.hpp"
#include "Env.hpp"
#include "MsgsGen.hpp"
bool RegistryServer::init() {
_epollFd = epoll_create1(0);
if (_epollFd == -1) {
LOG_ERROR(_env, "Failed to create epoll instance: %s",
strerror(errno));
return false;
}
for (int i = 0; i < _options.addrs.size(); ++i) {
auto& addr = _options.addrs[i];
if (addr.ip.data[0] == 0) {
continue;
}
int fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
if (fd == -1) {
LOG_ERROR(_env, "Failed to create socket: %s", strerror(errno));
return false;
}
int opt = 1;
setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
sockaddr_in sockAddr{};
addr.toSockAddrIn(sockAddr);
if (bind(fd, (sockaddr *)&sockAddr, sizeof(sockAddr)) == -1) {
LOG_ERROR(_env, "Failed to bind socket: %s", strerror(errno));
return false;
}
if (listen(fd, SOMAXCONN) == -1) {
LOG_ERROR(_env, "Failed to listen on socket: %s", strerror(errno));
return false;
}
epoll_event event{};
event.events = EPOLLIN;
event.data.fd = fd;
if (epoll_ctl(_epollFd, EPOLL_CTL_ADD, fd, &event) == -1){
LOG_ERROR(_env, "Failed to register for epoll fd %s", fd);
return false;
}
_sockFds[i] = fd;
}
if (_socks[0].registerEpoll(_epollFd) == -1) {
LOG_ERROR(_env, "Failed to register udp socks for epoll");
return false;
}
LOG_INFO(_env, "initialized sockets");
return true;
}
bool RegistryServer::receiveMessages(Duration timeout){
ALWAYS_ASSERT(_receivedRequests.empty());
ALWAYS_ASSERT(_logsDBRequests.empty());
ALWAYS_ASSERT(_logsDBResponses.empty());
int numEvents = Loop::epollWait(_epollFd, &_events[0], _events.size(), timeout);
LOG_TRACE(_env, "epoll returned %s events", numEvents);
if (numEvents == -1) {
if (errno != EINTR) {
LOG_ERROR(_env, "epoll_wait error: %s", strerror(errno));
}
return false;
}
bool haveUdpMessages = false;
for (int i = 0; i < numEvents; ++i) {
if (_events[i].data.fd == _sockFds[0]) {
_acceptConnection(_sockFds[0]);
} else if (_events[i].data.fd == _sockFds[1]) {
_acceptConnection(_sockFds[1]);
} else if (_socks[0].containsFd(_events[i].data.fd)) {
haveUdpMessages = true;
} else if (_events[i].events & (EPOLLHUP | EPOLLRDHUP | EPOLLERR)) {
_removeClient(_events[i].data.fd);
} else if (_events[i].events & EPOLLIN) {
_readClient(_events[i].data.fd);
} else if (_events[i].events & EPOLLOUT) {
_writeClient(_events[i].data.fd);
}
}
if (haveUdpMessages) {
if (!_channel.receiveMessages(_env, _socks, *_receiver, MAX_RECV_MSGS, -1, true)) {
LOG_ERROR(_env, "failed receiving UDP messages");
};
for (auto &msg : _channel.protocolMessages(LOG_RESP_PROTOCOL_VERSION)) {
_handleLogsDBResponse(msg);
}
for (auto &msg : _channel.protocolMessages(LOG_REQ_PROTOCOL_VERSION)) {
_handleLogsDBRequest(msg);
}
}
return true;
}
void RegistryServer::sendLogsDBMessages(std::vector<LogsDBRequest *>& requests, std::vector<LogsDBResponse>& responses) {
for (auto request : requests) {
_packLogsDBRequest(*request);
}
for (auto& response : responses) {
_packLogsDBResponse(response);
}
requests.clear();
responses.clear();
_sender.sendMessages(_env, _socks[0]);
}
void RegistryServer::sendRegistryResponses(std::vector<RegistryResponse>& responses) {
for (auto& response : responses) {
auto inFlightIt = _inFlightRequests.find(response.requestId);
if (inFlightIt == _inFlightRequests.end()) {
LOG_TRACE(_env, "Dropping response %s for requestId %s as request was dropped ", response.resp, response.requestId);
continue;
}
int fd = inFlightIt->second;
_inFlightRequests.erase(inFlightIt);
if (response.resp.kind() == RegistryMessageKind::EMPTY) {
// drop connection on empty response
LOG_TRACE(_env, "Dropping connection with fd %s due to empty response", fd);
_removeClient(fd);
continue;
}
_sendResponse(fd, response.resp);
}
}
static constexpr size_t MESSAGE_HEADER_SIZE = 8;
static constexpr size_t MESSAGE_HEADER_LENGTH_OFFSET = 4;
void RegistryServer::_acceptConnection(int fd) {
sockaddr_in clientAddr{};
socklen_t clientAddrLen = sizeof(clientAddr);
int clientFd = accept4(fd, (sockaddr *)&clientAddr, &clientAddrLen, SOCK_NONBLOCK);
if (clientFd == -1) {
LOG_ERROR(_env, "Failed to accept connection: %s", strerror(errno));
return;
}
if (_clients.size() == _maxConnections) {
LOG_DEBUG(_env, "dropping connection as we reached connection limit");
close(fd);
return;
}
auto client_it = _clients.emplace(clientFd, Client{clientFd, {}, {}, ternNow(),0,0}).first;
client_it->second.readBuffer.resize(MESSAGE_HEADER_SIZE);
epoll_event event{};
event.events = EPOLLIN | EPOLLHUP | EPOLLERR | EPOLLRDHUP;
event.data.fd = clientFd;
if (epoll_ctl(_epollFd, EPOLL_CTL_ADD, clientFd, &event) == -1) {
LOG_ERROR(_env, "Failed to add client to epoll: %s", strerror(errno));
_removeClient(clientFd);
return;
}
}
void RegistryServer::_readClient(int fd) {
auto it = _clients.find(fd);
ALWAYS_ASSERT(it != _clients.end());
Client &client = it->second;
client.lastActive = ternNow();
size_t bytesToRead = client.readBuffer.size() - client.messageBytesProcessed;
ssize_t bytesRead;
while (bytesToRead > 0 &&
(bytesRead = read(fd, &client.readBuffer[client.messageBytesProcessed], bytesToRead)) > 0) {
LOG_TRACE(_env, "Received %s bytes from client", bytesRead);
bytesToRead -= bytesRead;
client.messageBytesProcessed += bytesRead;
if (bytesToRead > 0) {
continue;
}
if (client.messageBytesProcessed == MESSAGE_HEADER_SIZE) {
BincodeBuf buf{&client.readBuffer[0], MESSAGE_HEADER_SIZE};
uint32_t protocol = buf.unpackScalar<uint32_t>();
if (protocol != REGISTRY_REQ_PROTOCOL_VERSION) {
LOG_ERROR(_env, "Invalid protocol version: %s", protocol);
_removeClient(fd);
return;
}
uint32_t len = buf.unpackScalar<uint32_t>();
buf.ensureFinished();
LOG_TRACE(_env, "Received message of length %s", len);
bytesToRead = len;
client.readBuffer.resize(len + MESSAGE_HEADER_SIZE);
LOG_TRACE(_env, "ReadBuffer new size %s", client.readBuffer.size());
} else {
LOG_TRACE(_env, "Unpacking ReadBuffer size %s", client.readBuffer.size());
BincodeBuf buf{&client.readBuffer[MESSAGE_HEADER_SIZE], client.readBuffer.size() - MESSAGE_HEADER_SIZE};
auto &req = _receivedRequests.emplace_back();
try {
LOG_TRACE(_env, "buf len %s", buf.remaining());
req.req.unpack(buf);
buf.ensureFinished();
LOG_TRACE(_env, "Recieved request on fd %s, %s", fd, req.req);
// Remove read event from epoll after receiving complete request
epoll_event event{};
event.events =
EPOLLHUP | EPOLLERR | EPOLLRDHUP; // Keep only hangup detection
event.data.fd = fd;
if (epoll_ctl(_epollFd, EPOLL_CTL_MOD, fd, &event) == -1) {
LOG_ERROR(_env, "Failed to modify client epoll event: %s", strerror(errno));
_receivedRequests.pop_back();
_removeClient(fd);
return;
}
} catch (const BincodeException &err) {
LOG_ERROR(_env, "Could not parse RegistryReq: %s", err.what());
_receivedRequests.pop_back();
_removeClient(fd);
return;
}
req.requestId = ++_lastRequestId;
client.readBuffer.clear();
client.messageBytesProcessed = 0;
client.inFlightRequestId = req.requestId;
_inFlightRequests.emplace(req.requestId, fd);
}
}
if (bytesRead == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
LOG_DEBUG(_env, "Error reading from client: %s", strerror(errno));
_removeClient(fd);
}
if (bytesRead == 0) {
_removeClient(fd);
}
}
void RegistryServer::_removeClient(int fd) {
auto it = _clients.find(fd);
ALWAYS_ASSERT(it != _clients.end());
epoll_ctl(_epollFd, EPOLL_CTL_DEL, fd, nullptr);
close(fd);
if (it->second.inFlightRequestId != 0) {
_inFlightRequests.erase(it->second.inFlightRequestId);
}
_clients.erase(it);
LOG_TRACE(_env, "removing client %s", fd);
}
void RegistryServer::_handleLogsDBResponse(UDPMessage &msg) {
LOG_TRACE(_env, "received LogsDBResponse from %s", msg.clientAddr);
LogRespMsg *respMsg = nullptr;
auto replicaId = _getReplicaId(msg.clientAddr);
if (replicaId != LogsDB::REPLICA_COUNT) {
auto &resp = _logsDBResponses.emplace_back();
resp.replicaId = replicaId;
respMsg = &resp.msg;
}
if (respMsg == nullptr) {
LOG_DEBUG(_env, "We can't match this address to replica or other leader. Dropping");
return;
}
try {
respMsg->unpack(msg.buf, _expandedRegistryKey);
} catch (const BincodeException &err) {
LOG_ERROR(_env, "Could not parse LogsDBResponse: %s", err.what());
_logsDBResponses.pop_back();
return;
}
LOG_TRACE(_env, "Received response %s for requests id %s from replica id %s", respMsg->body.kind(), respMsg->id, replicaId);
}
void RegistryServer::_handleLogsDBRequest(UDPMessage &msg) {
LOG_TRACE(_env, "received LogsDBRequest from %s", msg.clientAddr);
LogReqMsg *reqMsg = nullptr;
auto replicaId = _getReplicaId(msg.clientAddr);
if (replicaId != LogsDB::REPLICA_COUNT) {
auto &req = _logsDBRequests.emplace_back();
req.replicaId = replicaId;
reqMsg = &req.msg;
}
if (reqMsg == nullptr) {
LOG_DEBUG(_env, "We can't match this address to replica or other leader. Dropping");
return;
}
try {
reqMsg->unpack(msg.buf, _expandedRegistryKey);
} catch (const BincodeException &err) {
LOG_ERROR(_env, "Could not parse LogsDBRequest: %s", err.what());
_logsDBRequests.pop_back();
return;
}
LOG_TRACE(_env, "Received request %s with requests id %s from replica id %s", reqMsg->body.kind(), reqMsg->id, replicaId);
}
uint8_t RegistryServer::_getReplicaId(const IpPort &clientAddress) {
auto replicasPtr = _replicaInfo;
if (!replicasPtr) {
return LogsDB::REPLICA_COUNT;
}
for (ReplicaId replicaId = 0; replicaId.u8 < replicasPtr->size(); ++replicaId.u8) {
if (replicasPtr->at(replicaId.u8).contains(clientAddress)) {
return replicaId.u8;
}
}
return LogsDB::REPLICA_COUNT;
}
void RegistryServer::_sendResponse(int fd, RegistryRespContainer &resp) {
LOG_TRACE(_env, "Sending response to client %s, resp %s", fd, resp);
auto it = _clients.find(fd);
ALWAYS_ASSERT(it != _clients.end());
auto &client = it->second;
ALWAYS_ASSERT(client.writeBuffer.empty());
ALWAYS_ASSERT(client.readBuffer.empty());
ALWAYS_ASSERT(client.messageBytesProcessed == 0);
uint32_t len = resp.packedSize();
client.writeBuffer.resize(len + MESSAGE_HEADER_SIZE);
BincodeBuf buf(client.writeBuffer);
buf.packScalar(REGISTRY_RESP_PROTOCOL_VERSION);
buf.packScalar(len);
resp.pack(buf);
buf.ensureFinished();
client.inFlightRequestId = 0;
_writeClient(fd, true);
}
void RegistryServer::_writeClient(int fd, bool registerEpoll) {
auto it = _clients.find(fd);
ALWAYS_ASSERT(it != _clients.end());
auto &client = it->second;
client.lastActive = ternNow();
ssize_t bytesToWrite = client.writeBuffer.size() - client.messageBytesProcessed;
ssize_t bytesWritten = 0;
LOG_TRACE(_env, "writing to client %s, %s bytes left", fd, bytesToWrite);
while (bytesToWrite > 0 && (bytesWritten = write(fd, &client.writeBuffer[client.messageBytesProcessed], bytesToWrite)) > 0) {
LOG_TRACE(_env, "Sent %s bytes to client", bytesWritten);
client.messageBytesProcessed += bytesWritten;
bytesToWrite -= bytesWritten;
}
LOG_TRACE(_env, "finished writing to client %s, %s bytes left", fd, bytesToWrite);
if (bytesToWrite > 0 && registerEpoll) {
struct epoll_event ev;
ev.events = EPOLLOUT | EPOLLHUP | EPOLLERR | EPOLLRDHUP;
ev.data.fd = fd;
if (epoll_ctl(_epollFd, EPOLL_CTL_MOD, fd, &ev) == -1) {
LOG_ERROR(_env, "Failed to modify epoll for client %s", fd);
_removeClient(fd);
return;
}
}
if (bytesToWrite == 0) {
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLHUP | EPOLLERR | EPOLLRDHUP;
ev.data.fd = fd;
client.messageBytesProcessed = 0;
client.readBuffer.resize(MESSAGE_HEADER_SIZE);
client.writeBuffer.clear();
if (epoll_ctl(_epollFd, EPOLL_CTL_MOD, fd, &ev) == -1) {
LOG_ERROR(_env, "Failed to modify epoll for client %s", fd);
_removeClient(fd);
return;
}
}
}
AddrsInfo* RegistryServer::_addressFromReplicaId(ReplicaId id) {
if (!_replicaInfo) {
return nullptr;
}
auto &addr = (*_replicaInfo)[id.u8];
if (addr[0].port == 0) {
return nullptr;
}
return &addr;
}
void RegistryServer::_packLogsDBResponse(LogsDBResponse &response) {
auto addrInfoPtr = _addressFromReplicaId(response.replicaId);
if (unlikely(addrInfoPtr == nullptr)) {
LOG_TRACE(_env, "No information for replica id %s. dropping response", response.replicaId);
return;
}
auto &addrInfo = *addrInfoPtr;
bool dropArtificially = _packetDropRand.generate64() % 10'000 < _outgoingPacketDropProbability;
if (unlikely(dropArtificially)) {
LOG_TRACE(_env, "artificially dropping response %s", response.msg.id);
return;
}
_sender.prepareOutgoingMessage(_env, _socks[0].addr(), addrInfo,
[&response, this](BincodeBuf &buf) {
response.msg.pack(buf, _expandedRegistryKey);
});
LOG_TRACE(_env, "will send response for req id %s kind %s to %s", response.msg.id, response.msg.body.kind(), addrInfo);
}
void RegistryServer::_packLogsDBRequest(LogsDBRequest &request) {
auto addrInfoPtr = _addressFromReplicaId(request.replicaId);
if (unlikely(addrInfoPtr == nullptr)) {
LOG_TRACE(_env, "No information for replica id %s. dropping request", request.replicaId);
return;
}
auto &addrInfo = *addrInfoPtr;
bool dropArtificially = _packetDropRand.generate64() % 10'000 < _outgoingPacketDropProbability;
if (unlikely(dropArtificially)) {
LOG_TRACE(_env, "artificially dropping request %s", request.msg.id);
return;
}
_sender.prepareOutgoingMessage(_env, _socks[0].addr(), addrInfo,
[&request, this](BincodeBuf &buf) {
request.msg.pack(buf, _expandedRegistryKey);
});
LOG_TRACE(_env, "will send request for req id %s kind %s to %s", request.msg.id, request.msg.body.kind(), addrInfo);
}

View File

@@ -0,0 +1,126 @@
#pragma once
#include "LogsDB.hpp"
#include "MultiplexedChannel.hpp"
#include "Random.hpp"
#include "RegistryCommon.hpp"
#include "RegistryKey.hpp"
static constexpr std::array<uint32_t, 2> RegistryProtocols = {
LOG_REQ_PROTOCOL_VERSION,
LOG_RESP_PROTOCOL_VERSION,
};
using RegistryChannel = MultiplexedChannel<RegistryProtocols.size(), RegistryProtocols>;
constexpr int MAX_RECV_MSGS = 500;
struct RegistryRequest {
uint64_t requestId;
RegistryReqContainer req;
};
struct RegistryResponse {
uint64_t requestId;
RegistryRespContainer resp;
};
class RegistryServer {
public:
RegistryServer(const RegistryOptions &options, Env& env) :
_options(options.serverOptions),
_maxConnections(options.maxConnections),
_env(env),
_socks({UDPSocketPair(_env, _options.addrs)}),
_boundAddresses(_socks[0].addr()),
_lastRequestId(0),
_sender(UDPSenderConfig{.maxMsgSize = MAX_UDP_MTU}),
_packetDropRand(ternNow().ns),
_outgoingPacketDropProbability(_options.simulateOutgoingPacketDrop)
{
_receiver = std::make_unique<UDPReceiver<1>>(UDPReceiverConfig{
.perSockMaxRecvMsg = MAX_RECV_MSGS, .maxMsgSize = MAX_UDP_MTU});
expandKey(RegistryKey, _expandedRegistryKey);
}
virtual ~RegistryServer() = default;
bool init();
inline const AddrsInfo& boundAddresses() const { return _boundAddresses; }
inline void setReplicas(std::shared_ptr<std::array<AddrsInfo, LogsDB::REPLICA_COUNT>> replicaInfo) {
_replicaInfo = replicaInfo;
}
bool receiveMessages(Duration timeout);
inline std::vector<LogsDBRequest>& receivedLogsDBRequests() { return _logsDBRequests; }
inline std::vector<LogsDBResponse>& receivedLogsDBResponses() { return _logsDBResponses; }
inline std::vector<RegistryRequest>& receivedRegistryRequests() { return _receivedRequests; }
void sendLogsDBMessages(std::vector<LogsDBRequest *>& requests, std::vector<LogsDBResponse>& responses);
void sendRegistryResponses(std::vector<RegistryResponse>& responses);
private:
const ServerOptions _options;
uint32_t _maxConnections;
Env& _env;
std::array<UDPSocketPair, 1> _socks; // in an array to play with UDPReceiver<>
const AddrsInfo _boundAddresses;
std::unique_ptr<UDPReceiver<1>> _receiver;
RegistryChannel _channel;
AES128Key _expandedRegistryKey;
std::array<int, 2> _sockFds{-1, -1};
int _epollFd = -1;
static constexpr int MAX_EVENTS = 1024;
std::array<epoll_event, MAX_EVENTS> _events;
struct Client {
int fd;
std::string readBuffer;
std::string writeBuffer;
TernTime lastActive;
size_t messageBytesProcessed;
uint64_t inFlightRequestId;
};
std::unordered_map<int, Client> _clients;
uint64_t _lastRequestId;
std::unordered_map<uint64_t, int> _inFlightRequests; // request to fd mapping
std::vector<RegistryRequest> _receivedRequests;
std::shared_ptr<std::array<AddrsInfo, LogsDB::REPLICA_COUNT>> _replicaInfo;
// log entries buffers
std::vector<LogsDBRequest> _logsDBRequests; // requests from other replicas
std::vector<LogsDBResponse> _logsDBResponses; // responses from other replicas
// outgoing network
UDPSender _sender;
RandomGenerator _packetDropRand;
uint64_t _outgoingPacketDropProbability; // probability * 10,000
void _acceptConnection(int fd);
void _removeClient(int fd);
void _readClient(int fd);
void _writeClient(int fd, bool registerEpoll = false);
uint8_t _getReplicaId(const IpPort &clientAddress);
AddrsInfo* _addressFromReplicaId(ReplicaId id);
void _handleLogsDBResponse(UDPMessage &msg);
void _handleLogsDBRequest(UDPMessage &msg);
void _sendResponse(int fd, RegistryRespContainer &resp);
void _packLogsDBResponse(LogsDBResponse &response);
void _packLogsDBRequest(LogsDBRequest &request);
};

View File

@@ -0,0 +1,67 @@
#pragma once
#include <cstdint>
#include <string>
#include <memory>
#include <optional>
#include "Env.hpp"
#include "Msgs.hpp"
#include "Time.hpp"
#include "XmonAgent.hpp"
#include "Metrics.hpp"
struct RegistryReplicaInfo {
ReplicaId replicaId;
uint8_t location;
std::string host;
uint16_t port;
};
struct RegistryOptions {
// Logging
LogLevel logLevel = LogLevel::LOG_INFO;
std::string logFile = "";
bool syslog = false;
std::string xmonAddr;
std::optional<InfluxDB> influxDB;
// LogsDB settings
std::string dbDir;
bool avoidBeingLeader = true;
bool noReplication = false;
// Registry replication and location
std::string registryPrimaryHost = "";
uint16_t registryPort = 0;
// The second will be used if the port is non-null
AddrsInfo registryAddrs;
ReplicaId replicaId;
uint8_t location;
// Registry specific settings
bool enforceStableIp = false;
bool enforceStableLeader = false;
uint32_t maxConnections = 4000;
Duration staleDelay = 3_mins;
Duration blockServiceUsageDelay = 0_mins;
Duration minDecomInterval = 1_hours;
uint8_t alertAfterUnavailableFailureDomains = 3;
uint32_t maxFailureDomainsPerShard = 28;
Duration writableBlockServiceUpdateInterval = 30_mins;
};
struct RegistryState;
class Registry {
public:
explicit Registry(const RegistryOptions& options, Logger& logger, std::shared_ptr<XmonAgent> xmon);
~Registry();
void start();
void waitUntilStopped();
private:
const RegistryOptions& _options;
Logger& _logger;
std::shared_ptr<XmonAgent> _xmon;
Env _env;
std::unique_ptr<RegistryState> _state;
};

View File

@@ -0,0 +1,159 @@
#include <cstdio>
#include <fcntl.h>
#include <filesystem>
#include <string>
#include "Loop.hpp"
#include "Registry.hpp"
#include "Xmon.hpp"
static bool parseRegistryOptions(CommandLineArgs& args, RegistryOptions& options) {
while(!args.done()) {
if (parseLogOptions(args, options.logOptions) ||
parseXmonOptions(args, options.xmonOptions) ||
parseMetricsOptions(args, options.metricsOptions) ||
parseRegistryClientOptions(args, options.registryClientOptions) ||
parseLogsDBOptions(args, options.logsDBOptions) ||
parseServerOptions(args, options.serverOptions)
) {
continue;
}
std::string arg = args.peekArg();
if (arg == "-enforce-stable-ip") {
args.next();
options.enforceStableIp = true;
continue;
}
if (arg == "-enforce-stable-leader") {
args.next();
options.enforceStableLeader = true;
continue;
}
if (arg == "-max-connections") {
options.maxConnections = parseUint32(args.next());
continue;
}
if (arg == "-min-auto-decom-interval") {
options.minDecomInterval = parseDuration(args.next());
continue;
}
if (arg == "-alert-at-unavailable-failure-domains") {
options.alertAfterUnavailableFailureDomains = parseUint8(args.next());
continue;
}
if (arg == "-staleness-delay") {
options.staleDelay = parseDuration(args.next());
continue;
}
if (arg == "-block-service-use-delay") {
options.blockServiceUsageDelay = parseDuration(args.next());
continue;
}
if (arg == "-max-writable-block-service-per-shard") {
options.maxFailureDomainsPerShard = parseUint32(args.next());
continue;
}
if (arg == "-writable-block-service-update-interval") {
options.writableBlockServiceUpdateInterval = parseDuration(args.next());
continue;
}
fprintf(stderr, "unknown argument %s\n", args.peekArg().c_str());
return false;
}
return true;
}
static void printRegistryOptionsUsage() {
printLogOptionsUsage();
printXmonOptionsUsage();
printMetricsOptionsUsage();
printRegistryClientOptionsUsage();
printLogsDBOptionsUsage();
printServerOptionsUsage();
fprintf(stderr, "RegistryOptions:\n");
fprintf(stderr, " -enforce-stable-ip\n");
fprintf(stderr, " Don't allow both ip addresses to change at once on any service hearbeat.\n");
fprintf(stderr, " -enforce-stable-leader\n");
fprintf(stderr, " Don't allow leader to implicitly change on heartbeat but require LeaderMoveReq.\n");
fprintf(stderr, " -max-connections\n");
fprintf(stderr, " Maximum number of connections to serve at the same time. Default is 4000\n");
fprintf(stderr, " -min-auto-decom-interval\n");
fprintf(stderr, " Minimum time between auto-decomissions for same path-prefix. Default 1 hour\n");
fprintf(stderr, " -alert-at-unavailable-failure-domains\n");
fprintf(stderr, " Alert and prevent auto-decomission after errors spread number of failure domains. Default 3\n");
fprintf(stderr, " -staleness-delay\n");
fprintf(stderr, " How long it needs to pass without heartbeat before a service is declared stale. Default is 3 min\n");
fprintf(stderr, " -block-service-use-delay\n");
fprintf(stderr, " How long to wait after seeing block services for the first time before using it. Default is 0\n");
fprintf(stderr, " -max-writable-block-service-per-shard\n");
fprintf(stderr, " Maximum number of block services to assign to a shard for writting at any given time. Default is 28\n");
fprintf(stderr, " -writable-block-service-update-interval\n");
fprintf(stderr, " Maximum interval at which to change writable services assigned to shards. Default 30 min\n");
}
static bool validateRegistryOptions(const RegistryOptions& options) {
return (validateLogOptions(options.logOptions) &&
validateXmonOptions(options.xmonOptions) &&
validateMetricsOptions(options.metricsOptions) &&
validateRegistryClientOptions(options.registryClientOptions) &&
validateLogsDBOptions(options.logsDBOptions) &&
validateServerOptions(options.serverOptions)
);
}
static void usage(const char* binary) {
fprintf(stderr, "Usage: %s \n\n", binary);
printRegistryOptionsUsage();
}
int main(int argc, char** argv) {
namespace fs = std::filesystem;
RegistryOptions options;
CommandLineArgs args(argc, argv, usage);
if (!(parseRegistryOptions(args, options) && validateRegistryOptions(options))) {
args.dieWithUsage();
}
fs::path dbDir(options.logsDBOptions.dbDir);
{
std::error_code err;
if (!fs::create_directory(dbDir, err) && err.value() != 0) {
throw EXPLICIT_SYSCALL_EXCEPTION(err.value(), "mkdir");
}
}
int logOutFd = STDOUT_FILENO;
if (!options.logOptions.logFile.empty()) {
logOutFd = open(options.logOptions.logFile.c_str(), O_WRONLY|O_CREAT|O_APPEND, 0644);
if (logOutFd < 0) {
throw SYSCALL_EXCEPTION("open");
}
}
Logger logger(options.logOptions.logLevel, logOutFd, options.logOptions.syslog, true);
LoopThreads threads;
// Immediately start xmon: we want the init alerts to be there
std::shared_ptr<XmonAgent> xmon;
if (!options.xmonOptions.addr.empty()) {
xmon = std::make_shared<XmonAgent>();
std::ostringstream ss;
ss << "registry_" << options.logsDBOptions.replicaId;
options.xmonOptions.appInstance = ss.str();
options.xmonOptions.appType = XmonAppType::CRITICAL;
threads.emplace_back(LoopThread::Spawn(std::make_unique<Xmon>(logger, xmon, options.xmonOptions)));
}
Env env(logger, xmon, "startup");
Registry regi(logger, xmon);
LOG_INFO(env, "starting Registry");
regi.start(options, threads);
LOG_INFO(env, "waiting for Registry to stop");
// from this point on termination on SIGINT/SIGTERM will be graceful
LoopThread::waitUntilStopped(threads);
regi.close();
LOG_INFO(env, "registry stopped. Exiting");
return 0;
}

View File

@@ -1577,3 +1577,7 @@ func (c *Client) GetFailureDomainForBlockService(blockServiceId msgs.BlockServic
failureDomain, ok := c.blockServiceToFailureDomain[blockServiceId]
return failureDomain, ok
}
func (c *Client) RegistryRequest(logger *log.Logger, reqBody msgs.RegistryRequest) (msgs.RegistryResponse, error) {
return c.registryConn.Request(reqBody)
}

View File

@@ -389,22 +389,24 @@ func (procs *ManagedProcesses) StartFuse(ll *log.Logger, opts *FuseOpts) string
type RegistryOpts struct {
Exe string
Dir string
Replica msgs.ReplicaId
LogLevel log.LogLevel
HttpPort uint16
Stale time.Duration
Xmon string
ScriptsJs string
RegistryAddress string
Addr1 string
Addr2 string
LogsDBFlags []string
}
func (procs *ManagedProcesses) StartRegistry(ll *log.Logger, opts *RegistryOpts) {
createDataDir(opts.Dir)
args := []string{
"-http-port", fmt.Sprintf("%d", opts.HttpPort),
"-log-file", path.Join(opts.Dir, "log"),
"-data-dir", opts.Dir,
"-db-dir", opts.Dir,
"-addr", opts.Addr1,
"-replica", fmt.Sprintf("%d", opts.Replica),
"-registry", opts.RegistryAddress,
}
if opts.LogLevel == log.DEBUG {
args = append(args, "-verbose")
@@ -413,17 +415,17 @@ func (procs *ManagedProcesses) StartRegistry(ll *log.Logger, opts *RegistryOpts)
args = append(args, "-trace")
}
if opts.Stale != 0 {
args = append(args, "-stale", opts.Stale.String())
args = append(args, "-staleness-delay", fmt.Sprintf("%dms", opts.Stale.Milliseconds()))
}
if opts.Xmon != "" {
args = append(args, "-xmon", opts.Xmon)
}
if opts.ScriptsJs != "" {
args = append(args, "-scripts-js", opts.ScriptsJs)
}
if opts.Addr2 != "" {
args = append(args, "-addr", opts.Addr2)
}
if opts.LogsDBFlags != nil {
args = append(args, opts.LogsDBFlags...)
}
procs.Start(ll, &ManagedProcessArgs{
Name: "registry",
Exe: opts.Exe,
@@ -476,14 +478,13 @@ func (procs *ManagedProcesses) StartRegistryProxy(ll *log.Logger, opts *Registry
}
type GoExes struct {
RegistryExe string
BlocksExe string
FuseExe string
RegistryProxyExe string
}
func BuildGoExes(ll *log.Logger, repoDir string, race bool) *GoExes {
args := []string{"ternweb", "ternblocks", "ternfuse"}
args := []string{"ternblocks", "ternfuse"}
if race {
args = append(args, "--race")
}
@@ -496,7 +497,6 @@ func BuildGoExes(ll *log.Logger, repoDir string, race bool) *GoExes {
panic(fmt.Errorf("could not build shucke/blocks/fuse: %w", err))
}
return &GoExes{
RegistryExe: path.Join(goDir(repoDir), "ternweb", "ternweb"),
BlocksExe: path.Join(goDir(repoDir), "ternblocks", "ternblocks"),
FuseExe: path.Join(goDir(repoDir), "ternfuse", "ternfuse"),
RegistryProxyExe: path.Join(goDir(repoDir), "ternregistryproxy", "ternregistryproxy"),
@@ -710,14 +710,16 @@ func buildCpp(ll *log.Logger, repoDir string, buildType string, targets []string
}
type CppExes struct {
RegistryExe string
ShardExe string
CDCExe string
DBToolsExe string
}
func BuildCppExes(ll *log.Logger, repoDir string, buildType string) *CppExes {
buildDir := buildCpp(ll, repoDir, buildType, []string{"shard/ternshard", "cdc/terncdc", "dbtools/terndbtools"})
buildDir := buildCpp(ll, repoDir, buildType, []string{"registry/ternregistry", "shard/ternshard", "cdc/terncdc", "dbtools/terndbtools"})
return &CppExes{
RegistryExe: path.Join(buildDir, "registry/ternregistry"),
ShardExe: path.Join(buildDir, "shard/ternshard"),
CDCExe: path.Join(buildDir, "cdc/terncdc"),
DBToolsExe: path.Join(buildDir, "dbtools/terndbtools"),

View File

@@ -33,11 +33,10 @@ func main() {
profile := flag.Bool("profile", false, "Whether to run code (both Go and C++) with profiling.")
registryBincodePort := flag.Uint("registry-bincode-port", 10001, "")
registryHttpPort := flag.Uint("registry-http-port", 10000, "")
startingPort := flag.Uint("start-port", 10003, "The services will be assigned port in this order, CDC, shard_000, ..., shard_255, bs_0, ..., bs_n. If 0, ports will be chosen randomly.")
startingPort := flag.Uint("start-port", 10010, "The services will be assigned port in this order, CDC, shard_000, ..., shard_255, bs_0, ..., bs_n. If 0, ports will be chosen randomly.")
repoDir := flag.String("repo-dir", "", "Used to build C++/Go binaries. If not provided, the path will be derived form the filename at build time (so will only work locally).")
binariesDir := flag.String("binaries-dir", "", "If provided, nothing will be built, instead it'll be assumed that the binaries will be in the specified directory.")
xmon := flag.String("xmon", "", "")
registryScriptsJs := flag.String("registry-scripts-js", "", "")
noFuse := flag.Bool("no-fuse", false, "")
leaderOnly := flag.Bool("leader-only", false, "Run only LogsDB leader with LEADER_NO_FOLLOWERS")
multiLocation := flag.Bool("multi-location", false, "Run 2 sets of shards/registry/cdc/storages to simulate multi data centre setup")
@@ -103,11 +102,11 @@ func main() {
var goExes *managedprocess.GoExes
if *binariesDir != "" {
cppExes = &managedprocess.CppExes{
RegistryExe: path.Join(*binariesDir, "ternregistry"),
ShardExe: path.Join(*binariesDir, "ternshard"),
CDCExe: path.Join(*binariesDir, "terncdc"),
}
goExes = &managedprocess.GoExes{
RegistryExe: path.Join(*binariesDir, "ternweb"),
BlocksExe: path.Join(*binariesDir, "ternblocks"),
FuseExe: path.Join(*binariesDir, "ternfuse"),
RegistryProxyExe: path.Join(*binariesDir, "ternregistryproxy"),
@@ -125,28 +124,50 @@ func main() {
fmt.Printf("starting components\n")
replicaCount := uint(5)
if *leaderOnly {
replicaCount = 1
}
// Start registry
registryAddress := fmt.Sprintf("127.0.0.1:%v", *registryBincodePort)
procs.StartRegistry(l, &managedprocess.RegistryOpts{
Exe: goExes.RegistryExe,
HttpPort: uint16(*registryHttpPort),
{
for r := uint8(0); r < uint8(replicaCount); r++ {
dir := path.Join(*dataDir, fmt.Sprintf("registry_%d", r))
if r == 0 {
dir = path.Join(*dataDir, "registry")
}
opts := managedprocess.RegistryOpts{
Exe: cppExes.RegistryExe,
LogLevel: level,
Dir: path.Join(*dataDir, "registry"),
Dir: dir,
RegistryAddress: registryAddress,
Replica: msgs.ReplicaId(r),
Xmon: *xmon,
ScriptsJs: *registryScriptsJs,
Addr1: registryAddress,
})
}
if r == 0 {
if *leaderOnly {
opts.LogsDBFlags = []string{"-logsdb-leader", "-logsdb-no-replication"}
} else {
opts.LogsDBFlags = []string{"-logsdb-leader"}
}
}
opts.Addr1 = fmt.Sprintf("127.0.0.1:%v", uint16(*registryBincodePort)+uint16(r))
procs.StartRegistry(l, &opts)
}
}
registryProxyPort := *registryBincodePort + 1
registryProxyAddress := fmt.Sprintf("127.0.0.1:%v", registryProxyPort)
numLocations := 1
if *multiLocation {
// Waiting for registry
err := client.WaitForRegistry(l, registryAddress, 10*time.Second)
if err != nil {
panic(fmt.Errorf("failed to connect to registry %v", err))
}
registryProxyPort := *registryBincodePort + replicaCount
registryProxyAddress := fmt.Sprintf("127.0.0.1:%v", registryProxyPort)
numLocations := 1
if *multiLocation {
_, err = client.RegistryRequest(l, nil, registryAddress, &msgs.CreateLocationReq{1, "location1"})
if err != nil {
// it's possible location already exits, try renaming it
@@ -166,11 +187,11 @@ func main() {
Location: 1,
},
)
numLocations = 2
err = client.WaitForRegistry(l, registryProxyAddress, 10*time.Second)
if err != nil {
panic(fmt.Errorf("failed to connect to registry %v", err))
}
replicaCount := 5
if *leaderOnly {
replicaCount = 1
numLocations = 2
}
// Start block services
@@ -323,8 +344,8 @@ func main() {
fmt.Printf("operational\n")
}
err := <-terminateChan
if err != nil {
panic(err)
errT := <-terminateChan
if errT != nil {
panic(errT)
}
}

View File

@@ -957,12 +957,12 @@ func main() {
var goExes *managedprocess.GoExes
if *binariesDir != "" {
cppExes = &managedprocess.CppExes{
RegistryExe: path.Join(*binariesDir, "ternregistry"),
ShardExe: path.Join(*binariesDir, "ternshard"),
CDCExe: path.Join(*binariesDir, "terncdc"),
DBToolsExe: path.Join(*binariesDir, "terndbtools"),
}
goExes = &managedprocess.GoExes{
RegistryExe: path.Join(*binariesDir, "ternweb"),
BlocksExe: path.Join(*binariesDir, "ternblocks"),
FuseExe: path.Join(*binariesDir, "ternfuse"),
}
@@ -1037,17 +1037,38 @@ func main() {
registryPort := uint16(*registryPortArg)
registryAddress := fmt.Sprintf("127.0.0.1:%v", registryPort)
// Start registry
registryOpts := &managedprocess.RegistryOpts{
Exe: goExes.RegistryExe,
{
for r := uint8(0); r < uint8(5); r++ {
dir := path.Join(*dataDir, fmt.Sprintf("registry_%d", r))
if r == 0 {
dir = path.Join(*dataDir, "registry")
}
opts := managedprocess.RegistryOpts{
Exe: cppExes.RegistryExe,
LogLevel: level,
Dir: path.Join(*dataDir, "registry"),
Addr1: registryAddress,
Dir: dir,
RegistryAddress: registryAddress,
Replica: msgs.ReplicaId(r),
}
if r == 0 {
if *leaderOnly {
opts.LogsDBFlags = []string{"-logsdb-leader", "-logsdb-no-replication"}
} else {
opts.LogsDBFlags = []string{"-logsdb-leader"}
}
}
opts.Addr1 = fmt.Sprintf("127.0.0.1:%v", registryPort+uint16(r))
if *blockServiceKiller {
registryOpts.Stale = time.Hour * 1000 // never, so that we stimulate the clients ability to fallback
opts.Stale = time.Hour * 1000 // never, so that we stimulate the clients ability to fallback
}
procs.StartRegistry(l, &opts)
}
}
err := client.WaitForRegistry(l, registryAddress, 10*time.Second)
if err != nil {
panic(fmt.Errorf("failed to connect to registry %v", err))
}
procs.StartRegistry(l, registryOpts)
failureDomains := 14 + 4 // so that any 4 can fail and we can still do everything.
hddBlockServices := 10
@@ -1279,9 +1300,9 @@ func main() {
}()
// wait for things to finish
err := <-terminateChan
if err != nil {
panic(err)
errT := <-terminateChan
if errT != nil {
panic(errT)
}
// fsck everything

File diff suppressed because it is too large Load Diff

View File

@@ -15,7 +15,7 @@ def build_and_upload(build_type: str) -> None:
test_binaries = [
f"build/{build_type}/ternshard",
f"build/{build_type}/terncdc",
f"build/{build_type}/ternweb",
f"build/{build_type}/ternregistry",
f"build/{build_type}/ternrun",
f"build/{build_type}/ternblocks",
f"build/{build_type}/ternfuse",