Do not duplicate block services when migrating...

...also add checks so that that never happens in ShardDB
This commit is contained in:
Francesco Mazzoli
2023-03-07 16:05:47 +00:00
parent ffa7c6c5e9
commit 4ef819f4e5
15 changed files with 145 additions and 162 deletions

View File

@@ -43,3 +43,10 @@ std::ostream& operator<<(std::ostream& out, Crc crc) {
out << buf;
return out;
}
std::ostream& operator<<(std::ostream& out, BlockServiceId id) {
char buf[19];
sprintf(buf, "0x%016lx", id.u64);
out << buf;
return out;
}

View File

@@ -223,6 +223,31 @@ struct Crc {
std::ostream& operator<<(std::ostream& out, Crc crc);
struct BlockServiceId {
uint64_t u64;
BlockServiceId(): u64(0) {}
BlockServiceId(uint64_t x): u64(x) {}
void pack(BincodeBuf& buf) const {
buf.packScalar(u64);
}
void unpack(BincodeBuf& buf) {
u64 = buf.unpackScalar<uint64_t>();
}
uint16_t packedSize() const {
return sizeof(u64);
}
bool operator==(BlockServiceId other) const {
return u64 == other.u64;
}
};
std::ostream& operator<<(std::ostream& out, BlockServiceId crc);
#include "MsgsGen.hpp"
// We often use this as a optional<EggsError>;

View File

@@ -492,7 +492,7 @@ void BlockInfo::pack(BincodeBuf& buf) const {
buf.packScalar<uint16_t>(blockServicePort1);
buf.packFixedBytes<4>(blockServiceIp2);
buf.packScalar<uint16_t>(blockServicePort2);
buf.packScalar<uint64_t>(blockServiceId);
blockServiceId.pack(buf);
buf.packScalar<uint64_t>(blockId);
buf.packFixedBytes<8>(certificate);
}
@@ -501,7 +501,7 @@ void BlockInfo::unpack(BincodeBuf& buf) {
blockServicePort1 = buf.unpackScalar<uint16_t>();
buf.unpackFixedBytes<4>(blockServiceIp2);
blockServicePort2 = buf.unpackScalar<uint16_t>();
blockServiceId = buf.unpackScalar<uint64_t>();
blockServiceId.unpack(buf);
blockId = buf.unpackScalar<uint64_t>();
buf.unpackFixedBytes<8>(certificate);
}
@@ -510,7 +510,7 @@ void BlockInfo::clear() {
blockServicePort1 = uint16_t(0);
blockServiceIp2.clear();
blockServicePort2 = uint16_t(0);
blockServiceId = uint64_t(0);
blockServiceId = BlockServiceId(0);
blockId = uint64_t(0);
certificate.clear();
}
@@ -519,7 +519,7 @@ bool BlockInfo::operator==(const BlockInfo& rhs) const {
if ((uint16_t)this->blockServicePort1 != (uint16_t)rhs.blockServicePort1) { return false; };
if (blockServiceIp2 != rhs.blockServiceIp2) { return false; };
if ((uint16_t)this->blockServicePort2 != (uint16_t)rhs.blockServicePort2) { return false; };
if ((uint64_t)this->blockServiceId != (uint64_t)rhs.blockServiceId) { return false; };
if ((BlockServiceId)this->blockServiceId != (BlockServiceId)rhs.blockServiceId) { return false; };
if ((uint64_t)this->blockId != (uint64_t)rhs.blockId) { return false; };
if (certificate != rhs.certificate) { return false; };
return true;
@@ -673,30 +673,12 @@ std::ostream& operator<<(std::ostream& out, const DirectoryInfo& x) {
return out;
}
void BlockServiceBlacklist::pack(BincodeBuf& buf) const {
buf.packScalar<uint64_t>(id);
}
void BlockServiceBlacklist::unpack(BincodeBuf& buf) {
id = buf.unpackScalar<uint64_t>();
}
void BlockServiceBlacklist::clear() {
id = uint64_t(0);
}
bool BlockServiceBlacklist::operator==(const BlockServiceBlacklist& rhs) const {
if ((uint64_t)this->id != (uint64_t)rhs.id) { return false; };
return true;
}
std::ostream& operator<<(std::ostream& out, const BlockServiceBlacklist& x) {
out << "BlockServiceBlacklist(" << "Id=" << x.id << ")";
return out;
}
void BlockService::pack(BincodeBuf& buf) const {
buf.packFixedBytes<4>(ip1);
buf.packScalar<uint16_t>(port1);
buf.packFixedBytes<4>(ip2);
buf.packScalar<uint16_t>(port2);
buf.packScalar<uint64_t>(id);
id.pack(buf);
buf.packScalar<uint8_t>(flags);
}
void BlockService::unpack(BincodeBuf& buf) {
@@ -704,7 +686,7 @@ void BlockService::unpack(BincodeBuf& buf) {
port1 = buf.unpackScalar<uint16_t>();
buf.unpackFixedBytes<4>(ip2);
port2 = buf.unpackScalar<uint16_t>();
id = buf.unpackScalar<uint64_t>();
id.unpack(buf);
flags = buf.unpackScalar<uint8_t>();
}
void BlockService::clear() {
@@ -712,7 +694,7 @@ void BlockService::clear() {
port1 = uint16_t(0);
ip2.clear();
port2 = uint16_t(0);
id = uint64_t(0);
id = BlockServiceId(0);
flags = uint8_t(0);
}
bool BlockService::operator==(const BlockService& rhs) const {
@@ -720,7 +702,7 @@ bool BlockService::operator==(const BlockService& rhs) const {
if ((uint16_t)this->port1 != (uint16_t)rhs.port1) { return false; };
if (ip2 != rhs.ip2) { return false; };
if ((uint16_t)this->port2 != (uint16_t)rhs.port2) { return false; };
if ((uint64_t)this->id != (uint64_t)rhs.id) { return false; };
if ((BlockServiceId)this->id != (BlockServiceId)rhs.id) { return false; };
if ((uint8_t)this->flags != (uint8_t)rhs.flags) { return false; };
return true;
}
@@ -760,19 +742,19 @@ std::ostream& operator<<(std::ostream& out, const FullReadDirCursor& x) {
}
void EntryNewBlockInfo::pack(BincodeBuf& buf) const {
buf.packScalar<uint64_t>(blockServiceId);
blockServiceId.pack(buf);
crc.pack(buf);
}
void EntryNewBlockInfo::unpack(BincodeBuf& buf) {
blockServiceId = buf.unpackScalar<uint64_t>();
blockServiceId.unpack(buf);
crc.unpack(buf);
}
void EntryNewBlockInfo::clear() {
blockServiceId = uint64_t(0);
blockServiceId = BlockServiceId(0);
crc = Crc(0);
}
bool EntryNewBlockInfo::operator==(const EntryNewBlockInfo& rhs) const {
if ((uint64_t)this->blockServiceId != (uint64_t)rhs.blockServiceId) { return false; };
if ((BlockServiceId)this->blockServiceId != (BlockServiceId)rhs.blockServiceId) { return false; };
if ((Crc)this->crc != (Crc)rhs.crc) { return false; };
return true;
}
@@ -804,7 +786,7 @@ std::ostream& operator<<(std::ostream& out, const SnapshotLookupEdge& x) {
}
void BlockServiceInfo::pack(BincodeBuf& buf) const {
buf.packScalar<uint64_t>(id);
id.pack(buf);
buf.packFixedBytes<4>(ip1);
buf.packScalar<uint16_t>(port1);
buf.packFixedBytes<4>(ip2);
@@ -819,7 +801,7 @@ void BlockServiceInfo::pack(BincodeBuf& buf) const {
lastSeen.pack(buf);
}
void BlockServiceInfo::unpack(BincodeBuf& buf) {
id = buf.unpackScalar<uint64_t>();
id.unpack(buf);
buf.unpackFixedBytes<4>(ip1);
port1 = buf.unpackScalar<uint16_t>();
buf.unpackFixedBytes<4>(ip2);
@@ -834,7 +816,7 @@ void BlockServiceInfo::unpack(BincodeBuf& buf) {
lastSeen.unpack(buf);
}
void BlockServiceInfo::clear() {
id = uint64_t(0);
id = BlockServiceId(0);
ip1.clear();
port1 = uint16_t(0);
ip2.clear();
@@ -849,7 +831,7 @@ void BlockServiceInfo::clear() {
lastSeen = EggsTime();
}
bool BlockServiceInfo::operator==(const BlockServiceInfo& rhs) const {
if ((uint64_t)this->id != (uint64_t)rhs.id) { return false; };
if ((BlockServiceId)this->id != (BlockServiceId)rhs.id) { return false; };
if (ip1 != rhs.ip1) { return false; };
if ((uint16_t)this->port1 != (uint16_t)rhs.port1) { return false; };
if (ip2 != rhs.ip2) { return false; };
@@ -1304,7 +1286,7 @@ void AddSpanInitiateReq::pack(BincodeBuf& buf) const {
buf.packScalar<uint32_t>(size);
crc.pack(buf);
buf.packScalar<uint8_t>(storageClass);
buf.packList<BlockServiceBlacklist>(blacklist);
buf.packList<BlockServiceId>(blacklist);
parity.pack(buf);
buf.packScalar<uint8_t>(stripes);
buf.packScalar<uint32_t>(cellSize);
@@ -1317,7 +1299,7 @@ void AddSpanInitiateReq::unpack(BincodeBuf& buf) {
size = buf.unpackScalar<uint32_t>();
crc.unpack(buf);
storageClass = buf.unpackScalar<uint8_t>();
buf.unpackList<BlockServiceBlacklist>(blacklist);
buf.unpackList<BlockServiceId>(blacklist);
parity.unpack(buf);
stripes = buf.unpackScalar<uint8_t>();
cellSize = buf.unpackScalar<uint32_t>();
@@ -2174,19 +2156,19 @@ std::ostream& operator<<(std::ostream& out, const SwapBlocksResp& x) {
}
void BlockServiceFilesReq::pack(BincodeBuf& buf) const {
buf.packScalar<uint64_t>(blockServiceId);
blockServiceId.pack(buf);
startFrom.pack(buf);
}
void BlockServiceFilesReq::unpack(BincodeBuf& buf) {
blockServiceId = buf.unpackScalar<uint64_t>();
blockServiceId.unpack(buf);
startFrom.unpack(buf);
}
void BlockServiceFilesReq::clear() {
blockServiceId = uint64_t(0);
blockServiceId = BlockServiceId(0);
startFrom = InodeId();
}
bool BlockServiceFilesReq::operator==(const BlockServiceFilesReq& rhs) const {
if ((uint64_t)this->blockServiceId != (uint64_t)rhs.blockServiceId) { return false; };
if ((BlockServiceId)this->blockServiceId != (BlockServiceId)rhs.blockServiceId) { return false; };
if ((InodeId)this->startFrom != (InodeId)rhs.startFrom) { return false; };
return true;
}

View File

@@ -243,7 +243,7 @@ struct BlockInfo {
uint16_t blockServicePort1;
BincodeFixedBytes<4> blockServiceIp2;
uint16_t blockServicePort2;
uint64_t blockServiceId;
BlockServiceId blockServiceId;
uint64_t blockId;
BincodeFixedBytes<8> certificate;
@@ -509,31 +509,12 @@ struct DirectoryInfo {
std::ostream& operator<<(std::ostream& out, const DirectoryInfo& x);
struct BlockServiceBlacklist {
uint64_t id;
static constexpr uint16_t STATIC_SIZE = 8; // id
BlockServiceBlacklist() { clear(); }
uint16_t packedSize() const {
uint16_t _size = 0;
_size += 8; // id
return _size;
}
void pack(BincodeBuf& buf) const;
void unpack(BincodeBuf& buf);
void clear();
bool operator==(const BlockServiceBlacklist&rhs) const;
};
std::ostream& operator<<(std::ostream& out, const BlockServiceBlacklist& x);
struct BlockService {
BincodeFixedBytes<4> ip1;
uint16_t port1;
BincodeFixedBytes<4> ip2;
uint16_t port2;
uint64_t id;
BlockServiceId id;
uint8_t flags;
static constexpr uint16_t STATIC_SIZE = BincodeFixedBytes<4>::STATIC_SIZE + 2 + BincodeFixedBytes<4>::STATIC_SIZE + 2 + 8 + 1; // ip1 + port1 + ip2 + port2 + id + flags
@@ -583,7 +564,7 @@ struct FullReadDirCursor {
std::ostream& operator<<(std::ostream& out, const FullReadDirCursor& x);
struct EntryNewBlockInfo {
uint64_t blockServiceId;
BlockServiceId blockServiceId;
Crc crc;
static constexpr uint16_t STATIC_SIZE = 8 + 4; // blockServiceId + crc
@@ -625,7 +606,7 @@ struct SnapshotLookupEdge {
std::ostream& operator<<(std::ostream& out, const SnapshotLookupEdge& x);
struct BlockServiceInfo {
uint64_t id;
BlockServiceId id;
BincodeFixedBytes<4> ip1;
uint16_t port1;
BincodeFixedBytes<4> ip2;
@@ -1088,13 +1069,13 @@ struct AddSpanInitiateReq {
uint32_t size;
Crc crc;
uint8_t storageClass;
BincodeList<BlockServiceBlacklist> blacklist;
BincodeList<BlockServiceId> blacklist;
Parity parity;
uint8_t stripes;
uint32_t cellSize;
BincodeList<Crc> crcs;
static constexpr uint16_t STATIC_SIZE = 8 + BincodeFixedBytes<8>::STATIC_SIZE + 8 + 4 + 4 + 1 + BincodeList<BlockServiceBlacklist>::STATIC_SIZE + 1 + 1 + 4 + BincodeList<Crc>::STATIC_SIZE; // fileId + cookie + byteOffset + size + crc + storageClass + blacklist + parity + stripes + cellSize + crcs
static constexpr uint16_t STATIC_SIZE = 8 + BincodeFixedBytes<8>::STATIC_SIZE + 8 + 4 + 4 + 1 + BincodeList<BlockServiceId>::STATIC_SIZE + 1 + 1 + 4 + BincodeList<Crc>::STATIC_SIZE; // fileId + cookie + byteOffset + size + crc + storageClass + blacklist + parity + stripes + cellSize + crcs
AddSpanInitiateReq() { clear(); }
uint16_t packedSize() const {
@@ -1900,7 +1881,7 @@ struct SwapBlocksResp {
std::ostream& operator<<(std::ostream& out, const SwapBlocksResp& x);
struct BlockServiceFilesReq {
uint64_t blockServiceId;
BlockServiceId blockServiceId;
InodeId startFrom;
static constexpr uint16_t STATIC_SIZE = 8 + 8; // blockServiceId + startFrom

View File

@@ -252,19 +252,4 @@ uint32_t crc32c_xor(uint32_t crc_a, uint32_t crc_b, size_t len) {
// table, but probably doesn't matter.
uint32_t crc_0 = ~crc32c_mult_mod_p(~(uint32_t)0, x2n_mod_p(len, 3));
return crc_a ^ crc_b ^ crc_0;
}
void crc32c_xor_bytes(uint8_t* bytes_a, const uint8_t* bytes_b, size_t len) {
size_t leftover = len%32;
for (ssize_t i = 0; i < len-leftover; i += 32) {
_mm256_storeu_si256(
(__m256i*)bytes_a,
_mm256_xor_si256(_mm256_lddqu_si256((const __m256i*)bytes_a), _mm256_lddqu_si256((const __m256i*)bytes_b))
);
bytes_a += 32;
bytes_b += 32;
}
for (size_t i = 0; i < leftover; i++) {
bytes_a[i] ^= bytes_b[i];
}
}
}

View File

@@ -698,7 +698,7 @@ struct ShardDBImpl {
int budget = UDP_MTU - ShardResponseHeader::STATIC_SIZE - FileSpansResp::STATIC_SIZE;
// if -1, we ran out of budget.
const auto addBlockService = [this, &resp, &budget](uint64_t blockServiceId) -> int {
const auto addBlockService = [this, &resp, &budget](BlockServiceId blockServiceId) -> int {
// See if we've placed it already
for (int i = 0; i < resp.blockServices.els.size(); i++) {
if (resp.blockServices.els.at(i).id == blockServiceId) {
@@ -711,7 +711,7 @@ struct ShardDBImpl {
return -1;
}
auto& blockService = resp.blockServices.els.emplace_back();
const auto& cache = _blockServicesCache.at(blockServiceId);
const auto& cache = _blockServicesCache.at(blockServiceId.u64);
blockService.id = blockServiceId;
blockService.ip1 = cache.ip1;
blockService.port1 = cache.port1;
@@ -793,10 +793,10 @@ struct ShardDBImpl {
resp.fileIds.els.reserve(maxFiles);
StaticValue<BlockServiceToFileKey> beginKey;
beginKey().setBlockServiceId(req.blockServiceId);
beginKey().setBlockServiceId(req.blockServiceId.u64);
beginKey().setFileId(req.startFrom);
StaticValue<BlockServiceToFileKey> endKey;
endKey().setBlockServiceId(req.blockServiceId+1);
endKey().setBlockServiceId(req.blockServiceId.u64+1);
endKey().setFileId(NULL_INODE_ID);
auto endKeySlice = endKey.toSlice();
@@ -1220,9 +1220,9 @@ struct ShardDBImpl {
return true;
}
bool _blockServiceMatchesBlacklist(const std::vector<BlockServiceBlacklist>& blacklists, uint64_t blockServiceId, const BlockServiceCache& cache) {
bool _blockServiceMatchesBlacklist(const std::vector<BlockServiceId>& blacklists, BlockServiceId blockServiceId, const BlockServiceCache& cache) {
for (const auto& blacklist: blacklists) {
if (blacklist.id == blockServiceId) {
if (blacklist == blockServiceId) {
return true;
}
}
@@ -1320,11 +1320,11 @@ struct ShardDBImpl {
//
// TODO factor failure domains in the decision
{
std::vector<uint64_t> candidateBlockServices;
std::vector<BlockServiceId> candidateBlockServices;
candidateBlockServices.reserve(_currentBlockServices.size());
LOG_DEBUG(_env, "Starting out with %s current block services", _currentBlockServices.size());
for (uint64_t id: _currentBlockServices) {
const auto& cache = _blockServicesCache.at(id);
for (BlockServiceId id: _currentBlockServices) {
const auto& cache = _blockServicesCache.at(id.u64);
if (cache.storageClass != entry.storageClass) {
LOG_DEBUG(_env, "Skipping %s because of different storage class (%s != %s)", id, (int)cache.storageClass, (int)entry.storageClass);
continue;
@@ -1336,7 +1336,7 @@ struct ShardDBImpl {
candidateBlockServices.emplace_back(id);
}
LOG_DEBUG(_env, "Starting out with %s block service candidates, parity %s", candidateBlockServices.size(), entry.parity);
std::vector<uint64_t> pickedBlockServices;
std::vector<BlockServiceId> pickedBlockServices;
pickedBlockServices.reserve(req.parity.blocks());
// Try to get the first span to copy its block services -- this should be the
// very common case past the first span.
@@ -1367,7 +1367,7 @@ struct ShardDBImpl {
continue;
}
LOG_DEBUG(_env, "(1) Picking block service candidate %s", spanBlock.blockService());
uint64_t blockServiceId = spanBlock.blockService();
BlockServiceId blockServiceId = spanBlock.blockService();
pickedBlockServices.emplace_back(blockServiceId);
std::iter_swap(isCandidate, candidateBlockServices.end()-1);
candidateBlockServices.pop_back();
@@ -2429,7 +2429,7 @@ struct ShardDBImpl {
resp.blocks.els.reserve(blocks.parity().blocks());
for (int i = 0; i < blocks.parity().blocks(); i++) {
const auto block = blocks.block(i);
const auto& cache = _blockServicesCache.at(block.blockService());
const auto& cache = _blockServicesCache.at(block.blockService().u64);
auto& respBlock = resp.blocks.els.emplace_back();
respBlock.blockServiceIp1 = cache.ip1;
respBlock.blockServicePort1 = cache.port1;
@@ -2460,10 +2460,10 @@ struct ShardDBImpl {
StaticValue<BlockServiceBody> blockBody;
for (int i = 0; i < entry.blockServices.els.size(); i++) {
const auto& entryBlock = entry.blockServices.els[i];
currentBody().set(i, entryBlock.id);
_currentBlockServices[i] = entryBlock.id;
blockKey().setBlockServiceId(entryBlock.id);
blockBody().setId(entryBlock.id);
currentBody().set(i, entryBlock.id.u64);
_currentBlockServices[i] = entryBlock.id.u64;
blockKey().setBlockServiceId(entryBlock.id.u64);
blockBody().setId(entryBlock.id.u64);
blockBody().setIp1(entryBlock.ip1.data);
blockBody().setPort1(entryBlock.port1);
blockBody().setIp2(entryBlock.ip2.data);
@@ -2472,7 +2472,7 @@ struct ShardDBImpl {
blockBody().setFailureDomain(entryBlock.failureDomain.data);
blockBody().setSecretKey(entryBlock.secretKey.data);
ROCKS_DB_CHECKED(batch.Put(_defaultCf, blockKey.toSlice(), blockBody.toSlice()));
auto& cache = _blockServicesCache[entryBlock.id];
auto& cache = _blockServicesCache[entryBlock.id.u64];
expandKey(entryBlock.secretKey.data, cache.secretKey);
cache.ip1 = entryBlock.ip1.data;
cache.port1 = entryBlock.port1;
@@ -2509,7 +2509,7 @@ struct ShardDBImpl {
auto& respBlock = resp.blocks.els.emplace_back();
respBlock.blockServiceId = block.blockService();
respBlock.blockId = block.blockId();
const auto& cache = _blockServicesCache.at(block.blockService());
const auto& cache = _blockServicesCache.at(block.blockService().u64);
respBlock.blockServiceIp1 = cache.ip1;
respBlock.blockServicePort1 = cache.port1;
respBlock.blockServiceIp2 = cache.ip2;
@@ -2518,7 +2518,7 @@ struct ShardDBImpl {
}
}
void _addBlockServicesToFiles(rocksdb::WriteBatch& batch, uint64_t blockServiceId, InodeId fileId, int64_t delta) {
void _addBlockServicesToFiles(rocksdb::WriteBatch& batch, BlockServiceId blockServiceId, InodeId fileId, int64_t delta) {
StaticValue<BlockServiceToFileKey> k;
k().setBlockServiceId(blockServiceId);
k().setFileId(fileId);
@@ -2688,9 +2688,9 @@ struct ShardDBImpl {
const auto& entryBlock = entry.bodyBlocks.els[i];
auto block = blocks.block(i);
block.setBlockId(_updateNextBlockId(time, nextBlockId));
block.setBlockService(entryBlock.blockServiceId);
block.setBlockService(entryBlock.blockServiceId.u64);
block.setCrc(entryBlock.crc.u32);
_addBlockServicesToFiles(batch, entryBlock.blockServiceId, entry.fileId, 1);
_addBlockServicesToFiles(batch, entryBlock.blockServiceId.u64, entry.fileId, 1);
}
_writeNextBlockId(batch, nextBlockId);
for (int i = 0; i < entry.stripes; i++) {
@@ -2710,7 +2710,7 @@ struct ShardDBImpl {
memset(buf, 0, sizeof(buf));
BincodeBuf bbuf(buf, sizeof(buf));
// struct.pack_into('<QcQ4sI', b, 0, block['block_service_id'], b'w', block['block_id'], crc32_from_int(block['crc32']), block_size)
bbuf.packScalar<uint64_t>(block.blockService());
bbuf.packScalar<uint64_t>(block.blockService().u64);
bbuf.packScalar<char>('w');
bbuf.packScalar<uint64_t>(block.blockId());
bbuf.packScalar<uint32_t>(block.crc());
@@ -2719,16 +2719,16 @@ struct ShardDBImpl {
return cbcmac(secretKey, (uint8_t*)buf, sizeof(buf));
}
bool _checkBlockAddProof(uint64_t blockServiceId, const BlockProof& proof) {
bool _checkBlockAddProof(BlockServiceId blockServiceId, const BlockProof& proof) {
char buf[32];
memset(buf, 0, sizeof(buf));
BincodeBuf bbuf(buf, sizeof(buf));
// struct.pack_into('<QcQ', b, 0, block_service_id, b'W', block_id)
bbuf.packScalar<uint64_t>(blockServiceId);
bbuf.packScalar<uint64_t>(blockServiceId.u64);
bbuf.packScalar<char>('W');
bbuf.packScalar<uint64_t>(proof.blockId);
const auto& cache = _blockServicesCache.at(blockServiceId);
const auto& cache = _blockServicesCache.at(blockServiceId.u64);
auto expectedProof = cbcmac(cache.secretKey, (uint8_t*)buf, sizeof(buf));
return proof.proof == expectedProof;
@@ -2739,23 +2739,23 @@ struct ShardDBImpl {
memset(buf, 0, sizeof(buf));
BincodeBuf bbuf(buf, sizeof(buf));
// struct.pack_into('<QcQ', b, 0, block['block_service_id'], b'e', block['block_id'])
bbuf.packScalar<uint64_t>(block.blockService());
bbuf.packScalar<uint64_t>(block.blockService().u64);
bbuf.packScalar<char>('e');
bbuf.packScalar<uint64_t>(block.blockId());
return cbcmac(secretKey, (uint8_t*)buf, sizeof(buf));
}
bool _checkBlockDeleteProof(uint64_t blockServiceId, const BlockProof& proof) {
bool _checkBlockDeleteProof(BlockServiceId blockServiceId, const BlockProof& proof) {
char buf[32];
memset(buf, 0, sizeof(buf));
BincodeBuf bbuf(buf, sizeof(buf));
// struct.pack_into('<QcQ', b, 0, block['block_service_id'], b'E', block['block_id'])
bbuf.packScalar<uint64_t>(blockServiceId);
bbuf.packScalar<uint64_t>(blockServiceId.u64);
bbuf.packScalar<char>('E');
bbuf.packScalar<uint64_t>(proof.blockId);
const auto& cache = _blockServicesCache.at(blockServiceId);
const auto& cache = _blockServicesCache.at(blockServiceId.u64);
auto expectedProof = cbcmac(cache.secretKey, (uint8_t*)buf, sizeof(buf));
bool good = proof.proof == expectedProof;
@@ -3034,9 +3034,19 @@ struct ShardDBImpl {
}
throw EGGS_EXCEPTION("blocks not found");
}
if (block1.crc() != block2.crc()) {
throw EGGS_EXCEPTION("mismatching blocks CRCs");
}
ALWAYS_ASSERT(block1.crc() == block2.crc());
// Check that we're not creating a situation where we have two blocks in the same block service
const auto checkNoDuplicateBlockServices = [](const auto blocks, int blockToBeReplacedIx, const auto newBlock) {
for (int i = 0; i < blocks.parity().blocks(); i++) {
if (i == blockToBeReplacedIx) {
continue;
}
const auto block = blocks.block(i);
ALWAYS_ASSERT(block.blockService() != newBlock.blockService());
}
};
checkNoDuplicateBlockServices(blocks1, block1Ix, block2);
checkNoDuplicateBlockServices(blocks2, block2Ix, block1);
// Record the block counts
_addBlockServicesToFiles(batch, block1.blockService(), entry.fileId1, -1);
_addBlockServicesToFiles(batch, block2.blockService(), entry.fileId1, +1);

View File

@@ -192,9 +192,9 @@ struct BlockBody {
sizeof(uint64_t) +
sizeof(uint32_t);
LE_VAL(uint64_t, blockService, setBlockService, 0);
LE_VAL(uint64_t, blockId, setBlockId, 8);
LE_VAL(uint32_t, crc, setCrc, 16);
LE_VAL(BlockServiceId, blockService, setBlockService, 0);
LE_VAL(uint64_t, blockId, setBlockId, 8);
LE_VAL(uint32_t, crc, setCrc, 16);
};
struct SpanBlocksBody {
@@ -475,6 +475,6 @@ struct BlockServiceToFileKey {
size_t size() const { return MAX_SIZE; }
void checkSize(size_t sz) { ALWAYS_ASSERT(sz == MAX_SIZE); }
BE64_VAL(uint64_t, blockServiceId, setBlockServiceId, 0)
BE64_VAL(InodeId, fileId, setFileId, 8)
BE64_VAL(BlockServiceId, blockServiceId, setBlockServiceId, 0)
BE64_VAL(InodeId, fileId, setFileId, 8)
};