shard: proxy read/write

This commit is contained in:
Miroslav Crnic
2024-07-23 08:31:45 +00:00
parent 6915c4fac4
commit 1a47089b3d
19 changed files with 1234 additions and 760 deletions
+4 -4
View File
@@ -221,7 +221,7 @@ private:
// We receive everything at once, but we send stuff from
// separate threads.
UDPReceiver<2> _receiver;
MultiplexedChannel<4, std::array<uint32_t, 4>{CDC_REQ_PROTOCOL_VERSION, SHARD_CHECK_POINTED_RESP_PROTOCOL_VERSION, LOG_REQ_PROTOCOL_VERSION, LOG_RESP_PROTOCOL_VERSION}> _channel;
MultiplexedChannel<4, std::array<uint32_t, 4>{CDC_REQ_PROTOCOL_VERSION, CDC_TO_SHARD_RESP_PROTOCOL_VERSION, LOG_REQ_PROTOCOL_VERSION, LOG_RESP_PROTOCOL_VERSION}> _channel;
UDPSender _cdcSender;
UDPSender _shardSender;
@@ -691,10 +691,10 @@ private:
}
void _processShardMessages() {
for (auto& msg : _channel.protocolMessages(SHARD_CHECK_POINTED_RESP_PROTOCOL_VERSION)) {
for (auto& msg : _channel.protocolMessages(CDC_TO_SHARD_RESP_PROTOCOL_VERSION)) {
LOG_DEBUG(_env, "received response from shard");
ShardCheckPointedRespMsg respMsg;
CdcToShardRespMsg respMsg;
try {
respMsg.unpack(msg.buf, _expandedCDCKey);
} catch (BincodeException err) {
@@ -765,7 +765,7 @@ private:
for (const auto& [txnId, shardReq]: _step.runningTxns) {
CDCShardReq prevReq;
LOG_TRACE(_env, "txn %s needs shard %s, req %s", txnId, shardReq.shid, shardReq.req);
ShardCheckPointedReqMsg shardReqMsg;
CdcToShardReqMsg shardReqMsg;
// Do not allocate new req id for repeated requests, so that we'll just accept
// the first one that comes back. There's a chance for the txnId to not be here
+23
View File
@@ -254,6 +254,21 @@ public:
return EggsError::NO_ERROR;
}
void readIndexedEntries(const std::vector<LogIdx>& indices, std::vector<LogsDBLogEntry>& entries) const {
entries.clear();
if (indices.empty()) {
return;
}
// TODO: This is not very efficient as we're doing a lookup for each index.
entries.reserve(indices.size());
for (auto idx : indices) {
LogsDBLogEntry& entry = entries.emplace_back();
if (readLogEntry(idx, entry) != EggsError::NO_ERROR) {
entry.idx = 0;
}
}
}
void writeLogEntries(const std::vector<LogsDBLogEntry>& entries) {
_maybeRotate();
@@ -1744,6 +1759,10 @@ public:
_catchupReader.readEntries(entries, maxEntries);
}
void readIndexedEntries(const std::vector<LogIdx> &indices, std::vector<LogsDBLogEntry> &entries) const {
_partitions.readIndexedEntries(indices, entries);
}
Duration getNextTimeout() const {
return _reqResp.getNextTimeout();
}
@@ -1825,6 +1844,10 @@ void LogsDB::readEntries(std::vector<LogsDBLogEntry>& entries, size_t maxEntries
_impl->readEntries(entries, maxEntries);
}
void LogsDB::readIndexedEntries(const std::vector<LogIdx> &indices, std::vector<LogsDBLogEntry> &entries) const {
_impl->readIndexedEntries(indices, entries);
}
Duration LogsDB::getNextTimeout() const {
return _impl->getNextTimeout();
}
+4
View File
@@ -126,6 +126,10 @@ public:
void readEntries(std::vector<LogsDBLogEntry>& entries, size_t maxEntries = IN_FLIGHT_APPEND_WINDOW);
// Takes a sorted vector of log inxices and returns the corresponding entries
// If an entry is missing and empty entry with LogIdx 0 is returned
void readIndexedEntries(const std::vector<LogIdx>& indices, std::vector<LogsDBLogEntry>& entries) const;
Duration getNextTimeout() const;
LogIdx getLastReleased() const;
+120 -209
View File
@@ -487,9 +487,6 @@ std::ostream& operator<<(std::ostream& out, ShuckleMessageKind kind) {
case ShuckleMessageKind::CDC_AT_LOCATION:
out << "CDC_AT_LOCATION";
break;
case ShuckleMessageKind::SHARD_REPLICAS_DE_PR_EC_AT_ED:
out << "SHARD_REPLICAS_DE_PR_EC_AT_ED";
break;
case ShuckleMessageKind::SHARD_BLOCK_SERVICES:
out << "SHARD_BLOCK_SERVICES";
break;
@@ -4184,42 +4181,6 @@ std::ostream& operator<<(std::ostream& out, const CdcAtLocationResp& x) {
return out;
}
void ShardReplicasDEPRECATEDReq::pack(BincodeBuf& buf) const {
id.pack(buf);
}
void ShardReplicasDEPRECATEDReq::unpack(BincodeBuf& buf) {
id.unpack(buf);
}
void ShardReplicasDEPRECATEDReq::clear() {
id = ShardId();
}
bool ShardReplicasDEPRECATEDReq::operator==(const ShardReplicasDEPRECATEDReq& rhs) const {
if ((ShardId)this->id != (ShardId)rhs.id) { return false; };
return true;
}
std::ostream& operator<<(std::ostream& out, const ShardReplicasDEPRECATEDReq& x) {
out << "ShardReplicasDEPRECATEDReq(" << "Id=" << x.id << ")";
return out;
}
void ShardReplicasDEPRECATEDResp::pack(BincodeBuf& buf) const {
buf.packList<AddrsInfo>(replicas);
}
void ShardReplicasDEPRECATEDResp::unpack(BincodeBuf& buf) {
buf.unpackList<AddrsInfo>(replicas);
}
void ShardReplicasDEPRECATEDResp::clear() {
replicas.clear();
}
bool ShardReplicasDEPRECATEDResp::operator==(const ShardReplicasDEPRECATEDResp& rhs) const {
if (replicas != rhs.replicas) { return false; };
return true;
}
std::ostream& operator<<(std::ostream& out, const ShardReplicasDEPRECATEDResp& x) {
out << "ShardReplicasDEPRECATEDResp(" << "Replicas=" << x.replicas << ")";
return out;
}
void ShardBlockServicesReq::pack(BincodeBuf& buf) const {
shardId.pack(buf);
}
@@ -8039,121 +8000,112 @@ CdcAtLocationReq& ShuckleReqContainer::setCdcAtLocation() {
auto& x = _data.emplace<14>();
return x;
}
const ShardReplicasDEPRECATEDReq& ShuckleReqContainer::getShardReplicasDEPRECATED() const {
ALWAYS_ASSERT(_kind == ShuckleMessageKind::SHARD_REPLICAS_DE_PR_EC_AT_ED, "%s != %s", _kind, ShuckleMessageKind::SHARD_REPLICAS_DE_PR_EC_AT_ED);
return std::get<15>(_data);
}
ShardReplicasDEPRECATEDReq& ShuckleReqContainer::setShardReplicasDEPRECATED() {
_kind = ShuckleMessageKind::SHARD_REPLICAS_DE_PR_EC_AT_ED;
auto& x = _data.emplace<15>();
return x;
}
const ShardBlockServicesReq& ShuckleReqContainer::getShardBlockServices() const {
ALWAYS_ASSERT(_kind == ShuckleMessageKind::SHARD_BLOCK_SERVICES, "%s != %s", _kind, ShuckleMessageKind::SHARD_BLOCK_SERVICES);
return std::get<16>(_data);
return std::get<15>(_data);
}
ShardBlockServicesReq& ShuckleReqContainer::setShardBlockServices() {
_kind = ShuckleMessageKind::SHARD_BLOCK_SERVICES;
auto& x = _data.emplace<16>();
auto& x = _data.emplace<15>();
return x;
}
const CdcReplicasDEPRECATEDReq& ShuckleReqContainer::getCdcReplicasDEPRECATED() const {
ALWAYS_ASSERT(_kind == ShuckleMessageKind::CDC_REPLICAS_DE_PR_EC_AT_ED, "%s != %s", _kind, ShuckleMessageKind::CDC_REPLICAS_DE_PR_EC_AT_ED);
return std::get<17>(_data);
return std::get<16>(_data);
}
CdcReplicasDEPRECATEDReq& ShuckleReqContainer::setCdcReplicasDEPRECATED() {
_kind = ShuckleMessageKind::CDC_REPLICAS_DE_PR_EC_AT_ED;
auto& x = _data.emplace<17>();
auto& x = _data.emplace<16>();
return x;
}
const AllShardsReq& ShuckleReqContainer::getAllShards() const {
ALWAYS_ASSERT(_kind == ShuckleMessageKind::ALL_SHARDS, "%s != %s", _kind, ShuckleMessageKind::ALL_SHARDS);
return std::get<18>(_data);
return std::get<17>(_data);
}
AllShardsReq& ShuckleReqContainer::setAllShards() {
_kind = ShuckleMessageKind::ALL_SHARDS;
auto& x = _data.emplace<18>();
auto& x = _data.emplace<17>();
return x;
}
const DecommissionBlockServiceReq& ShuckleReqContainer::getDecommissionBlockService() const {
ALWAYS_ASSERT(_kind == ShuckleMessageKind::DECOMMISSION_BLOCK_SERVICE, "%s != %s", _kind, ShuckleMessageKind::DECOMMISSION_BLOCK_SERVICE);
return std::get<19>(_data);
return std::get<18>(_data);
}
DecommissionBlockServiceReq& ShuckleReqContainer::setDecommissionBlockService() {
_kind = ShuckleMessageKind::DECOMMISSION_BLOCK_SERVICE;
auto& x = _data.emplace<19>();
auto& x = _data.emplace<18>();
return x;
}
const MoveShardLeaderReq& ShuckleReqContainer::getMoveShardLeader() const {
ALWAYS_ASSERT(_kind == ShuckleMessageKind::MOVE_SHARD_LEADER, "%s != %s", _kind, ShuckleMessageKind::MOVE_SHARD_LEADER);
return std::get<20>(_data);
return std::get<19>(_data);
}
MoveShardLeaderReq& ShuckleReqContainer::setMoveShardLeader() {
_kind = ShuckleMessageKind::MOVE_SHARD_LEADER;
auto& x = _data.emplace<20>();
auto& x = _data.emplace<19>();
return x;
}
const ClearShardInfoReq& ShuckleReqContainer::getClearShardInfo() const {
ALWAYS_ASSERT(_kind == ShuckleMessageKind::CLEAR_SHARD_INFO, "%s != %s", _kind, ShuckleMessageKind::CLEAR_SHARD_INFO);
return std::get<21>(_data);
return std::get<20>(_data);
}
ClearShardInfoReq& ShuckleReqContainer::setClearShardInfo() {
_kind = ShuckleMessageKind::CLEAR_SHARD_INFO;
auto& x = _data.emplace<21>();
auto& x = _data.emplace<20>();
return x;
}
const RegisterBlockServicesDEPRECATEDReq& ShuckleReqContainer::getRegisterBlockServicesDEPRECATED() const {
ALWAYS_ASSERT(_kind == ShuckleMessageKind::REGISTER_BLOCK_SERVICES_DE_PR_EC_AT_ED, "%s != %s", _kind, ShuckleMessageKind::REGISTER_BLOCK_SERVICES_DE_PR_EC_AT_ED);
return std::get<22>(_data);
return std::get<21>(_data);
}
RegisterBlockServicesDEPRECATEDReq& ShuckleReqContainer::setRegisterBlockServicesDEPRECATED() {
_kind = ShuckleMessageKind::REGISTER_BLOCK_SERVICES_DE_PR_EC_AT_ED;
auto& x = _data.emplace<22>();
auto& x = _data.emplace<21>();
return x;
}
const AllCdcReq& ShuckleReqContainer::getAllCdc() const {
ALWAYS_ASSERT(_kind == ShuckleMessageKind::ALL_CDC, "%s != %s", _kind, ShuckleMessageKind::ALL_CDC);
return std::get<23>(_data);
return std::get<22>(_data);
}
AllCdcReq& ShuckleReqContainer::setAllCdc() {
_kind = ShuckleMessageKind::ALL_CDC;
auto& x = _data.emplace<23>();
auto& x = _data.emplace<22>();
return x;
}
const EraseDecommissionedBlockReq& ShuckleReqContainer::getEraseDecommissionedBlock() const {
ALWAYS_ASSERT(_kind == ShuckleMessageKind::ERASE_DECOMMISSIONED_BLOCK, "%s != %s", _kind, ShuckleMessageKind::ERASE_DECOMMISSIONED_BLOCK);
return std::get<24>(_data);
return std::get<23>(_data);
}
EraseDecommissionedBlockReq& ShuckleReqContainer::setEraseDecommissionedBlock() {
_kind = ShuckleMessageKind::ERASE_DECOMMISSIONED_BLOCK;
auto& x = _data.emplace<24>();
auto& x = _data.emplace<23>();
return x;
}
const AllBlockServicesReq& ShuckleReqContainer::getAllBlockServices() const {
ALWAYS_ASSERT(_kind == ShuckleMessageKind::ALL_BLOCK_SERVICES, "%s != %s", _kind, ShuckleMessageKind::ALL_BLOCK_SERVICES);
return std::get<25>(_data);
return std::get<24>(_data);
}
AllBlockServicesReq& ShuckleReqContainer::setAllBlockServices() {
_kind = ShuckleMessageKind::ALL_BLOCK_SERVICES;
auto& x = _data.emplace<25>();
auto& x = _data.emplace<24>();
return x;
}
const MoveCdcLeaderReq& ShuckleReqContainer::getMoveCdcLeader() const {
ALWAYS_ASSERT(_kind == ShuckleMessageKind::MOVE_CDC_LEADER, "%s != %s", _kind, ShuckleMessageKind::MOVE_CDC_LEADER);
return std::get<26>(_data);
return std::get<25>(_data);
}
MoveCdcLeaderReq& ShuckleReqContainer::setMoveCdcLeader() {
_kind = ShuckleMessageKind::MOVE_CDC_LEADER;
auto& x = _data.emplace<26>();
auto& x = _data.emplace<25>();
return x;
}
const ClearCdcInfoReq& ShuckleReqContainer::getClearCdcInfo() const {
ALWAYS_ASSERT(_kind == ShuckleMessageKind::CLEAR_CDC_INFO, "%s != %s", _kind, ShuckleMessageKind::CLEAR_CDC_INFO);
return std::get<27>(_data);
return std::get<26>(_data);
}
ClearCdcInfoReq& ShuckleReqContainer::setClearCdcInfo() {
_kind = ShuckleMessageKind::CLEAR_CDC_INFO;
auto& x = _data.emplace<27>();
auto& x = _data.emplace<26>();
return x;
}
ShuckleReqContainer::ShuckleReqContainer() {
@@ -8218,9 +8170,6 @@ void ShuckleReqContainer::operator=(const ShuckleReqContainer& other) {
case ShuckleMessageKind::CDC_AT_LOCATION:
setCdcAtLocation() = other.getCdcAtLocation();
break;
case ShuckleMessageKind::SHARD_REPLICAS_DE_PR_EC_AT_ED:
setShardReplicasDEPRECATED() = other.getShardReplicasDEPRECATED();
break;
case ShuckleMessageKind::SHARD_BLOCK_SERVICES:
setShardBlockServices() = other.getShardBlockServices();
break;
@@ -8300,32 +8249,30 @@ size_t ShuckleReqContainer::packedSize() const {
return sizeof(ShuckleMessageKind) + std::get<13>(_data).packedSize();
case ShuckleMessageKind::CDC_AT_LOCATION:
return sizeof(ShuckleMessageKind) + std::get<14>(_data).packedSize();
case ShuckleMessageKind::SHARD_REPLICAS_DE_PR_EC_AT_ED:
return sizeof(ShuckleMessageKind) + std::get<15>(_data).packedSize();
case ShuckleMessageKind::SHARD_BLOCK_SERVICES:
return sizeof(ShuckleMessageKind) + std::get<16>(_data).packedSize();
return sizeof(ShuckleMessageKind) + std::get<15>(_data).packedSize();
case ShuckleMessageKind::CDC_REPLICAS_DE_PR_EC_AT_ED:
return sizeof(ShuckleMessageKind) + std::get<17>(_data).packedSize();
return sizeof(ShuckleMessageKind) + std::get<16>(_data).packedSize();
case ShuckleMessageKind::ALL_SHARDS:
return sizeof(ShuckleMessageKind) + std::get<18>(_data).packedSize();
return sizeof(ShuckleMessageKind) + std::get<17>(_data).packedSize();
case ShuckleMessageKind::DECOMMISSION_BLOCK_SERVICE:
return sizeof(ShuckleMessageKind) + std::get<19>(_data).packedSize();
return sizeof(ShuckleMessageKind) + std::get<18>(_data).packedSize();
case ShuckleMessageKind::MOVE_SHARD_LEADER:
return sizeof(ShuckleMessageKind) + std::get<20>(_data).packedSize();
return sizeof(ShuckleMessageKind) + std::get<19>(_data).packedSize();
case ShuckleMessageKind::CLEAR_SHARD_INFO:
return sizeof(ShuckleMessageKind) + std::get<21>(_data).packedSize();
return sizeof(ShuckleMessageKind) + std::get<20>(_data).packedSize();
case ShuckleMessageKind::REGISTER_BLOCK_SERVICES_DE_PR_EC_AT_ED:
return sizeof(ShuckleMessageKind) + std::get<22>(_data).packedSize();
return sizeof(ShuckleMessageKind) + std::get<21>(_data).packedSize();
case ShuckleMessageKind::ALL_CDC:
return sizeof(ShuckleMessageKind) + std::get<23>(_data).packedSize();
return sizeof(ShuckleMessageKind) + std::get<22>(_data).packedSize();
case ShuckleMessageKind::ERASE_DECOMMISSIONED_BLOCK:
return sizeof(ShuckleMessageKind) + std::get<24>(_data).packedSize();
return sizeof(ShuckleMessageKind) + std::get<23>(_data).packedSize();
case ShuckleMessageKind::ALL_BLOCK_SERVICES:
return sizeof(ShuckleMessageKind) + std::get<25>(_data).packedSize();
return sizeof(ShuckleMessageKind) + std::get<24>(_data).packedSize();
case ShuckleMessageKind::MOVE_CDC_LEADER:
return sizeof(ShuckleMessageKind) + std::get<26>(_data).packedSize();
return sizeof(ShuckleMessageKind) + std::get<25>(_data).packedSize();
case ShuckleMessageKind::CLEAR_CDC_INFO:
return sizeof(ShuckleMessageKind) + std::get<27>(_data).packedSize();
return sizeof(ShuckleMessageKind) + std::get<26>(_data).packedSize();
default:
throw EGGS_EXCEPTION("bad ShuckleMessageKind kind %s", _kind);
}
@@ -8379,44 +8326,41 @@ void ShuckleReqContainer::pack(BincodeBuf& buf) const {
case ShuckleMessageKind::CDC_AT_LOCATION:
std::get<14>(_data).pack(buf);
break;
case ShuckleMessageKind::SHARD_REPLICAS_DE_PR_EC_AT_ED:
case ShuckleMessageKind::SHARD_BLOCK_SERVICES:
std::get<15>(_data).pack(buf);
break;
case ShuckleMessageKind::SHARD_BLOCK_SERVICES:
case ShuckleMessageKind::CDC_REPLICAS_DE_PR_EC_AT_ED:
std::get<16>(_data).pack(buf);
break;
case ShuckleMessageKind::CDC_REPLICAS_DE_PR_EC_AT_ED:
case ShuckleMessageKind::ALL_SHARDS:
std::get<17>(_data).pack(buf);
break;
case ShuckleMessageKind::ALL_SHARDS:
case ShuckleMessageKind::DECOMMISSION_BLOCK_SERVICE:
std::get<18>(_data).pack(buf);
break;
case ShuckleMessageKind::DECOMMISSION_BLOCK_SERVICE:
case ShuckleMessageKind::MOVE_SHARD_LEADER:
std::get<19>(_data).pack(buf);
break;
case ShuckleMessageKind::MOVE_SHARD_LEADER:
case ShuckleMessageKind::CLEAR_SHARD_INFO:
std::get<20>(_data).pack(buf);
break;
case ShuckleMessageKind::CLEAR_SHARD_INFO:
case ShuckleMessageKind::REGISTER_BLOCK_SERVICES_DE_PR_EC_AT_ED:
std::get<21>(_data).pack(buf);
break;
case ShuckleMessageKind::REGISTER_BLOCK_SERVICES_DE_PR_EC_AT_ED:
case ShuckleMessageKind::ALL_CDC:
std::get<22>(_data).pack(buf);
break;
case ShuckleMessageKind::ALL_CDC:
case ShuckleMessageKind::ERASE_DECOMMISSIONED_BLOCK:
std::get<23>(_data).pack(buf);
break;
case ShuckleMessageKind::ERASE_DECOMMISSIONED_BLOCK:
case ShuckleMessageKind::ALL_BLOCK_SERVICES:
std::get<24>(_data).pack(buf);
break;
case ShuckleMessageKind::ALL_BLOCK_SERVICES:
case ShuckleMessageKind::MOVE_CDC_LEADER:
std::get<25>(_data).pack(buf);
break;
case ShuckleMessageKind::MOVE_CDC_LEADER:
std::get<26>(_data).pack(buf);
break;
case ShuckleMessageKind::CLEAR_CDC_INFO:
std::get<27>(_data).pack(buf);
std::get<26>(_data).pack(buf);
break;
default:
throw EGGS_EXCEPTION("bad ShuckleMessageKind kind %s", _kind);
@@ -8471,44 +8415,41 @@ void ShuckleReqContainer::unpack(BincodeBuf& buf) {
case ShuckleMessageKind::CDC_AT_LOCATION:
_data.emplace<14>().unpack(buf);
break;
case ShuckleMessageKind::SHARD_REPLICAS_DE_PR_EC_AT_ED:
case ShuckleMessageKind::SHARD_BLOCK_SERVICES:
_data.emplace<15>().unpack(buf);
break;
case ShuckleMessageKind::SHARD_BLOCK_SERVICES:
case ShuckleMessageKind::CDC_REPLICAS_DE_PR_EC_AT_ED:
_data.emplace<16>().unpack(buf);
break;
case ShuckleMessageKind::CDC_REPLICAS_DE_PR_EC_AT_ED:
case ShuckleMessageKind::ALL_SHARDS:
_data.emplace<17>().unpack(buf);
break;
case ShuckleMessageKind::ALL_SHARDS:
case ShuckleMessageKind::DECOMMISSION_BLOCK_SERVICE:
_data.emplace<18>().unpack(buf);
break;
case ShuckleMessageKind::DECOMMISSION_BLOCK_SERVICE:
case ShuckleMessageKind::MOVE_SHARD_LEADER:
_data.emplace<19>().unpack(buf);
break;
case ShuckleMessageKind::MOVE_SHARD_LEADER:
case ShuckleMessageKind::CLEAR_SHARD_INFO:
_data.emplace<20>().unpack(buf);
break;
case ShuckleMessageKind::CLEAR_SHARD_INFO:
case ShuckleMessageKind::REGISTER_BLOCK_SERVICES_DE_PR_EC_AT_ED:
_data.emplace<21>().unpack(buf);
break;
case ShuckleMessageKind::REGISTER_BLOCK_SERVICES_DE_PR_EC_AT_ED:
case ShuckleMessageKind::ALL_CDC:
_data.emplace<22>().unpack(buf);
break;
case ShuckleMessageKind::ALL_CDC:
case ShuckleMessageKind::ERASE_DECOMMISSIONED_BLOCK:
_data.emplace<23>().unpack(buf);
break;
case ShuckleMessageKind::ERASE_DECOMMISSIONED_BLOCK:
case ShuckleMessageKind::ALL_BLOCK_SERVICES:
_data.emplace<24>().unpack(buf);
break;
case ShuckleMessageKind::ALL_BLOCK_SERVICES:
case ShuckleMessageKind::MOVE_CDC_LEADER:
_data.emplace<25>().unpack(buf);
break;
case ShuckleMessageKind::MOVE_CDC_LEADER:
_data.emplace<26>().unpack(buf);
break;
case ShuckleMessageKind::CLEAR_CDC_INFO:
_data.emplace<27>().unpack(buf);
_data.emplace<26>().unpack(buf);
break;
default:
throw BINCODE_EXCEPTION("bad ShuckleMessageKind kind %s", _kind);
@@ -8549,8 +8490,6 @@ bool ShuckleReqContainer::operator==(const ShuckleReqContainer& other) const {
return getShardsAtLocation() == other.getShardsAtLocation();
case ShuckleMessageKind::CDC_AT_LOCATION:
return getCdcAtLocation() == other.getCdcAtLocation();
case ShuckleMessageKind::SHARD_REPLICAS_DE_PR_EC_AT_ED:
return getShardReplicasDEPRECATED() == other.getShardReplicasDEPRECATED();
case ShuckleMessageKind::SHARD_BLOCK_SERVICES:
return getShardBlockServices() == other.getShardBlockServices();
case ShuckleMessageKind::CDC_REPLICAS_DE_PR_EC_AT_ED:
@@ -8627,9 +8566,6 @@ std::ostream& operator<<(std::ostream& out, const ShuckleReqContainer& x) {
case ShuckleMessageKind::CDC_AT_LOCATION:
out << x.getCdcAtLocation();
break;
case ShuckleMessageKind::SHARD_REPLICAS_DE_PR_EC_AT_ED:
out << x.getShardReplicasDEPRECATED();
break;
case ShuckleMessageKind::SHARD_BLOCK_SERVICES:
out << x.getShardBlockServices();
break;
@@ -8819,121 +8755,112 @@ CdcAtLocationResp& ShuckleRespContainer::setCdcAtLocation() {
auto& x = _data.emplace<15>();
return x;
}
const ShardReplicasDEPRECATEDResp& ShuckleRespContainer::getShardReplicasDEPRECATED() const {
ALWAYS_ASSERT(_kind == ShuckleMessageKind::SHARD_REPLICAS_DE_PR_EC_AT_ED, "%s != %s", _kind, ShuckleMessageKind::SHARD_REPLICAS_DE_PR_EC_AT_ED);
return std::get<16>(_data);
}
ShardReplicasDEPRECATEDResp& ShuckleRespContainer::setShardReplicasDEPRECATED() {
_kind = ShuckleMessageKind::SHARD_REPLICAS_DE_PR_EC_AT_ED;
auto& x = _data.emplace<16>();
return x;
}
const ShardBlockServicesResp& ShuckleRespContainer::getShardBlockServices() const {
ALWAYS_ASSERT(_kind == ShuckleMessageKind::SHARD_BLOCK_SERVICES, "%s != %s", _kind, ShuckleMessageKind::SHARD_BLOCK_SERVICES);
return std::get<17>(_data);
return std::get<16>(_data);
}
ShardBlockServicesResp& ShuckleRespContainer::setShardBlockServices() {
_kind = ShuckleMessageKind::SHARD_BLOCK_SERVICES;
auto& x = _data.emplace<17>();
auto& x = _data.emplace<16>();
return x;
}
const CdcReplicasDEPRECATEDResp& ShuckleRespContainer::getCdcReplicasDEPRECATED() const {
ALWAYS_ASSERT(_kind == ShuckleMessageKind::CDC_REPLICAS_DE_PR_EC_AT_ED, "%s != %s", _kind, ShuckleMessageKind::CDC_REPLICAS_DE_PR_EC_AT_ED);
return std::get<18>(_data);
return std::get<17>(_data);
}
CdcReplicasDEPRECATEDResp& ShuckleRespContainer::setCdcReplicasDEPRECATED() {
_kind = ShuckleMessageKind::CDC_REPLICAS_DE_PR_EC_AT_ED;
auto& x = _data.emplace<18>();
auto& x = _data.emplace<17>();
return x;
}
const AllShardsResp& ShuckleRespContainer::getAllShards() const {
ALWAYS_ASSERT(_kind == ShuckleMessageKind::ALL_SHARDS, "%s != %s", _kind, ShuckleMessageKind::ALL_SHARDS);
return std::get<19>(_data);
return std::get<18>(_data);
}
AllShardsResp& ShuckleRespContainer::setAllShards() {
_kind = ShuckleMessageKind::ALL_SHARDS;
auto& x = _data.emplace<19>();
auto& x = _data.emplace<18>();
return x;
}
const DecommissionBlockServiceResp& ShuckleRespContainer::getDecommissionBlockService() const {
ALWAYS_ASSERT(_kind == ShuckleMessageKind::DECOMMISSION_BLOCK_SERVICE, "%s != %s", _kind, ShuckleMessageKind::DECOMMISSION_BLOCK_SERVICE);
return std::get<20>(_data);
return std::get<19>(_data);
}
DecommissionBlockServiceResp& ShuckleRespContainer::setDecommissionBlockService() {
_kind = ShuckleMessageKind::DECOMMISSION_BLOCK_SERVICE;
auto& x = _data.emplace<20>();
auto& x = _data.emplace<19>();
return x;
}
const MoveShardLeaderResp& ShuckleRespContainer::getMoveShardLeader() const {
ALWAYS_ASSERT(_kind == ShuckleMessageKind::MOVE_SHARD_LEADER, "%s != %s", _kind, ShuckleMessageKind::MOVE_SHARD_LEADER);
return std::get<21>(_data);
return std::get<20>(_data);
}
MoveShardLeaderResp& ShuckleRespContainer::setMoveShardLeader() {
_kind = ShuckleMessageKind::MOVE_SHARD_LEADER;
auto& x = _data.emplace<21>();
auto& x = _data.emplace<20>();
return x;
}
const ClearShardInfoResp& ShuckleRespContainer::getClearShardInfo() const {
ALWAYS_ASSERT(_kind == ShuckleMessageKind::CLEAR_SHARD_INFO, "%s != %s", _kind, ShuckleMessageKind::CLEAR_SHARD_INFO);
return std::get<22>(_data);
return std::get<21>(_data);
}
ClearShardInfoResp& ShuckleRespContainer::setClearShardInfo() {
_kind = ShuckleMessageKind::CLEAR_SHARD_INFO;
auto& x = _data.emplace<22>();
auto& x = _data.emplace<21>();
return x;
}
const RegisterBlockServicesDEPRECATEDResp& ShuckleRespContainer::getRegisterBlockServicesDEPRECATED() const {
ALWAYS_ASSERT(_kind == ShuckleMessageKind::REGISTER_BLOCK_SERVICES_DE_PR_EC_AT_ED, "%s != %s", _kind, ShuckleMessageKind::REGISTER_BLOCK_SERVICES_DE_PR_EC_AT_ED);
return std::get<23>(_data);
return std::get<22>(_data);
}
RegisterBlockServicesDEPRECATEDResp& ShuckleRespContainer::setRegisterBlockServicesDEPRECATED() {
_kind = ShuckleMessageKind::REGISTER_BLOCK_SERVICES_DE_PR_EC_AT_ED;
auto& x = _data.emplace<23>();
auto& x = _data.emplace<22>();
return x;
}
const AllCdcResp& ShuckleRespContainer::getAllCdc() const {
ALWAYS_ASSERT(_kind == ShuckleMessageKind::ALL_CDC, "%s != %s", _kind, ShuckleMessageKind::ALL_CDC);
return std::get<24>(_data);
return std::get<23>(_data);
}
AllCdcResp& ShuckleRespContainer::setAllCdc() {
_kind = ShuckleMessageKind::ALL_CDC;
auto& x = _data.emplace<24>();
auto& x = _data.emplace<23>();
return x;
}
const EraseDecommissionedBlockResp& ShuckleRespContainer::getEraseDecommissionedBlock() const {
ALWAYS_ASSERT(_kind == ShuckleMessageKind::ERASE_DECOMMISSIONED_BLOCK, "%s != %s", _kind, ShuckleMessageKind::ERASE_DECOMMISSIONED_BLOCK);
return std::get<25>(_data);
return std::get<24>(_data);
}
EraseDecommissionedBlockResp& ShuckleRespContainer::setEraseDecommissionedBlock() {
_kind = ShuckleMessageKind::ERASE_DECOMMISSIONED_BLOCK;
auto& x = _data.emplace<25>();
auto& x = _data.emplace<24>();
return x;
}
const AllBlockServicesResp& ShuckleRespContainer::getAllBlockServices() const {
ALWAYS_ASSERT(_kind == ShuckleMessageKind::ALL_BLOCK_SERVICES, "%s != %s", _kind, ShuckleMessageKind::ALL_BLOCK_SERVICES);
return std::get<26>(_data);
return std::get<25>(_data);
}
AllBlockServicesResp& ShuckleRespContainer::setAllBlockServices() {
_kind = ShuckleMessageKind::ALL_BLOCK_SERVICES;
auto& x = _data.emplace<26>();
auto& x = _data.emplace<25>();
return x;
}
const MoveCdcLeaderResp& ShuckleRespContainer::getMoveCdcLeader() const {
ALWAYS_ASSERT(_kind == ShuckleMessageKind::MOVE_CDC_LEADER, "%s != %s", _kind, ShuckleMessageKind::MOVE_CDC_LEADER);
return std::get<27>(_data);
return std::get<26>(_data);
}
MoveCdcLeaderResp& ShuckleRespContainer::setMoveCdcLeader() {
_kind = ShuckleMessageKind::MOVE_CDC_LEADER;
auto& x = _data.emplace<27>();
auto& x = _data.emplace<26>();
return x;
}
const ClearCdcInfoResp& ShuckleRespContainer::getClearCdcInfo() const {
ALWAYS_ASSERT(_kind == ShuckleMessageKind::CLEAR_CDC_INFO, "%s != %s", _kind, ShuckleMessageKind::CLEAR_CDC_INFO);
return std::get<28>(_data);
return std::get<27>(_data);
}
ClearCdcInfoResp& ShuckleRespContainer::setClearCdcInfo() {
_kind = ShuckleMessageKind::CLEAR_CDC_INFO;
auto& x = _data.emplace<28>();
auto& x = _data.emplace<27>();
return x;
}
ShuckleRespContainer::ShuckleRespContainer() {
@@ -9001,9 +8928,6 @@ void ShuckleRespContainer::operator=(const ShuckleRespContainer& other) {
case ShuckleMessageKind::CDC_AT_LOCATION:
setCdcAtLocation() = other.getCdcAtLocation();
break;
case ShuckleMessageKind::SHARD_REPLICAS_DE_PR_EC_AT_ED:
setShardReplicasDEPRECATED() = other.getShardReplicasDEPRECATED();
break;
case ShuckleMessageKind::SHARD_BLOCK_SERVICES:
setShardBlockServices() = other.getShardBlockServices();
break;
@@ -9085,32 +9009,30 @@ size_t ShuckleRespContainer::packedSize() const {
return sizeof(ShuckleMessageKind) + std::get<14>(_data).packedSize();
case ShuckleMessageKind::CDC_AT_LOCATION:
return sizeof(ShuckleMessageKind) + std::get<15>(_data).packedSize();
case ShuckleMessageKind::SHARD_REPLICAS_DE_PR_EC_AT_ED:
return sizeof(ShuckleMessageKind) + std::get<16>(_data).packedSize();
case ShuckleMessageKind::SHARD_BLOCK_SERVICES:
return sizeof(ShuckleMessageKind) + std::get<17>(_data).packedSize();
return sizeof(ShuckleMessageKind) + std::get<16>(_data).packedSize();
case ShuckleMessageKind::CDC_REPLICAS_DE_PR_EC_AT_ED:
return sizeof(ShuckleMessageKind) + std::get<18>(_data).packedSize();
return sizeof(ShuckleMessageKind) + std::get<17>(_data).packedSize();
case ShuckleMessageKind::ALL_SHARDS:
return sizeof(ShuckleMessageKind) + std::get<19>(_data).packedSize();
return sizeof(ShuckleMessageKind) + std::get<18>(_data).packedSize();
case ShuckleMessageKind::DECOMMISSION_BLOCK_SERVICE:
return sizeof(ShuckleMessageKind) + std::get<20>(_data).packedSize();
return sizeof(ShuckleMessageKind) + std::get<19>(_data).packedSize();
case ShuckleMessageKind::MOVE_SHARD_LEADER:
return sizeof(ShuckleMessageKind) + std::get<21>(_data).packedSize();
return sizeof(ShuckleMessageKind) + std::get<20>(_data).packedSize();
case ShuckleMessageKind::CLEAR_SHARD_INFO:
return sizeof(ShuckleMessageKind) + std::get<22>(_data).packedSize();
return sizeof(ShuckleMessageKind) + std::get<21>(_data).packedSize();
case ShuckleMessageKind::REGISTER_BLOCK_SERVICES_DE_PR_EC_AT_ED:
return sizeof(ShuckleMessageKind) + std::get<23>(_data).packedSize();
return sizeof(ShuckleMessageKind) + std::get<22>(_data).packedSize();
case ShuckleMessageKind::ALL_CDC:
return sizeof(ShuckleMessageKind) + std::get<24>(_data).packedSize();
return sizeof(ShuckleMessageKind) + std::get<23>(_data).packedSize();
case ShuckleMessageKind::ERASE_DECOMMISSIONED_BLOCK:
return sizeof(ShuckleMessageKind) + std::get<25>(_data).packedSize();
return sizeof(ShuckleMessageKind) + std::get<24>(_data).packedSize();
case ShuckleMessageKind::ALL_BLOCK_SERVICES:
return sizeof(ShuckleMessageKind) + std::get<26>(_data).packedSize();
return sizeof(ShuckleMessageKind) + std::get<25>(_data).packedSize();
case ShuckleMessageKind::MOVE_CDC_LEADER:
return sizeof(ShuckleMessageKind) + std::get<27>(_data).packedSize();
return sizeof(ShuckleMessageKind) + std::get<26>(_data).packedSize();
case ShuckleMessageKind::CLEAR_CDC_INFO:
return sizeof(ShuckleMessageKind) + std::get<28>(_data).packedSize();
return sizeof(ShuckleMessageKind) + std::get<27>(_data).packedSize();
default:
throw EGGS_EXCEPTION("bad ShuckleMessageKind kind %s", _kind);
}
@@ -9167,44 +9089,41 @@ void ShuckleRespContainer::pack(BincodeBuf& buf) const {
case ShuckleMessageKind::CDC_AT_LOCATION:
std::get<15>(_data).pack(buf);
break;
case ShuckleMessageKind::SHARD_REPLICAS_DE_PR_EC_AT_ED:
case ShuckleMessageKind::SHARD_BLOCK_SERVICES:
std::get<16>(_data).pack(buf);
break;
case ShuckleMessageKind::SHARD_BLOCK_SERVICES:
case ShuckleMessageKind::CDC_REPLICAS_DE_PR_EC_AT_ED:
std::get<17>(_data).pack(buf);
break;
case ShuckleMessageKind::CDC_REPLICAS_DE_PR_EC_AT_ED:
case ShuckleMessageKind::ALL_SHARDS:
std::get<18>(_data).pack(buf);
break;
case ShuckleMessageKind::ALL_SHARDS:
case ShuckleMessageKind::DECOMMISSION_BLOCK_SERVICE:
std::get<19>(_data).pack(buf);
break;
case ShuckleMessageKind::DECOMMISSION_BLOCK_SERVICE:
case ShuckleMessageKind::MOVE_SHARD_LEADER:
std::get<20>(_data).pack(buf);
break;
case ShuckleMessageKind::MOVE_SHARD_LEADER:
case ShuckleMessageKind::CLEAR_SHARD_INFO:
std::get<21>(_data).pack(buf);
break;
case ShuckleMessageKind::CLEAR_SHARD_INFO:
case ShuckleMessageKind::REGISTER_BLOCK_SERVICES_DE_PR_EC_AT_ED:
std::get<22>(_data).pack(buf);
break;
case ShuckleMessageKind::REGISTER_BLOCK_SERVICES_DE_PR_EC_AT_ED:
case ShuckleMessageKind::ALL_CDC:
std::get<23>(_data).pack(buf);
break;
case ShuckleMessageKind::ALL_CDC:
case ShuckleMessageKind::ERASE_DECOMMISSIONED_BLOCK:
std::get<24>(_data).pack(buf);
break;
case ShuckleMessageKind::ERASE_DECOMMISSIONED_BLOCK:
case ShuckleMessageKind::ALL_BLOCK_SERVICES:
std::get<25>(_data).pack(buf);
break;
case ShuckleMessageKind::ALL_BLOCK_SERVICES:
case ShuckleMessageKind::MOVE_CDC_LEADER:
std::get<26>(_data).pack(buf);
break;
case ShuckleMessageKind::MOVE_CDC_LEADER:
std::get<27>(_data).pack(buf);
break;
case ShuckleMessageKind::CLEAR_CDC_INFO:
std::get<28>(_data).pack(buf);
std::get<27>(_data).pack(buf);
break;
default:
throw EGGS_EXCEPTION("bad ShuckleMessageKind kind %s", _kind);
@@ -9262,44 +9181,41 @@ void ShuckleRespContainer::unpack(BincodeBuf& buf) {
case ShuckleMessageKind::CDC_AT_LOCATION:
_data.emplace<15>().unpack(buf);
break;
case ShuckleMessageKind::SHARD_REPLICAS_DE_PR_EC_AT_ED:
case ShuckleMessageKind::SHARD_BLOCK_SERVICES:
_data.emplace<16>().unpack(buf);
break;
case ShuckleMessageKind::SHARD_BLOCK_SERVICES:
case ShuckleMessageKind::CDC_REPLICAS_DE_PR_EC_AT_ED:
_data.emplace<17>().unpack(buf);
break;
case ShuckleMessageKind::CDC_REPLICAS_DE_PR_EC_AT_ED:
case ShuckleMessageKind::ALL_SHARDS:
_data.emplace<18>().unpack(buf);
break;
case ShuckleMessageKind::ALL_SHARDS:
case ShuckleMessageKind::DECOMMISSION_BLOCK_SERVICE:
_data.emplace<19>().unpack(buf);
break;
case ShuckleMessageKind::DECOMMISSION_BLOCK_SERVICE:
case ShuckleMessageKind::MOVE_SHARD_LEADER:
_data.emplace<20>().unpack(buf);
break;
case ShuckleMessageKind::MOVE_SHARD_LEADER:
case ShuckleMessageKind::CLEAR_SHARD_INFO:
_data.emplace<21>().unpack(buf);
break;
case ShuckleMessageKind::CLEAR_SHARD_INFO:
case ShuckleMessageKind::REGISTER_BLOCK_SERVICES_DE_PR_EC_AT_ED:
_data.emplace<22>().unpack(buf);
break;
case ShuckleMessageKind::REGISTER_BLOCK_SERVICES_DE_PR_EC_AT_ED:
case ShuckleMessageKind::ALL_CDC:
_data.emplace<23>().unpack(buf);
break;
case ShuckleMessageKind::ALL_CDC:
case ShuckleMessageKind::ERASE_DECOMMISSIONED_BLOCK:
_data.emplace<24>().unpack(buf);
break;
case ShuckleMessageKind::ERASE_DECOMMISSIONED_BLOCK:
case ShuckleMessageKind::ALL_BLOCK_SERVICES:
_data.emplace<25>().unpack(buf);
break;
case ShuckleMessageKind::ALL_BLOCK_SERVICES:
case ShuckleMessageKind::MOVE_CDC_LEADER:
_data.emplace<26>().unpack(buf);
break;
case ShuckleMessageKind::MOVE_CDC_LEADER:
_data.emplace<27>().unpack(buf);
break;
case ShuckleMessageKind::CLEAR_CDC_INFO:
_data.emplace<28>().unpack(buf);
_data.emplace<27>().unpack(buf);
break;
default:
throw BINCODE_EXCEPTION("bad ShuckleMessageKind kind %s", _kind);
@@ -9342,8 +9258,6 @@ bool ShuckleRespContainer::operator==(const ShuckleRespContainer& other) const {
return getShardsAtLocation() == other.getShardsAtLocation();
case ShuckleMessageKind::CDC_AT_LOCATION:
return getCdcAtLocation() == other.getCdcAtLocation();
case ShuckleMessageKind::SHARD_REPLICAS_DE_PR_EC_AT_ED:
return getShardReplicasDEPRECATED() == other.getShardReplicasDEPRECATED();
case ShuckleMessageKind::SHARD_BLOCK_SERVICES:
return getShardBlockServices() == other.getShardBlockServices();
case ShuckleMessageKind::CDC_REPLICAS_DE_PR_EC_AT_ED:
@@ -9423,9 +9337,6 @@ std::ostream& operator<<(std::ostream& out, const ShuckleRespContainer& x) {
case ShuckleMessageKind::CDC_AT_LOCATION:
out << x.getCdcAtLocation();
break;
case ShuckleMessageKind::SHARD_REPLICAS_DE_PR_EC_AT_ED:
out << x.getShardReplicasDEPRECATED();
break;
case ShuckleMessageKind::SHARD_BLOCK_SERVICES:
out << x.getShardBlockServices();
break;
+4 -48
View File
@@ -317,7 +317,6 @@ enum class ShuckleMessageKind : uint8_t {
CHANGED_BLOCK_SERVICES_AT_LOCATION = 11,
SHARDS_AT_LOCATION = 12,
CDC_AT_LOCATION = 13,
SHARD_REPLICAS_DE_PR_EC_AT_ED = 16,
SHARD_BLOCK_SERVICES = 17,
CDC_REPLICAS_DE_PR_EC_AT_ED = 19,
ALL_SHARDS = 20,
@@ -349,7 +348,6 @@ const std::vector<ShuckleMessageKind> allShuckleMessageKind {
ShuckleMessageKind::CHANGED_BLOCK_SERVICES_AT_LOCATION,
ShuckleMessageKind::SHARDS_AT_LOCATION,
ShuckleMessageKind::CDC_AT_LOCATION,
ShuckleMessageKind::SHARD_REPLICAS_DE_PR_EC_AT_ED,
ShuckleMessageKind::SHARD_BLOCK_SERVICES,
ShuckleMessageKind::CDC_REPLICAS_DE_PR_EC_AT_ED,
ShuckleMessageKind::ALL_SHARDS,
@@ -3845,44 +3843,6 @@ struct CdcAtLocationResp {
std::ostream& operator<<(std::ostream& out, const CdcAtLocationResp& x);
struct ShardReplicasDEPRECATEDReq {
ShardId id;
static constexpr uint16_t STATIC_SIZE = 1; // id
ShardReplicasDEPRECATEDReq() { clear(); }
size_t packedSize() const {
size_t _size = 0;
_size += 1; // id
return _size;
}
void pack(BincodeBuf& buf) const;
void unpack(BincodeBuf& buf);
void clear();
bool operator==(const ShardReplicasDEPRECATEDReq&rhs) const;
};
std::ostream& operator<<(std::ostream& out, const ShardReplicasDEPRECATEDReq& x);
struct ShardReplicasDEPRECATEDResp {
BincodeList<AddrsInfo> replicas;
static constexpr uint16_t STATIC_SIZE = BincodeList<AddrsInfo>::STATIC_SIZE; // replicas
ShardReplicasDEPRECATEDResp() { clear(); }
size_t packedSize() const {
size_t _size = 0;
_size += replicas.packedSize(); // replicas
return _size;
}
void pack(BincodeBuf& buf) const;
void unpack(BincodeBuf& buf);
void clear();
bool operator==(const ShardReplicasDEPRECATEDResp&rhs) const;
};
std::ostream& operator<<(std::ostream& out, const ShardReplicasDEPRECATEDResp& x);
struct ShardBlockServicesReq {
ShardId shardId;
@@ -5201,9 +5161,9 @@ std::ostream& operator<<(std::ostream& out, const CDCRespContainer& x);
struct ShuckleReqContainer {
private:
static constexpr std::array<size_t,28> _staticSizes = {LocalShardsReq::STATIC_SIZE, LocalCdcReq::STATIC_SIZE, InfoReq::STATIC_SIZE, ShuckleReq::STATIC_SIZE, LocalChangedBlockServicesReq::STATIC_SIZE, CreateLocationReq::STATIC_SIZE, RenameLocationReq::STATIC_SIZE, LocationsReq::STATIC_SIZE, RegisterShardReq::STATIC_SIZE, RegisterCdcReq::STATIC_SIZE, SetBlockServiceFlagsReq::STATIC_SIZE, RegisterBlockServicesReq::STATIC_SIZE, ChangedBlockServicesAtLocationReq::STATIC_SIZE, ShardsAtLocationReq::STATIC_SIZE, CdcAtLocationReq::STATIC_SIZE, ShardReplicasDEPRECATEDReq::STATIC_SIZE, ShardBlockServicesReq::STATIC_SIZE, CdcReplicasDEPRECATEDReq::STATIC_SIZE, AllShardsReq::STATIC_SIZE, DecommissionBlockServiceReq::STATIC_SIZE, MoveShardLeaderReq::STATIC_SIZE, ClearShardInfoReq::STATIC_SIZE, RegisterBlockServicesDEPRECATEDReq::STATIC_SIZE, AllCdcReq::STATIC_SIZE, EraseDecommissionedBlockReq::STATIC_SIZE, AllBlockServicesReq::STATIC_SIZE, MoveCdcLeaderReq::STATIC_SIZE, ClearCdcInfoReq::STATIC_SIZE};
static constexpr std::array<size_t,27> _staticSizes = {LocalShardsReq::STATIC_SIZE, LocalCdcReq::STATIC_SIZE, InfoReq::STATIC_SIZE, ShuckleReq::STATIC_SIZE, LocalChangedBlockServicesReq::STATIC_SIZE, CreateLocationReq::STATIC_SIZE, RenameLocationReq::STATIC_SIZE, LocationsReq::STATIC_SIZE, RegisterShardReq::STATIC_SIZE, RegisterCdcReq::STATIC_SIZE, SetBlockServiceFlagsReq::STATIC_SIZE, RegisterBlockServicesReq::STATIC_SIZE, ChangedBlockServicesAtLocationReq::STATIC_SIZE, ShardsAtLocationReq::STATIC_SIZE, CdcAtLocationReq::STATIC_SIZE, ShardBlockServicesReq::STATIC_SIZE, CdcReplicasDEPRECATEDReq::STATIC_SIZE, AllShardsReq::STATIC_SIZE, DecommissionBlockServiceReq::STATIC_SIZE, MoveShardLeaderReq::STATIC_SIZE, ClearShardInfoReq::STATIC_SIZE, RegisterBlockServicesDEPRECATEDReq::STATIC_SIZE, AllCdcReq::STATIC_SIZE, EraseDecommissionedBlockReq::STATIC_SIZE, AllBlockServicesReq::STATIC_SIZE, MoveCdcLeaderReq::STATIC_SIZE, ClearCdcInfoReq::STATIC_SIZE};
ShuckleMessageKind _kind = ShuckleMessageKind::EMPTY;
std::variant<LocalShardsReq, LocalCdcReq, InfoReq, ShuckleReq, LocalChangedBlockServicesReq, CreateLocationReq, RenameLocationReq, LocationsReq, RegisterShardReq, RegisterCdcReq, SetBlockServiceFlagsReq, RegisterBlockServicesReq, ChangedBlockServicesAtLocationReq, ShardsAtLocationReq, CdcAtLocationReq, ShardReplicasDEPRECATEDReq, ShardBlockServicesReq, CdcReplicasDEPRECATEDReq, AllShardsReq, DecommissionBlockServiceReq, MoveShardLeaderReq, ClearShardInfoReq, RegisterBlockServicesDEPRECATEDReq, AllCdcReq, EraseDecommissionedBlockReq, AllBlockServicesReq, MoveCdcLeaderReq, ClearCdcInfoReq> _data;
std::variant<LocalShardsReq, LocalCdcReq, InfoReq, ShuckleReq, LocalChangedBlockServicesReq, CreateLocationReq, RenameLocationReq, LocationsReq, RegisterShardReq, RegisterCdcReq, SetBlockServiceFlagsReq, RegisterBlockServicesReq, ChangedBlockServicesAtLocationReq, ShardsAtLocationReq, CdcAtLocationReq, ShardBlockServicesReq, CdcReplicasDEPRECATEDReq, AllShardsReq, DecommissionBlockServiceReq, MoveShardLeaderReq, ClearShardInfoReq, RegisterBlockServicesDEPRECATEDReq, AllCdcReq, EraseDecommissionedBlockReq, AllBlockServicesReq, MoveCdcLeaderReq, ClearCdcInfoReq> _data;
public:
ShuckleReqContainer();
ShuckleReqContainer(const ShuckleReqContainer& other);
@@ -5243,8 +5203,6 @@ public:
ShardsAtLocationReq& setShardsAtLocation();
const CdcAtLocationReq& getCdcAtLocation() const;
CdcAtLocationReq& setCdcAtLocation();
const ShardReplicasDEPRECATEDReq& getShardReplicasDEPRECATED() const;
ShardReplicasDEPRECATEDReq& setShardReplicasDEPRECATED();
const ShardBlockServicesReq& getShardBlockServices() const;
ShardBlockServicesReq& setShardBlockServices();
const CdcReplicasDEPRECATEDReq& getCdcReplicasDEPRECATED() const;
@@ -5283,9 +5241,9 @@ std::ostream& operator<<(std::ostream& out, const ShuckleReqContainer& x);
struct ShuckleRespContainer {
private:
static constexpr std::array<size_t,29> _staticSizes = {sizeof(EggsError), LocalShardsResp::STATIC_SIZE, LocalCdcResp::STATIC_SIZE, InfoResp::STATIC_SIZE, ShuckleResp::STATIC_SIZE, LocalChangedBlockServicesResp::STATIC_SIZE, CreateLocationResp::STATIC_SIZE, RenameLocationResp::STATIC_SIZE, LocationsResp::STATIC_SIZE, RegisterShardResp::STATIC_SIZE, RegisterCdcResp::STATIC_SIZE, SetBlockServiceFlagsResp::STATIC_SIZE, RegisterBlockServicesResp::STATIC_SIZE, ChangedBlockServicesAtLocationResp::STATIC_SIZE, ShardsAtLocationResp::STATIC_SIZE, CdcAtLocationResp::STATIC_SIZE, ShardReplicasDEPRECATEDResp::STATIC_SIZE, ShardBlockServicesResp::STATIC_SIZE, CdcReplicasDEPRECATEDResp::STATIC_SIZE, AllShardsResp::STATIC_SIZE, DecommissionBlockServiceResp::STATIC_SIZE, MoveShardLeaderResp::STATIC_SIZE, ClearShardInfoResp::STATIC_SIZE, RegisterBlockServicesDEPRECATEDResp::STATIC_SIZE, AllCdcResp::STATIC_SIZE, EraseDecommissionedBlockResp::STATIC_SIZE, AllBlockServicesResp::STATIC_SIZE, MoveCdcLeaderResp::STATIC_SIZE, ClearCdcInfoResp::STATIC_SIZE};
static constexpr std::array<size_t,28> _staticSizes = {sizeof(EggsError), LocalShardsResp::STATIC_SIZE, LocalCdcResp::STATIC_SIZE, InfoResp::STATIC_SIZE, ShuckleResp::STATIC_SIZE, LocalChangedBlockServicesResp::STATIC_SIZE, CreateLocationResp::STATIC_SIZE, RenameLocationResp::STATIC_SIZE, LocationsResp::STATIC_SIZE, RegisterShardResp::STATIC_SIZE, RegisterCdcResp::STATIC_SIZE, SetBlockServiceFlagsResp::STATIC_SIZE, RegisterBlockServicesResp::STATIC_SIZE, ChangedBlockServicesAtLocationResp::STATIC_SIZE, ShardsAtLocationResp::STATIC_SIZE, CdcAtLocationResp::STATIC_SIZE, ShardBlockServicesResp::STATIC_SIZE, CdcReplicasDEPRECATEDResp::STATIC_SIZE, AllShardsResp::STATIC_SIZE, DecommissionBlockServiceResp::STATIC_SIZE, MoveShardLeaderResp::STATIC_SIZE, ClearShardInfoResp::STATIC_SIZE, RegisterBlockServicesDEPRECATEDResp::STATIC_SIZE, AllCdcResp::STATIC_SIZE, EraseDecommissionedBlockResp::STATIC_SIZE, AllBlockServicesResp::STATIC_SIZE, MoveCdcLeaderResp::STATIC_SIZE, ClearCdcInfoResp::STATIC_SIZE};
ShuckleMessageKind _kind = ShuckleMessageKind::EMPTY;
std::variant<EggsError, LocalShardsResp, LocalCdcResp, InfoResp, ShuckleResp, LocalChangedBlockServicesResp, CreateLocationResp, RenameLocationResp, LocationsResp, RegisterShardResp, RegisterCdcResp, SetBlockServiceFlagsResp, RegisterBlockServicesResp, ChangedBlockServicesAtLocationResp, ShardsAtLocationResp, CdcAtLocationResp, ShardReplicasDEPRECATEDResp, ShardBlockServicesResp, CdcReplicasDEPRECATEDResp, AllShardsResp, DecommissionBlockServiceResp, MoveShardLeaderResp, ClearShardInfoResp, RegisterBlockServicesDEPRECATEDResp, AllCdcResp, EraseDecommissionedBlockResp, AllBlockServicesResp, MoveCdcLeaderResp, ClearCdcInfoResp> _data;
std::variant<EggsError, LocalShardsResp, LocalCdcResp, InfoResp, ShuckleResp, LocalChangedBlockServicesResp, CreateLocationResp, RenameLocationResp, LocationsResp, RegisterShardResp, RegisterCdcResp, SetBlockServiceFlagsResp, RegisterBlockServicesResp, ChangedBlockServicesAtLocationResp, ShardsAtLocationResp, CdcAtLocationResp, ShardBlockServicesResp, CdcReplicasDEPRECATEDResp, AllShardsResp, DecommissionBlockServiceResp, MoveShardLeaderResp, ClearShardInfoResp, RegisterBlockServicesDEPRECATEDResp, AllCdcResp, EraseDecommissionedBlockResp, AllBlockServicesResp, MoveCdcLeaderResp, ClearCdcInfoResp> _data;
public:
ShuckleRespContainer();
ShuckleRespContainer(const ShuckleRespContainer& other);
@@ -5327,8 +5285,6 @@ public:
ShardsAtLocationResp& setShardsAtLocation();
const CdcAtLocationResp& getCdcAtLocation() const;
CdcAtLocationResp& setCdcAtLocation();
const ShardReplicasDEPRECATEDResp& getShardReplicasDEPRECATED() const;
ShardReplicasDEPRECATEDResp& setShardReplicasDEPRECATED();
const ShardBlockServicesResp& getShardBlockServices() const;
ShardBlockServicesResp& setShardBlockServices();
const CdcReplicasDEPRECATEDResp& getCdcReplicasDEPRECATED() const;
+14 -5
View File
@@ -16,11 +16,19 @@ constexpr uint32_t SHARD_LOG_PROTOCOL_VERSION = 0x2414853;
// >>> format(struct.unpack('<I', b'SHA\3')[0], 'x')
// '3414853'
constexpr uint32_t SHARD_CHECK_POINTED_REQ_PROTOCOL_VERSION = 0x3414853;
constexpr uint32_t CDC_TO_SHARD_REQ_PROTOCOL_VERSION = 0x3414853;
// >>> format(struct.unpack('<I', b'SHA\4')[0], 'x')
// '4414853'
constexpr uint32_t SHARD_CHECK_POINTED_RESP_PROTOCOL_VERSION = 0x4414853;
constexpr uint32_t CDC_TO_SHARD_RESP_PROTOCOL_VERSION = 0x4414853;
// >>> format(struct.unpack('<I', b'SHA\5')[0], 'x')
// '5414853'
constexpr uint32_t PROXY_SHARD_REQ_PROTOCOL_VERSION = 0x5414853;
// >>> format(struct.unpack('<I', b'SHA\6')[0], 'x')
// '6414853'
constexpr uint32_t PROXY_SHARD_RESP_PROTOCOL_VERSION = 0x6414853;
// >>> format(struct.unpack('<I', b'CDC\0')[0], 'x')
// '434443'
@@ -82,11 +90,12 @@ inline std::ostream& operator<<(std::ostream& out, const ShardCheckPointedResp&
}
using ShardReqMsg = ProtocolMessage<SHARD_REQ_PROTOCOL_VERSION, ShardReqContainer>;
using SignedShardReqMsg = SignedProtocolMessage<SHARD_REQ_PROTOCOL_VERSION, ShardReqContainer>;
using ShardRespMsg = ProtocolMessage<SHARD_RESP_PROTOCOL_VERSION, ShardRespContainer>;
using CdcToShardReqMsg = SignedProtocolMessage<CDC_TO_SHARD_REQ_PROTOCOL_VERSION, ShardReqContainer>;
using CdcToShardRespMsg = SignedProtocolMessage<CDC_TO_SHARD_RESP_PROTOCOL_VERSION, ShardCheckPointedResp>;
using CDCReqMsg = ProtocolMessage<CDC_REQ_PROTOCOL_VERSION, CDCReqContainer>;
using CDCRespMsg = ProtocolMessage<CDC_RESP_PROTOCOL_VERSION, CDCRespContainer>;
using LogReqMsg = SignedProtocolMessage<LOG_REQ_PROTOCOL_VERSION, LogReqContainer>;
using LogRespMsg = SignedProtocolMessage<LOG_RESP_PROTOCOL_VERSION, LogRespContainer>;
using ShardCheckPointedReqMsg = SignedProtocolMessage<SHARD_CHECK_POINTED_REQ_PROTOCOL_VERSION, ShardReqContainer>;
using ShardCheckPointedRespMsg = SignedProtocolMessage<SHARD_CHECK_POINTED_RESP_PROTOCOL_VERSION, ShardCheckPointedResp>;
using ProxyShardReqMsg = SignedProtocolMessage<PROXY_SHARD_REQ_PROTOCOL_VERSION, ShardReqContainer>;
using ProxyShardRespMsg = SignedProtocolMessage<PROXY_SHARD_RESP_PROTOCOL_VERSION, ShardCheckPointedResp>;
+6 -9
View File
@@ -242,7 +242,7 @@ std::pair<int, std::string> registerShard(
}
std::pair<int, std::string> fetchShardReplicas(
const std::string& addr, uint16_t port, Duration timeout, ShardReplicaId shrid, std::array<AddrsInfo, 5>& replicas
const std::string& addr, uint16_t port, Duration timeout, ShardId shid, std::vector<FullShardInfo>& replicas
) {
const auto [sock, errStr] = shuckleSock(addr, port, timeout);
if (sock.error()) {
@@ -250,9 +250,7 @@ std::pair<int, std::string> fetchShardReplicas(
}
ShuckleReqContainer reqContainer;
auto& req = reqContainer.setShardReplicasDEPRECATED();
req.id = shrid.shardId();
auto& req = reqContainer.setAllShards();
{
const auto [err, errStr] = writeShuckleRequest(sock.get(), reqContainer, timeout);
if (err) { return {err, errStr}; }
@@ -264,11 +262,10 @@ std::pair<int, std::string> fetchShardReplicas(
if (err) { return {err, errStr}; }
}
if (respContainer.getShardReplicasDEPRECATED().replicas.els.size() != replicas.size()) {
throw EGGS_EXCEPTION("expecting %s replicas, got %s", replicas.size(), respContainer.getShardReplicasDEPRECATED().replicas.els.size());
}
for (int i = 0; i < replicas.size(); i++) {
replicas[i] = respContainer.getShardReplicasDEPRECATED().replicas.els[i];
for(auto& shard : respContainer.getAllShards().shards.els) {
if (shard.id.shardId() == shid) {
replicas.emplace_back(std::move(shard));
}
}
return {};
+2 -2
View File
@@ -37,8 +37,8 @@ std::pair<int, std::string> fetchShardReplicas(
const std::string& shuckleHost,
uint16_t shucklePort,
Duration timeout,
ShardReplicaId shrid,
std::array<AddrsInfo, 5>& replicas
ShardId shid,
std::vector<FullShardInfo>& replicas
);
std::pair<int, std::string> registerCDCReplica(
+978 -339
View File
File diff suppressed because it is too large Load Diff
+4
View File
@@ -31,6 +31,10 @@ struct ShardOptions {
// LogsDB settings
bool avoidBeingLeader = true;
bool noReplication = false;
// implicit options
bool isLeader() const { return !avoidBeingLeader; }
bool isProxyLocation() const { return location != 0; }
};
void runShard(ShardOptions& options);
-5
View File
@@ -1858,11 +1858,6 @@ func main() {
reflect.TypeOf(msgs.CdcAtLocationReq{}),
reflect.TypeOf(msgs.CdcAtLocationResp{}),
},
{
0x10,
reflect.TypeOf(msgs.ShardReplicasDEPRECATEDReq{}),
reflect.TypeOf(msgs.ShardReplicasDEPRECATEDResp{}),
},
{
0x11,
reflect.TypeOf(msgs.ShardBlockServicesReq{}),
-2
View File
@@ -69,8 +69,6 @@ func readShuckleResponse(
resp = &msgs.LocalShardsResp{}
case msgs.REGISTER_SHARD:
resp = &msgs.RegisterShardResp{}
case msgs.SHARD_REPLICAS_DE_PR_EC_AT_ED:
resp = &msgs.ShardReplicasDEPRECATEDResp{}
case msgs.ALL_BLOCK_SERVICES:
resp = &msgs.AllBlockServicesResp{}
case msgs.SET_BLOCK_SERVICE_FLAGS:
+14
View File
@@ -39,6 +39,20 @@ func WaitForBlockServices(ll *lib.Logger, shuckleAddress string, expectedBlockSe
}
}
func WaitForShuckle(ll *lib.Logger, shuckleAddress string, timeout time.Duration) error {
t0 := time.Now()
for {
_, err := ShuckleRequest(ll, nil, shuckleAddress, &msgs.ShuckleReq{})
if err == nil {
return nil
}
if time.Since(t0) > timeout {
return err
}
time.Sleep(10 * time.Millisecond)
}
}
// getting a client implies having all shards and cdc.
func WaitForClient(log *lib.Logger, shuckleAddress string, timeout time.Duration) {
t0 := time.Now()
+56 -28
View File
@@ -40,6 +40,7 @@ func main() {
shuckleScriptsJs := flag.String("shuckle-scripts-js", "", "")
noFuse := flag.Bool("no-fuse", false, "")
leaderOnly := flag.Bool("leader-only", false, "Run only LogsDB leader with LEADER_NO_FOLLOWERS")
multiLocation := flag.Bool("multi-location", false, "Run 2 sets of shards/shuckle/cdc/storages to simulate multi data centre setup")
useRandomFetchApi := flag.Bool("use-random-fetch-api", false, "if set randomly uses api with or without crc when fetching from block service")
flag.Parse()
noRunawayArgs()
@@ -136,6 +137,22 @@ func main() {
Addr1: "127.0.0.1:10001",
})
if *multiLocation {
// Waiting for shuckle
err := client.WaitForShuckle(log, shuckleAddress, 10*time.Second)
if err != nil {
panic(fmt.Errorf("failed to connect to shuckle %v", err))
}
_, err = client.ShuckleRequest(log, nil, shuckleAddress, &msgs.CreateLocationReq{1, "location1"})
if err != nil {
// it's possible location already exits, try renaming it
_, err = client.ShuckleRequest(log, nil, shuckleAddress, &msgs.RenameLocationReq{1, "location1"})
if err != nil {
panic(fmt.Errorf("failed to create location %v", err))
}
}
}
// Start block services
storageClasses := make([]msgs.StorageClass, *hddBlockServices+*flashBlockServices)
for i := range storageClasses {
@@ -210,36 +227,47 @@ func main() {
}
// Start shards
for i := 0; i < 256; i++ {
for r := uint8(0); r < 5; r++ {
shrid := msgs.MakeShardReplicaId(msgs.ShardId(i), msgs.ReplicaId(r))
opts := managedprocess.ShardOpts{
Exe: cppExes.ShardExe,
Shrid: shrid,
Dir: path.Join(*dataDir, fmt.Sprintf("shard_%03d_%d", i, r)),
LogLevel: level,
Valgrind: *buildType == "valgrind",
ShuckleAddress: shuckleAddress,
Perf: *profile,
Xmon: *xmon,
LogsDBFlags: nil,
}
if *leaderOnly && r > 0 {
continue
}
if r == 0 {
if *leaderOnly {
opts.LogsDBFlags = []string{"-logsdb-leader", "-logsdb-no-replication"}
} else {
opts.LogsDBFlags = []string{"-logsdb-leader"}
numLocations := 1
if *multiLocation {
numLocations = 2
}
for loc := 0; loc < numLocations; loc++ {
for i := 0; i < 256; i++ {
for r := uint8(0); r < 5; r++ {
shrid := msgs.MakeShardReplicaId(msgs.ShardId(i), msgs.ReplicaId(r))
dirName := fmt.Sprintf("shard_%03d_%d", i, r)
if loc > 0 {
dirName = fmt.Sprintf("%s_loc%d", dirName, loc)
}
opts := managedprocess.ShardOpts{
Exe: cppExes.ShardExe,
Shrid: shrid,
Dir: path.Join(*dataDir, dirName),
LogLevel: level,
Valgrind: *buildType == "valgrind",
ShuckleAddress: shuckleAddress,
Perf: *profile,
Xmon: *xmon,
Location: msgs.Location(loc),
LogsDBFlags: nil,
}
if *leaderOnly && r > 0 {
continue
}
if r == 0 {
if *leaderOnly {
opts.LogsDBFlags = []string{"-logsdb-leader", "-logsdb-no-replication"}
} else {
opts.LogsDBFlags = []string{"-logsdb-leader"}
}
}
if *startingPort != 0 {
opts.Addr1 = fmt.Sprintf("127.0.0.1:%v", uint16(*startingPort)+5+uint16(i)+uint16(r)*256+5*256*uint16(loc))
} else {
opts.Addr1 = "127.0.0.1:0"
}
procs.StartShard(log, *repoDir, &opts)
}
if *startingPort != 0 {
opts.Addr1 = fmt.Sprintf("127.0.0.1:%v", uint16(*startingPort)+5+uint16(i)+uint16(r)*256)
} else {
opts.Addr1 = "127.0.0.1:0"
}
procs.StartShard(log, *repoDir, &opts)
}
}
+3 -43
View File
@@ -45,6 +45,7 @@ import (
)
const zeroIPString = "x'00000000'"
const DEFAULT_LOCATION = 0
type namedTemplate struct {
name string
@@ -514,7 +515,7 @@ func handleAllBlockServices(ll *lib.Logger, s *state, req *msgs.AllBlockServices
}
func handleLocalChangedBlockServices(ll *lib.Logger, s *state, req *msgs.LocalChangedBlockServicesReq) (*msgs.LocalChangedBlockServicesResp, error) {
reqAtLocation := &msgs.ChangedBlockServicesAtLocationReq{0, req.ChangedSince}
reqAtLocation := &msgs.ChangedBlockServicesAtLocationReq{DEFAULT_LOCATION, req.ChangedSince}
respAtLocation, err := handleChangedBlockServicesAtLocation(ll, s, reqAtLocation)
if err != nil {
return nil, err
@@ -823,7 +824,7 @@ func handleSetBlockServiceFlags(ll *lib.Logger, s *state, req *msgs.SetBlockServ
}
func handleLocalShards(ll *lib.Logger, s *state, _ *msgs.LocalShardsReq) (*msgs.LocalShardsResp, error) {
reqAtLocation := &msgs.ShardsAtLocationReq{0}
reqAtLocation := &msgs.ShardsAtLocationReq{DEFAULT_LOCATION}
resp, err := handleShardsAtLocation(ll, s, reqAtLocation)
if err != nil {
return nil, err
@@ -918,43 +919,6 @@ func handleRegisterShard(ll *lib.Logger, s *state, req *msgs.RegisterShardReq) (
return
}
func handleShardReplicas(ll *lib.Logger, s *state, req *msgs.ShardReplicasDEPRECATEDReq) (*msgs.ShardReplicasDEPRECATEDResp, error) {
s.semaphore.Acquire(context.Background(), 1)
defer s.semaphore.Release(1)
var ret [5]msgs.AddrsInfo
n := sql.Named
rows, err := s.db.Query("SELECT * FROM shards WHERE last_seen IS NOT NULL AND id = :id AND location_id = 0", n("id", req.Id))
if err != nil {
return nil, fmt.Errorf("error selecting shard replicas: %s", err)
}
defer rows.Close()
i := 0
for rows.Next() {
if i > 5 {
return nil, fmt.Errorf("the number of shards returned exceeded 5")
}
si := msgs.AddrsInfo{}
var ip1, ip2 []byte
var id int
var replicaId int
var isLeader bool
var lastSeen uint64
var loc msgs.Location
err = rows.Scan(&id, &replicaId, &loc, &isLeader, &ip1, &si.Addr1.Port, &ip2, &si.Addr2.Port, &lastSeen)
if err != nil {
return nil, fmt.Errorf("error decoding shard row: %s", err)
}
copy(si.Addr1.Addrs[:], ip1)
copy(si.Addr2.Addrs[:], ip2)
ret[replicaId] = si
i += 1
}
return &msgs.ShardReplicasDEPRECATEDResp{Replicas: ret[:]}, nil
}
func handleCdcAtLocation(log *lib.Logger, s *state, req *msgs.CdcAtLocationReq) (*msgs.CdcAtLocationResp, error) {
resp := msgs.CdcAtLocationResp{}
cdc, err := s.selectCDC(req.LocationId)
@@ -1348,8 +1312,6 @@ func handleRequestParsed(log *lib.Logger, s *state, req msgs.ShuckleRequest) (ms
resp, err = handleAllShards(log, s, whichReq)
case *msgs.RegisterShardReq:
resp, err = handleRegisterShard(log, s, whichReq)
case *msgs.ShardReplicasDEPRECATEDReq:
resp, err = handleShardReplicas(log, s, whichReq)
case *msgs.AllBlockServicesReq:
resp, err = handleAllBlockServices(log, s, whichReq)
case *msgs.LocalChangedBlockServicesReq:
@@ -1514,8 +1476,6 @@ func readShuckleRequest(
req = &msgs.LocalShardsReq{}
case msgs.REGISTER_SHARD:
req = &msgs.RegisterShardReq{}
case msgs.SHARD_REPLICAS_DE_PR_EC_AT_ED:
req = &msgs.ShardReplicasDEPRECATEDReq{}
case msgs.ALL_BLOCK_SERVICES:
req = &msgs.AllBlockServicesReq{}
case msgs.LOCAL_CHANGED_BLOCK_SERVICES:
@@ -114,8 +114,6 @@ func readShuckleRequest(
req = &msgs.LocalShardsReq{}
case msgs.REGISTER_SHARD:
req = &msgs.RegisterShardReq{}
case msgs.SHARD_REPLICAS_DE_PR_EC_AT_ED:
req = &msgs.ShardReplicasDEPRECATEDReq{}
case msgs.ALL_BLOCK_SERVICES:
req = &msgs.AllBlockServicesReq{}
case msgs.LOCAL_CHANGED_BLOCK_SERVICES:
+2
View File
@@ -477,6 +477,7 @@ type ShardOpts struct {
Addr2 string
TransientDeadlineInterval *time.Duration
Xmon string
Location msgs.Location
LogsDBFlags []string
}
@@ -517,6 +518,7 @@ func (procs *ManagedProcesses) StartShard(ll *lib.Logger, repoDir string, opts *
opts.Dir,
fmt.Sprintf("%d", int(opts.Shrid.Shard())),
fmt.Sprintf("%d", int(opts.Shrid.Replica())),
fmt.Sprintf("%d", int(opts.Location)),
)
cppDir := cppDir(repoDir)
mpArgs := ManagedProcessArgs{
-9
View File
@@ -2162,15 +2162,6 @@ type RegisterBlockServicesDEPRECATEDReq struct {
}
type RegisterBlockServicesDEPRECATEDResp struct{}
type ShardReplicasDEPRECATEDReq struct {
Id ShardId
}
type ShardReplicasDEPRECATEDResp struct {
// Always 5 length. If we don't have info for some replicas, the AddrsInfo
// is zeroed.
Replicas []AddrsInfo
}
// --------------------------------------------------------------------
// block service requests/responses
-55
View File
@@ -738,8 +738,6 @@ func (k ShuckleMessageKind) String() string {
return "SHARDS_AT_LOCATION"
case 13:
return "CDC_AT_LOCATION"
case 16:
return "SHARD_REPLICAS_DE_PR_EC_AT_ED"
case 17:
return "SHARD_BLOCK_SERVICES"
case 19:
@@ -786,7 +784,6 @@ const (
CHANGED_BLOCK_SERVICES_AT_LOCATION ShuckleMessageKind = 0xB
SHARDS_AT_LOCATION ShuckleMessageKind = 0xC
CDC_AT_LOCATION ShuckleMessageKind = 0xD
SHARD_REPLICAS_DE_PR_EC_AT_ED ShuckleMessageKind = 0x10
SHARD_BLOCK_SERVICES ShuckleMessageKind = 0x11
CDC_REPLICAS_DE_PR_EC_AT_ED ShuckleMessageKind = 0x13
ALL_SHARDS ShuckleMessageKind = 0x14
@@ -817,7 +814,6 @@ var AllShuckleMessageKind = [...]ShuckleMessageKind{
CHANGED_BLOCK_SERVICES_AT_LOCATION,
SHARDS_AT_LOCATION,
CDC_AT_LOCATION,
SHARD_REPLICAS_DE_PR_EC_AT_ED,
SHARD_BLOCK_SERVICES,
CDC_REPLICAS_DE_PR_EC_AT_ED,
ALL_SHARDS,
@@ -866,8 +862,6 @@ func MkShuckleMessage(k string) (ShuckleRequest, ShuckleResponse, error) {
return &ShardsAtLocationReq{}, &ShardsAtLocationResp{}, nil
case k == "CDC_AT_LOCATION":
return &CdcAtLocationReq{}, &CdcAtLocationResp{}, nil
case k == "SHARD_REPLICAS_DE_PR_EC_AT_ED":
return &ShardReplicasDEPRECATEDReq{}, &ShardReplicasDEPRECATEDResp{}, nil
case k == "SHARD_BLOCK_SERVICES":
return &ShardBlockServicesReq{}, &ShardBlockServicesResp{}, nil
case k == "CDC_REPLICAS_DE_PR_EC_AT_ED":
@@ -5276,55 +5270,6 @@ func (v *CdcAtLocationResp) Unpack(r io.Reader) error {
return nil
}
func (v *ShardReplicasDEPRECATEDReq) ShuckleRequestKind() ShuckleMessageKind {
return SHARD_REPLICAS_DE_PR_EC_AT_ED
}
func (v *ShardReplicasDEPRECATEDReq) Pack(w io.Writer) error {
if err := bincode.PackScalar(w, uint8(v.Id)); err != nil {
return err
}
return nil
}
func (v *ShardReplicasDEPRECATEDReq) Unpack(r io.Reader) error {
if err := bincode.UnpackScalar(r, (*uint8)(&v.Id)); err != nil {
return err
}
return nil
}
func (v *ShardReplicasDEPRECATEDResp) ShuckleResponseKind() ShuckleMessageKind {
return SHARD_REPLICAS_DE_PR_EC_AT_ED
}
func (v *ShardReplicasDEPRECATEDResp) Pack(w io.Writer) error {
len1 := len(v.Replicas)
if err := bincode.PackLength(w, len1); err != nil {
return err
}
for i := 0; i < len1; i++ {
if err := v.Replicas[i].Pack(w); err != nil {
return err
}
}
return nil
}
func (v *ShardReplicasDEPRECATEDResp) Unpack(r io.Reader) error {
var len1 int
if err := bincode.UnpackLength(r, &len1); err != nil {
return err
}
bincode.EnsureLength(&v.Replicas, len1)
for i := 0; i < len1; i++ {
if err := v.Replicas[i].Unpack(r); err != nil {
return err
}
}
return nil
}
func (v *ShardBlockServicesReq) ShuckleRequestKind() ShuckleMessageKind {
return SHARD_BLOCK_SERVICES
}