Add "mtu" field to requests that benefit from it

Not used right now, but this way we can easily start stuffing more
data in responses.

I also split off some arguments in `NewClient`, unrelated change
(I wanted to pair the MTU with a single client, but I then realized
that it's enough to have it as some global property for now).
This commit is contained in:
Francesco Mazzoli
2023-06-15 08:46:43 +00:00
parent b840931575
commit e26eeaede1
23 changed files with 265 additions and 69 deletions

View File

@@ -191,6 +191,12 @@ static bool validName(const BincodeBytesRef& name) {
return true;
}
static int pickMtu(uint16_t mtu) {
if (mtu == 0) { mtu = DEFAULT_UDP_MTU; }
mtu = std::min<uint16_t>(MAX_UDP_MTU, mtu);
return mtu;
}
constexpr uint64_t DEADLINE_INTERVAL = (60ull /*mins*/ * 60 /*secs*/ * 1'000'000'000 /*ns*/); // 1 hr
void ShardLogEntry::pack(BincodeBuf& buf) const {
@@ -503,7 +509,7 @@ struct ShardDBImpl {
beginKey().setDirIdWithCurrent(req.dirId, true); // current = true
beginKey().setNameHash(req.startHash);
beginKey().setName({});
int budget = UDP_MTU - ShardResponseHeader::STATIC_SIZE - ReadDirResp::STATIC_SIZE;
int budget = pickMtu(req.mtu) - ShardResponseHeader::STATIC_SIZE - ReadDirResp::STATIC_SIZE;
for (it->Seek(beginKey.toSlice()); it->Valid(); it->Next()) {
auto key = ExternalValue<EdgeKey>::FromSlice(it->key());
if (key().dirId() != req.dirId || !key().current()) {
@@ -551,7 +557,7 @@ struct ShardDBImpl {
} else {
ALWAYS_ASSERT(req.cursor.startTime == 0); // TODO proper error at validation time
}
int budget = UDP_MTU - ShardResponseHeader::STATIC_SIZE - FullReadDirResp::STATIC_SIZE;
int budget = pickMtu(req.mtu) - ShardResponseHeader::STATIC_SIZE - FullReadDirResp::STATIC_SIZE;
for (it->Seek(beginKey.toSlice()); it->Valid(); it->Next()) {
auto key = ExternalValue<EdgeKey>::FromSlice(it->key());
if (key().dirId() != req.dirId) {
@@ -645,7 +651,7 @@ struct ShardDBImpl {
{
std::unique_ptr<rocksdb::Iterator> it(_db->NewIterator({}, _transientCf));
auto beginKey = InodeIdKey::Static(req.beginId);
int budget = UDP_MTU - ShardResponseHeader::STATIC_SIZE - VisitTransientFilesResp::STATIC_SIZE;
int budget = pickMtu(req.mtu) - ShardResponseHeader::STATIC_SIZE - VisitTransientFilesResp::STATIC_SIZE;
for (it->Seek(beginKey.toSlice()); it->Valid(); it->Next()) {
auto id = ExternalValue<InodeIdKey>::FromSlice(it->key());
auto file = ExternalValue<TransientFileBody>::FromSlice(it->value());
@@ -672,7 +678,7 @@ struct ShardDBImpl {
EggsError _visitInodes(rocksdb::ColumnFamilyHandle* cf, const Req& req, Resp& resp) {
resp.nextId = NULL_INODE_ID;
int budget = UDP_MTU - ShardResponseHeader::STATIC_SIZE - Resp::STATIC_SIZE;
int budget = pickMtu(req.mtu) - ShardResponseHeader::STATIC_SIZE - Resp::STATIC_SIZE;
int maxIds = (budget/8) + 1; // include next inode
{
std::unique_ptr<rocksdb::Iterator> it(_db->NewIterator({}, cf));
@@ -707,7 +713,7 @@ struct ShardDBImpl {
rocksdb::ReadOptions options;
options.snapshot = snapshot.get();
int budget = UDP_MTU - ShardResponseHeader::STATIC_SIZE - FileSpansResp::STATIC_SIZE;
int budget = pickMtu(req.mtu) - ShardResponseHeader::STATIC_SIZE - FileSpansResp::STATIC_SIZE;
// if -1, we ran out of budget.
const auto addBlockService = [this, &resp, &budget](BlockServiceId blockServiceId) -> int {
// See if we've placed it already
@@ -805,7 +811,7 @@ struct ShardDBImpl {
}
EggsError _blockServiceFiles(const BlockServiceFilesReq& req, BlockServiceFilesResp& resp) {
int maxFiles = (UDP_MTU - ShardResponseHeader::STATIC_SIZE - BlockServiceFilesResp::STATIC_SIZE) / 8;
int maxFiles = (DEFAULT_UDP_MTU - ShardResponseHeader::STATIC_SIZE - BlockServiceFilesResp::STATIC_SIZE) / 8;
resp.fileIds.els.reserve(maxFiles);
StaticValue<BlockServiceToFileKey> beginKey;
@@ -854,7 +860,7 @@ struct ShardDBImpl {
}
}
int maxEdges = 1 + (UDP_MTU - ShardResponseHeader::STATIC_SIZE - SnapshotLookupResp::STATIC_SIZE) / SnapshotLookupEdge::STATIC_SIZE;
int maxEdges = 1 + (DEFAULT_UDP_MTU - ShardResponseHeader::STATIC_SIZE - SnapshotLookupResp::STATIC_SIZE) / SnapshotLookupEdge::STATIC_SIZE;
std::unique_ptr<rocksdb::Iterator> it(_db->NewIterator({}, _edgesCf));
StaticValue<EdgeKey> firstK;
firstK().setDirIdWithCurrent(req.dirId, false);