Add endpoint to specify which file to get the "reference" block services from

See comments for more details.
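
The gist, as a hedged sketch (field names inferred from how the dispatch code in the diff below accesses the new request and response; the real definitions live in the shard protocol): the new ADD_SPAN_INITIATE_WITH_REFERENCE message wraps the ordinary add-span request together with the file whose existing spans should seed the block service selection.

    // Sketch only, not the actual protocol definition.
    struct AddSpanInitiateWithReferenceReq {
        AddSpanInitiateReq req; // the ordinary add-span request
        InodeId reference;      // file to copy the "reference" block services from
    };
    struct AddSpanInitiateWithReferenceResp {
        AddSpanInitiateResp resp; // same payload as the plain response
    };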
Francesco Mazzoli authored and committed on 2023-08-14 19:40:02 +00:00
parent 3c65593d1f
commit 40f229b6f5
13 changed files with 505 additions and 255 deletions


@@ -1435,10 +1435,13 @@ struct ShardDBImpl {
         return NO_ERROR;
     }
-    EggsError _prepareAddSpanInitiate(EggsTime time, const AddSpanInitiateReq& req, AddSpanInitiateEntry& entry) {
+    EggsError _prepareAddSpanInitiate(EggsTime time, const AddSpanInitiateReq& req, InodeId reference, AddSpanInitiateEntry& entry) {
         if (req.fileId.type() != InodeType::FILE && req.fileId.type() != InodeType::SYMLINK) {
             return EggsError::TYPE_IS_DIRECTORY;
         }
+        if (reference.type() != InodeType::FILE && reference.type() != InodeType::SYMLINK) {
+            return EggsError::TYPE_IS_DIRECTORY;
+        }
         if (req.fileId.shard() != _shid) {
             return EggsError::BAD_SHARD;
         }
@@ -1505,43 +1508,59 @@ struct ShardDBImpl {
LOG_DEBUG(_env, "Starting out with %s block service candidates, parity %s", candidateBlockServices.size(), entry.parity);
std::vector<BlockServiceId> pickedBlockServices;
pickedBlockServices.reserve(req.parity.blocks());
-        // Try to get the first span to copy its block services -- this should be the
-        // very common case past the first span.
-        {
-            StaticValue<SpanKey> k;
-            k().setFileId(req.fileId);
-            k().setOffset(0);
-            std::string v;
-            auto status = _db->Get({}, _spansCf, k.toSlice(), &v);
-            if (status.IsNotFound()) {
-                // no-op -- we'll just generate them all at random
-            } else {
-                ROCKS_DB_CHECKED(status);
-                ExternalValue<SpanBody> span(v);
-                // TODO this means that if the first span is inline or smaller, all the other
-                // spans will get block services generated at random, which is not ideal. We
-                // should probably look further.
-                if (span().storageClass() != INLINE_STORAGE) {
-                    const auto blocks = span().blocksBody();
-                    for (
-                        int i = 0;
-                        i < blocks.parity().blocks() && pickedBlockServices.size() < req.parity.blocks() && candidateBlockServices.size() > 0;
-                        i++
-                    ) {
-                        const BlockBody spanBlock = blocks.block(i);
-                        auto isCandidate = std::find(candidateBlockServices.begin(), candidateBlockServices.end(), spanBlock.blockService());
-                        if (isCandidate == candidateBlockServices.end()) {
-                            continue;
-                        }
-                        LOG_DEBUG(_env, "(1) Picking block service candidate %s, failure domain %s", spanBlock.blockService(), GoLangQuotedStringFmt((const char*)_blockServicesCache.at(spanBlock.blockService().u64).failureDomain.data(), 16));
-                        BlockServiceId blockServiceId = spanBlock.blockService();
-                        pickedBlockServices.emplace_back(blockServiceId);
-                        std::iter_swap(isCandidate, candidateBlockServices.end()-1);
-                        candidateBlockServices.pop_back();
-                    }
-                }
-            }
-        }
+        // We try to copy the block services from the first and the last span. The first
+        // span is generally considered the "reference" one, and should work in the common
+        // case. The last span is useful only in the case where we start using different
+        // block services mid-file, probably because a block service went down. Why not
+        // always use the last span? When we migrate or defrag or generally reorganize
+        // the spans we work from left to right, and in that case if we always looked at
+        // the last one we'd pick a random block service every time. The "last span"
+        // fallback is free in the common case anyhow.
+        const auto fillInBlockServicesFromSpan = [&](bool first) {
+            // empty file, bail out early and avoid pointless double lookup
+            if (entry.fileId == reference && entry.byteOffset == 0) {
+                return;
+            }
+            // we're already done (avoid double seek in the common case)
+            if (pickedBlockServices.size() >= req.parity.blocks() || candidateBlockServices.size() == 0) {
+                return;
+            }
+            StaticValue<SpanKey> startK;
+            startK().setFileId(reference);
+            // We should never have many tombstones here (spans aren't really deleted and
+            // re-added apart from rare cases), so the offset upper bound is fine.
+            startK().setOffset(first ? 0 : ~(uint64_t)0);
+            std::unique_ptr<rocksdb::Iterator> it(_db->NewIterator({}, _spansCf));
+            it->SeekForPrev(startK.toSlice());
+            if (!it->Valid()) { // nothing to do if we can't find a span
+                if (!it->status().IsNotFound()) {
+                    ROCKS_DB_CHECKED(it->status());
+                }
+                return;
+            }
+            auto k = ExternalValue<SpanKey>::FromSlice(it->key());
+            if (k().fileId() != reference) { // SeekForPrev landed on some other file's span
+                return;
+            }
+            auto span = ExternalValue<SpanBody>::FromSlice(it->value());
+            if (span().storageClass() == INLINE_STORAGE) { return; }
+            const auto blocks = span().blocksBody();
+            for (
+                int i = 0;
+                i < blocks.parity().blocks() && pickedBlockServices.size() < req.parity.blocks() && candidateBlockServices.size() > 0;
+                i++
+            ) {
+                const BlockBody spanBlock = blocks.block(i);
+                auto isCandidate = std::find(candidateBlockServices.begin(), candidateBlockServices.end(), spanBlock.blockService());
+                if (isCandidate == candidateBlockServices.end()) {
+                    continue;
+                }
+                LOG_DEBUG(_env, "(1) Picking block service candidate %s, failure domain %s", spanBlock.blockService(), GoLangQuotedStringFmt((const char*)_blockServicesCache.at(spanBlock.blockService().u64).failureDomain.data(), 16));
+                BlockServiceId blockServiceId = spanBlock.blockService();
+                pickedBlockServices.emplace_back(blockServiceId);
+                std::iter_swap(isCandidate, candidateBlockServices.end()-1);
+                candidateBlockServices.pop_back();
+            }
+        };
+        fillInBlockServicesFromSpan(true);
+        fillInBlockServicesFromSpan(false);
         // Fill in whatever remains. We don't need to be deterministic here (we would have to
         // if we were in log application), but we might as well.
         {
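
A standalone sketch of the SeekForPrev trick used by fillInBlockServicesFromSpan above, assuming (as in the spans column family) that keys sort by (file id, offset) via a big-endian encoding. SeekForPrev positions the iterator at the last key <= the target, so seeking (file, 0) lands on the file's first span and (file, ~0) on its last; the caller still has to check that the key it landed on belongs to the intended file. All names here are illustrative, not from the codebase.

    #include <cstdint>
    #include <cstring>
    #include <memory>
    #include <string>
    #include <rocksdb/db.h>

    // Encode (fileId, offset) big-endian so byte order matches numeric order.
    static std::string spanKey(uint64_t fileId, uint64_t offset) {
        std::string k(16, '\0');
        for (int i = 0; i < 8; i++) {
            k[i] = (char)(fileId >> (56 - 8*i));
            k[8+i] = (char)(offset >> (56 - 8*i));
        }
        return k;
    }

    // Find the first (wantFirst) or last span key of fileId, if any.
    static bool findEdgeSpan(rocksdb::DB* db, uint64_t fileId, bool wantFirst, std::string& key) {
        std::unique_ptr<rocksdb::Iterator> it(db->NewIterator(rocksdb::ReadOptions()));
        it->SeekForPrev(spanKey(fileId, wantFirst ? 0 : ~(uint64_t)0));
        if (!it->Valid()) { return false; } // no key <= target at all
        key = it->key().ToString();
        // SeekForPrev can land on a smaller key belonging to another file.
        return key.size() == 16 && memcmp(key.data(), spanKey(fileId, 0).data(), 8) == 0;
    }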
@@ -1754,9 +1773,14 @@ struct ShardDBImpl {
         case ShardMessageKind::ADD_INLINE_SPAN:
             err = _prepareAddInlineSpan(time, req.getAddInlineSpan(), logEntryBody.setAddInlineSpan());
             break;
-        case ShardMessageKind::ADD_SPAN_INITIATE:
-            err = _prepareAddSpanInitiate(time, req.getAddSpanInitiate(), logEntryBody.setAddSpanInitiate());
-            break;
+        case ShardMessageKind::ADD_SPAN_INITIATE: {
+            const auto& addSpanReq = req.getAddSpanInitiate();
+            err = _prepareAddSpanInitiate(time, addSpanReq, addSpanReq.fileId, logEntryBody.setAddSpanInitiate());
+            break; }
+        case ShardMessageKind::ADD_SPAN_INITIATE_WITH_REFERENCE: {
+            const auto& addSpanReq = req.getAddSpanInitiateWithReference();
+            err = _prepareAddSpanInitiate(time, addSpanReq.req, addSpanReq.reference, logEntryBody.setAddSpanInitiate());
+            break; }
         case ShardMessageKind::ADD_SPAN_CERTIFY:
             err = _prepareAddSpanCertify(time, req.getAddSpanCertify(), logEntryBody.setAddSpanCertify());
             break;
@@ -3431,7 +3455,7 @@ struct ShardDBImpl {
         return NO_ERROR;
     }
-    EggsError applyLogEntry(bool sync, uint64_t logIndex, const ShardLogEntry& logEntry, ShardRespContainer& resp) {
+    EggsError applyLogEntry(bool sync, ShardMessageKind reqKind, uint64_t logIndex, const ShardLogEntry& logEntry, ShardRespContainer& resp) {
         // TODO figure out the story with regards to time monotonicity (possibly drop non-monotonic log
         // updates?)
@@ -3509,9 +3533,15 @@ struct ShardDBImpl {
         case ShardLogEntryKind::ADD_INLINE_SPAN:
             err = _applyAddInlineSpan(time, batch, logEntryBody.getAddInlineSpan(), resp.setAddInlineSpan());
             break;
-        case ShardLogEntryKind::ADD_SPAN_INITIATE:
-            err = _applyAddSpanInitiate(time, batch, logEntryBody.getAddSpanInitiate(), resp.setAddSpanInitiate());
-            break;
+        case ShardLogEntryKind::ADD_SPAN_INITIATE: {
+            if (reqKind == ShardMessageKind::ADD_SPAN_INITIATE) {
+                err = _applyAddSpanInitiate(time, batch, logEntryBody.getAddSpanInitiate(), resp.setAddSpanInitiate());
+            } else {
+                ALWAYS_ASSERT(reqKind == ShardMessageKind::ADD_SPAN_INITIATE_WITH_REFERENCE);
+                auto& refResp = resp.setAddSpanInitiateWithReference();
+                err = _applyAddSpanInitiate(time, batch, logEntryBody.getAddSpanInitiate(), refResp.resp);
+            }
+            break; }
         case ShardLogEntryKind::ADD_SPAN_CERTIFY:
             err = _applyAddSpanCertify(time, batch, logEntryBody.getAddSpanCertify(), resp.setAddSpanCertify());
             break;
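
The branch above keeps the log format unchanged: both request kinds serialize to the single ADD_SPAN_INITIATE log entry kind (both prepare cases earlier call setAddSpanInitiate()), and the original request kind is only needed again at apply time, to pick the response container. A minimal sketch of that invariant, with illustrative enum values rather than the real ones:

    #include <cassert>

    enum class MsgKind { ADD_SPAN_INITIATE, ADD_SPAN_INITIATE_WITH_REFERENCE };
    enum class LogKind { ADD_SPAN_INITIATE };

    // Both message kinds map to the same log entry kind, so entries written
    // by either endpoint (or before the new endpoint existed) replay the same.
    static LogKind logKindFor(MsgKind k) {
        assert(k == MsgKind::ADD_SPAN_INITIATE || k == MsgKind::ADD_SPAN_INITIATE_WITH_REFERENCE);
        return LogKind::ADD_SPAN_INITIATE;
    }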
@@ -3732,6 +3762,7 @@ bool readOnlyShardReq(const ShardMessageKind kind) {
         return true;
     case ShardMessageKind::CONSTRUCT_FILE:
     case ShardMessageKind::ADD_SPAN_INITIATE:
+    case ShardMessageKind::ADD_SPAN_INITIATE_WITH_REFERENCE:
     case ShardMessageKind::ADD_SPAN_CERTIFY:
     case ShardMessageKind::ADD_INLINE_SPAN:
     case ShardMessageKind::LINK_FILE:
@@ -3782,8 +3813,8 @@ EggsError ShardDB::prepareLogEntry(const ShardReqContainer& req, ShardLogEntry&
     return ((ShardDBImpl*)_impl)->prepareLogEntry(req, logEntry);
 }
-EggsError ShardDB::applyLogEntry(bool sync, uint64_t logEntryIx, const ShardLogEntry& logEntry, ShardRespContainer& resp) {
-    return ((ShardDBImpl*)_impl)->applyLogEntry(sync, logEntryIx, logEntry, resp);
+EggsError ShardDB::applyLogEntry(bool sync, ShardMessageKind reqKind, uint64_t logEntryIx, const ShardLogEntry& logEntry, ShardRespContainer& resp) {
+    return ((ShardDBImpl*)_impl)->applyLogEntry(sync, reqKind, logEntryIx, logEntry, resp);
 }
 uint64_t ShardDB::lastAppliedLogEntry() {
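
A hedged sketch of the call-site change implied by the new applyLogEntry signature: whoever drives the state machine must now carry the originating request's ShardMessageKind alongside each log entry and pass it back at apply time. This assumes the codebase's own headers; appendToLog and req.kind() are hypothetical stand-ins for the real log driver.

    EggsError handleRequest(ShardDB& db, const ShardReqContainer& req, ShardRespContainer& resp) {
        ShardLogEntry logEntry;
        EggsError err = db.prepareLogEntry(req, logEntry);
        if (err != NO_ERROR) { return err; }
        uint64_t logIndex = appendToLog(logEntry); // hypothetical: replicate/append, get index
        // the request kind travels with the entry so apply can pick the response shape
        return db.applyLogEntry(/*sync*/ true, req.kind(), logIndex, logEntry, resp);
    }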