all: migrate to new BlockServiceInfo

This commit is contained in:
Miroslav Crnic
2024-07-23 13:53:07 +01:00
committed by GitHub Enterprise
parent 49723653f8
commit 1b258dc422
11 changed files with 35 additions and 35 deletions
+3 -3
View File
@@ -117,7 +117,7 @@ static std::pair<int, std::string> readShuckleResponse(int fd, ShuckleRespContai
return {};
}
std::pair<int, std::string> fetchBlockServices(const std::string& addr, uint16_t port, Duration timeout, ShardId shid, std::vector<BlockServiceInfoWithoutFlagsLastChanged>& blockServices, std::vector<BlockServiceId>& currentBlockServices) {
std::pair<int, std::string> fetchBlockServices(const std::string& addr, uint16_t port, Duration timeout, ShardId shid, std::vector<BlockServiceInfo>& blockServices, std::vector<BlockServiceId>& currentBlockServices) {
blockServices.clear();
currentBlockServices.clear();
@@ -131,7 +131,7 @@ std::pair<int, std::string> fetchBlockServices(const std::string& addr, uint16_t
// all block services
{
ShuckleReqContainer reqContainer;
auto& req = reqContainer.setAllBlockServicesWithoutFlagsLastChanged();
auto& req = reqContainer.setAllBlockServices();
{
const auto [err, errStr] = writeShuckleRequest(sock.get(), reqContainer, timeout);
if (err) { FAIL(err, errStr); }
@@ -143,7 +143,7 @@ std::pair<int, std::string> fetchBlockServices(const std::string& addr, uint16_t
if (err) { FAIL(err, errStr); }
}
blockServices = respContainer.getAllBlockServicesWithoutFlagsLastChanged().blockServices.els;
blockServices = respContainer.getAllBlockServices().blockServices.els;
}
// current block services
+1 -1
View File
@@ -19,7 +19,7 @@ std::pair<int, std::string> fetchBlockServices(
uint16_t shucklePort,
Duration timeout,
ShardId shid,
std::vector<BlockServiceInfoWithoutFlagsLastChanged>& blockServices,
std::vector<BlockServiceInfo>& blockServices,
std::vector<BlockServiceId>& currentBlockServices
);
+1 -1
View File
@@ -173,7 +173,7 @@ BlockServicesCache BlockServicesCacheDB::getCache() const {
return BlockServicesCache(_mutex, _blockServices, _currentBlockServices);
}
void BlockServicesCacheDB::updateCache(const std::vector<BlockServiceInfoWithoutFlagsLastChanged>& blockServices, const std::vector<BlockServiceId>& currentBlockServices) {
void BlockServicesCacheDB::updateCache(const std::vector<BlockServiceInfo>& blockServices, const std::vector<BlockServiceId>& currentBlockServices) {
LOG_INFO(_env, "Updating block service cache");
std::unique_lock _(_mutex);
+1 -1
View File
@@ -64,7 +64,7 @@ public:
BlockServicesCacheDB(Logger& logger, std::shared_ptr<XmonAgent>& xmon, const SharedRocksDB& sharedDB);
static std::vector<rocksdb::ColumnFamilyDescriptor> getColumnFamilyDescriptors();
void updateCache(const std::vector<BlockServiceInfoWithoutFlagsLastChanged>& blockServices, const std::vector<BlockServiceId>& currentBlockServices);
void updateCache(const std::vector<BlockServiceInfo>& blockServices, const std::vector<BlockServiceId>& currentBlockServices);
// We've seen at least one `updateCache()`, or we've loaded the
// block services from the cache.
+1 -1
View File
@@ -965,7 +965,7 @@ private:
std::string _shuckleHost;
uint16_t _shucklePort;
XmonNCAlert _alert;
std::vector<BlockServiceInfoWithoutFlagsLastChanged> _blockServices;
std::vector<BlockServiceInfo> _blockServices;
std::vector<BlockServiceId> _currentBlockServices;
bool _updatedOnce;
public:
+2 -2
View File
@@ -620,12 +620,12 @@ OUT:
}
m.cleanVisitedBlockService()
m.log.Debug("requesting block services")
blockServicesResp, err := client.ShuckleRequest(m.log, nil, m.shuckleAddress, &msgs.AllBlockServicesWithoutFlagsLastChangedReq{})
blockServicesResp, err := client.ShuckleRequest(m.log, nil, m.shuckleAddress, &msgs.AllBlockServicesReq{})
if err != nil {
m.log.RaiseNC(shuckleResponseAlert, "error getting block services from shuckle: %v", err)
} else {
m.log.ClearNC(shuckleResponseAlert)
blockServices := blockServicesResp.(*msgs.AllBlockServicesWithoutFlagsLastChangedResp)
blockServices := blockServicesResp.(*msgs.AllBlockServicesResp)
for _, bs := range blockServices.BlockServices {
if bs.Flags.HasAny(msgs.EGGSFS_BLOCK_SERVICE_DECOMMISSIONED) && bs.HasFiles {
m.ScheduleBlockService(bs.Id)
+2 -2
View File
@@ -73,8 +73,8 @@ func readShuckleResponse(
resp = &msgs.RegisterShardReplicaResp{}
case msgs.SHARD_REPLICAS:
resp = &msgs.ShardReplicasResp{}
case msgs.ALL_BLOCK_SERVICES_WITHOUT_FLAGS_LAST_CHANGED:
resp = &msgs.AllBlockServicesWithoutFlagsLastChangedResp{}
case msgs.ALL_BLOCK_SERVICES:
resp = &msgs.AllBlockServicesResp{}
case msgs.SET_BLOCK_SERVICE_FLAGS:
resp = &msgs.SetBlockServiceFlagsResp{}
case msgs.SET_BLOCK_SERVICE_DECOMMISSIONED:
+4 -4
View File
@@ -7,17 +7,17 @@ import (
"xtx/eggsfs/msgs"
)
func WaitForBlockServices(ll *lib.Logger, shuckleAddress string, expectedBlockServices int, timeout time.Duration) []msgs.BlockServiceInfoWithoutFlagsLastChanged {
func WaitForBlockServices(ll *lib.Logger, shuckleAddress string, expectedBlockServices int, timeout time.Duration) []msgs.BlockServiceInfo {
var err error
for {
var resp msgs.ShuckleResponse
var bss []msgs.BlockServiceInfoWithoutFlagsLastChanged
resp, err = ShuckleRequest(ll, nil, shuckleAddress, &msgs.AllBlockServicesWithoutFlagsLastChangedReq{})
var bss []msgs.BlockServiceInfo
resp, err = ShuckleRequest(ll, nil, shuckleAddress, &msgs.AllBlockServicesReq{})
if err != nil {
ll.Debug("got error while getting block services from shuckle, will keep waiting: %v", err)
goto KeepChecking
}
bss = resp.(*msgs.AllBlockServicesWithoutFlagsLastChangedResp).BlockServices
bss = resp.(*msgs.AllBlockServicesResp).BlockServices
if len(bss) < expectedBlockServices {
err = fmt.Errorf("not all block services are up yet, will keep waiting")
ll.Debug("%v", err)
+5 -5
View File
@@ -1173,7 +1173,7 @@ func getMountsInfo(log *lib.Logger, mountsPath string) (map[string]string, error
return ret, nil
}
func normalBlockServiceInfo(info *msgs.BlockServiceInfoWithoutFlagsLastChanged) *msgs.RegisterBlockServiceInfo {
func normalBlockServiceInfo(info *msgs.BlockServiceInfo) *msgs.RegisterBlockServiceInfo {
// Everything else is filled in by `initBlockServicesInfo`
return &msgs.RegisterBlockServiceInfo{
CapacityBytes: info.CapacityBytes,
@@ -1182,7 +1182,7 @@ func normalBlockServiceInfo(info *msgs.BlockServiceInfoWithoutFlagsLastChanged)
}
}
func deadBlockServiceInfo(info *msgs.BlockServiceInfoWithoutFlagsLastChanged) *msgs.RegisterBlockServiceInfo {
func deadBlockServiceInfo(info *msgs.BlockServiceInfo) *msgs.RegisterBlockServiceInfo {
// We just replicate everything from the cached one, forever -- but no flags.
return &msgs.RegisterBlockServiceInfo{
Id: info.Id,
@@ -1370,17 +1370,17 @@ func main() {
// erase block requests for old block services safely.
deadBlockServices := make(map[msgs.BlockServiceId]deadBlockService)
{
var shuckleBlockServices []msgs.BlockServiceInfoWithoutFlagsLastChanged
var shuckleBlockServices []msgs.BlockServiceInfo
{
alert := log.NewNCAlert(0)
log.RaiseNC(alert, "fetching block services")
timeouts := lib.NewReqTimeouts(client.DefaultShuckleTimeout.Initial, client.DefaultShuckleTimeout.Max, 0, client.DefaultShuckleTimeout.Growth, client.DefaultShuckleTimeout.Jitter)
resp, err := client.ShuckleRequest(log, timeouts, *shuckleAddress, &msgs.AllBlockServicesWithoutFlagsLastChangedReq{})
resp, err := client.ShuckleRequest(log, timeouts, *shuckleAddress, &msgs.AllBlockServicesReq{})
if err != nil {
panic(fmt.Errorf("could not request block services from shuckle: %v", err))
}
log.ClearNC(alert)
shuckleBlockServices = resp.(*msgs.AllBlockServicesWithoutFlagsLastChangedResp).BlockServices
shuckleBlockServices = resp.(*msgs.AllBlockServicesResp).BlockServices
}
for i := range shuckleBlockServices {
bs := &shuckleBlockServices[i]
+10 -10
View File
@@ -347,11 +347,11 @@ func main() {
os.Exit(2)
}
log.Info("requesting block services")
blockServicesResp, err := client.ShuckleRequest(log, nil, *shuckleAddress, &msgs.AllBlockServicesWithoutFlagsLastChangedReq{})
blockServicesResp, err := client.ShuckleRequest(log, nil, *shuckleAddress, &msgs.AllBlockServicesReq{})
if err != nil {
panic(err)
}
blockServices := blockServicesResp.(*msgs.AllBlockServicesWithoutFlagsLastChangedResp)
blockServices := blockServicesResp.(*msgs.AllBlockServicesResp)
blockServicesToMigrate := make(map[string]*[]msgs.BlockServiceId) // by failure domain
numBlockServicesToMigrate := 0
for _, bs := range blockServices.BlockServices {
@@ -617,12 +617,12 @@ func main() {
blockReqBlockService := blockReqCmd.Uint64("bs", 0, "Block service")
blockReqFile := blockReqCmd.String("file", "", "")
blockReqRun := func() {
resp, err := client.ShuckleRequest(log, nil, *shuckleAddress, &msgs.AllBlockServicesWithoutFlagsLastChangedReq{})
resp, err := client.ShuckleRequest(log, nil, *shuckleAddress, &msgs.AllBlockServicesReq{})
if err != nil {
panic(err)
}
blockServices := resp.(*msgs.AllBlockServicesWithoutFlagsLastChangedResp)
var blockServiceInfo msgs.BlockServiceInfoWithoutFlagsLastChanged
blockServices := resp.(*msgs.AllBlockServicesResp)
var blockServiceInfo msgs.BlockServiceInfo
for _, bsInfo := range blockServices.BlockServices {
if bsInfo.Id == msgs.BlockServiceId(*blockReqBlockService) {
blockServiceInfo = bsInfo
@@ -654,12 +654,12 @@ func main() {
testBlockWriteBlockService := testBlockWriteCmd.String("bs", "", "Block service. If comma-separated, they'll be written in parallel to the specified ones.")
testBlockWriteSize := testBlockWriteCmd.Uint("size", 0, "Size (must fit in u32)")
testBlockWriteRun := func() {
resp, err := client.ShuckleRequest(log, nil, *shuckleAddress, &msgs.AllBlockServicesWithoutFlagsLastChangedReq{})
resp, err := client.ShuckleRequest(log, nil, *shuckleAddress, &msgs.AllBlockServicesReq{})
if err != nil {
panic(err)
}
blockServices := resp.(*msgs.AllBlockServicesWithoutFlagsLastChangedResp)
bsInfos := []msgs.BlockServiceInfoWithoutFlagsLastChanged{}
blockServices := resp.(*msgs.AllBlockServicesResp)
bsInfos := []msgs.BlockServiceInfo{}
for _, str := range strings.Split(*testBlockWriteBlockService, ",") {
bsId, err := strconv.ParseUint(str, 0, 64)
if err != nil {
@@ -736,11 +736,11 @@ func main() {
}
if *blockserviceFlagsFailureDomain != "" {
log.Info("requesting block services")
blockServicesResp, err := client.ShuckleRequest(log, nil, *shuckleAddress, &msgs.AllBlockServicesWithoutFlagsLastChangedReq{})
blockServicesResp, err := client.ShuckleRequest(log, nil, *shuckleAddress, &msgs.AllBlockServicesReq{})
if err != nil {
panic(err)
}
blockServices := blockServicesResp.(*msgs.AllBlockServicesWithoutFlagsLastChangedResp)
blockServices := blockServicesResp.(*msgs.AllBlockServicesResp)
for _, bs := range blockServices.BlockServices {
if bs.FailureDomain.String() == *blockserviceFlagsFailureDomain {
blockServiceIds = append(blockServiceIds, bs.Id)
+5 -5
View File
@@ -658,11 +658,11 @@ func corruptFiles(
) uint64 {
blockServicesToDataDirs := make(map[msgs.BlockServiceId]string)
{
resp, err := client.ShuckleRequest(log, nil, shuckleAddress, &msgs.AllBlockServicesWithoutFlagsLastChangedReq{})
resp, err := client.ShuckleRequest(log, nil, shuckleAddress, &msgs.AllBlockServicesReq{})
if err != nil {
panic(err)
}
body := resp.(*msgs.AllBlockServicesWithoutFlagsLastChangedResp)
body := resp.(*msgs.AllBlockServicesResp)
for _, block := range body.BlockServices {
blockServicesToDataDirs[block.Id] = block.Path
}
@@ -942,12 +942,12 @@ func fsTestInternal[Id comparable](
panic(err)
}
// check that we have no flash block
blockServicesResp, err := client.ShuckleRequest(log, nil, shuckleAddress, &msgs.AllBlockServicesWithoutFlagsLastChangedReq{})
blockServicesResp, err := client.ShuckleRequest(log, nil, shuckleAddress, &msgs.AllBlockServicesReq{})
if err != nil {
panic(err)
}
blockServices := blockServicesResp.(*msgs.AllBlockServicesWithoutFlagsLastChangedResp)
blockServicesById := make(map[msgs.BlockServiceId]*msgs.BlockServiceInfoWithoutFlagsLastChanged)
blockServices := blockServicesResp.(*msgs.AllBlockServicesResp)
blockServicesById := make(map[msgs.BlockServiceId]*msgs.BlockServiceInfo)
for i := range blockServices.BlockServices {
blockServicesById[blockServices.BlockServices[i].Id] = &blockServices.BlockServices[i]
}