Distribute block services from shuckle

This is in preparation for #44, but more immediately, to better
stop writing to full block services.

The previous strategy of setting a flag was flawed since once
the flag was set it stayed set -- i.e. we would not remove it once
files would be deleted.  This consideration should just be integrated
in distributing the block services.
This commit is contained in:
Francesco Mazzoli
2024-01-16 14:29:01 +00:00
parent c4805a56fe
commit b6cf2b67a6
10 changed files with 315 additions and 49 deletions

View File

@@ -13,6 +13,7 @@
#include <stdio.h>
#include <memory>
#include <netinet/tcp.h>
#include <unordered_set>
#include "Bincode.hpp"
#include "Env.hpp"
@@ -134,20 +135,58 @@ std::string fetchBlockServices(const std::string& addr, uint16_t port, Duration
return errString;
}
ShuckleReqContainer reqContainer;
auto& req = reqContainer.setAllBlockServices();
errString = writeShuckleRequest(sock.fd, reqContainer);
if (!errString.empty()) {
return errString;
// all block services
{
ShuckleReqContainer reqContainer;
auto& req = reqContainer.setAllBlockServices();
errString = writeShuckleRequest(sock.fd, reqContainer);
if (!errString.empty()) {
return errString;
}
ShuckleRespContainer respContainer;
errString = readShuckleResponse(sock.fd, respContainer);
if (!errString.empty()) {
return errString;
}
blocks.blockServices = respContainer.getAllBlockServices().blockServices;
}
ShuckleRespContainer respContainer;
errString = readShuckleResponse(sock.fd, respContainer);
if (!errString.empty()) {
return errString;
// current block services
{
ShuckleReqContainer reqContainer;
auto& req = reqContainer.setShardBlockServices();
req.shardId = shid;
errString = writeShuckleRequest(sock.fd, reqContainer);
if (!errString.empty()) {
return errString;
}
ShuckleRespContainer respContainer;
errString = readShuckleResponse(sock.fd, respContainer);
if (!errString.empty()) {
return errString;
}
blocks.currentBlockServices = respContainer.getShardBlockServices().blockServices;
}
blocks.blockServices = respContainer.getAllBlockServices().blockServices;
// check that all current block services are known -- there's a small race here
// the caller should just retry in these cases.
{
std::unordered_set<uint64_t> knownBlockServices;
for (const auto& bs : blocks.blockServices.els) {
knownBlockServices.insert(bs.id.u64);
}
for (BlockServiceId bsId : blocks.currentBlockServices.els) {
if (!knownBlockServices.contains(bsId.u64)) {
std::stringstream ss;
ss << "got unknown block service " << bsId << " in current block services, was probably added in the meantime, please retry";
return ss.str();
}
}
}
return {};
}