diff --git a/c/eggs_msgs.h b/c/eggs_msgs.h index d2338dfc..c5e63a16 100644 --- a/c/eggs_msgs.h +++ b/c/eggs_msgs.h @@ -48,6 +48,8 @@ #define EGGSFS_ERR_BAD_DIRECTORY_INFO 55 #define EGGSFS_ERR_CREATION_TIME_TOO_RECENT 56 #define EGGSFS_ERR_DEADLINE_NOT_PASSED 57 +#define EGGSFS_ERR_SAME_SOURCE_AND_DESTINATION 58 +#define EGGSFS_ERR_SAME_DIRECTORIES 59 #define EGGSFS_META_LOOKUP 0x1 #define EGGSFS_META_STAT_FILE 0x2 diff --git a/cpp/CDC.cpp b/cpp/CDC.cpp index 4d742d30..568be94f 100644 --- a/cpp/CDC.cpp +++ b/cpp/CDC.cpp @@ -313,7 +313,7 @@ private: void _processStep(const CDCStep& step) { LOG_DEBUG(_env, "processing step %s", step); if (step.txnFinished != 0) { - LOG_DEBUG(_env, "txn %s finished, err %s, resp %s", step.txnFinished, step.err, step.resp); + LOG_DEBUG(_env, "txn %s finished", step.txnFinished); // we need to send the response back to the client auto inFlight = _inFlightTxns.find(step.txnFinished); if (inFlight == _inFlightTxns.end()) { diff --git a/cpp/CDCDB.cpp b/cpp/CDCDB.cpp index 3e17d4be..f9fb7b93 100644 --- a/cpp/CDCDB.cpp +++ b/cpp/CDCDB.cpp @@ -92,7 +92,7 @@ std::ostream& operator<<(std::ostream& out, const CDCStep& x) { out << "CDCStep("; if (x.txnFinished != 0) { out << "finishedTxn=" << x.txnFinished; - if (x.err == NO_ERROR) { + if (x.err != NO_ERROR) { out << ", err=" << x.err; } else { out << ", resp=" << x.resp; @@ -546,6 +546,8 @@ struct CDCDBImpl { // and therefore we do it in another transaction type entirely. if (req.targetId.type() == InodeType::DIRECTORY) { _finishWithError(step, txnId, EggsError::TYPE_IS_NOT_DIRECTORY); + } else if (req.oldOwnerId == req.newOwnerId) { + _finishWithError(step, txnId, EggsError::SAME_DIRECTORIES); } else { reqStep = RenameFileStep::AFTER_LOCK_OLD_EDGE; auto& shardReq = _needsShard(step, txnId, req.oldOwnerId.shard()).setLockCurrentEdge(); @@ -780,6 +782,8 @@ struct CDCDBImpl { case RenameDirectoryStep::START: { if (req.targetId.type() != InodeType::DIRECTORY) { _finishWithError(step, txnId, EggsError::TYPE_IS_NOT_DIRECTORY); + } else if (req.oldOwnerId == req.newOwnerId) { + _finishWithError(step, txnId, EggsError::SAME_DIRECTORIES); } else if (!_loopCheck(dbTxn, req)) { // First, check if we'd create a loop _finishWithError(step, txnId, EggsError::LOOP_IN_DIRECTORY_RENAME); @@ -854,6 +858,12 @@ struct CDCDBImpl { ROCKS_DB_CHECKED(dbTxn.Put(_parentCf, k.toSlice(), v.toSlice())); } } + case RenameDirectoryStep::AFTER_ROLLBACK: { + ALWAYS_ASSERT(shardRespError == NO_ERROR); // TODO handle timeouts etc. 
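+                // The shard acknowledged the rollback: finish the transaction here, surfacing the error stashed in the transaction state (exitError()) back to the client.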
+ _finishWithError(step, txnId, state.exitError()); + } break; + default: + throw EGGS_EXCEPTION("bad step %s", (int)reqStep); } return reqStep; diff --git a/cpp/MsgsGen.cpp b/cpp/MsgsGen.cpp index dcb3de3c..0c5e0c5e 100644 --- a/cpp/MsgsGen.cpp +++ b/cpp/MsgsGen.cpp @@ -148,6 +148,12 @@ std::ostream& operator<<(std::ostream& out, EggsError err) { case EggsError::DEADLINE_NOT_PASSED: out << "DEADLINE_NOT_PASSED"; break; + case EggsError::SAME_SOURCE_AND_DESTINATION: + out << "SAME_SOURCE_AND_DESTINATION"; + break; + case EggsError::SAME_DIRECTORIES: + out << "SAME_DIRECTORIES"; + break; default: out << "EggsError(" << ((int)err) << ")"; break; @@ -420,30 +426,30 @@ std::ostream& operator<<(std::ostream& out, const SpanPolicy& x) { void DirectoryInfoBody::pack(BincodeBuf& buf) const { buf.packScalar(version); buf.packScalar(deleteAfterTime); - buf.packScalar(deleteAfterVersions); + buf.packScalar(deleteAfterVersions); buf.packList(spanPolicies); } void DirectoryInfoBody::unpack(BincodeBuf& buf) { version = buf.unpackScalar(); deleteAfterTime = buf.unpackScalar(); - deleteAfterVersions = buf.unpackScalar(); + deleteAfterVersions = buf.unpackScalar(); buf.unpackList(spanPolicies); } void DirectoryInfoBody::clear() { version = uint8_t(0); deleteAfterTime = uint64_t(0); - deleteAfterVersions = uint8_t(0); + deleteAfterVersions = uint16_t(0); spanPolicies.clear(); } bool DirectoryInfoBody::operator==(const DirectoryInfoBody& rhs) const { if ((uint8_t)this->version != (uint8_t)rhs.version) { return false; }; if ((uint64_t)this->deleteAfterTime != (uint64_t)rhs.deleteAfterTime) { return false; }; - if ((uint8_t)this->deleteAfterVersions != (uint8_t)rhs.deleteAfterVersions) { return false; }; + if ((uint16_t)this->deleteAfterVersions != (uint16_t)rhs.deleteAfterVersions) { return false; }; if (spanPolicies != rhs.spanPolicies) { return false; }; return true; } std::ostream& operator<<(std::ostream& out, const DirectoryInfoBody& x) { - out << "DirectoryInfoBody(" << "Version=" << (int)x.version << ", " << "DeleteAfterTime=" << x.deleteAfterTime << ", " << "DeleteAfterVersions=" << (int)x.deleteAfterVersions << ", " << "SpanPolicies=" << x.spanPolicies << ")"; + out << "DirectoryInfoBody(" << "Version=" << (int)x.version << ", " << "DeleteAfterTime=" << x.deleteAfterTime << ", " << "DeleteAfterVersions=" << x.deleteAfterVersions << ", " << "SpanPolicies=" << x.spanPolicies << ")"; return out; } diff --git a/cpp/MsgsGen.hpp b/cpp/MsgsGen.hpp index 71628517..ce0a80d2 100644 --- a/cpp/MsgsGen.hpp +++ b/cpp/MsgsGen.hpp @@ -52,6 +52,8 @@ enum class EggsError : uint16_t { BAD_DIRECTORY_INFO = 55, CREATION_TIME_TOO_RECENT = 56, DEADLINE_NOT_PASSED = 57, + SAME_SOURCE_AND_DESTINATION = 58, + SAME_DIRECTORIES = 59, }; std::ostream& operator<<(std::ostream& out, EggsError err); @@ -280,17 +282,17 @@ std::ostream& operator<<(std::ostream& out, const SpanPolicy& x); struct DirectoryInfoBody { uint8_t version; uint64_t deleteAfterTime; - uint8_t deleteAfterVersions; + uint16_t deleteAfterVersions; BincodeList spanPolicies; - static constexpr uint16_t STATIC_SIZE = 1 + 8 + 1 + BincodeList::STATIC_SIZE; // version + deleteAfterTime + deleteAfterVersions + spanPolicies + static constexpr uint16_t STATIC_SIZE = 1 + 8 + 2 + BincodeList::STATIC_SIZE; // version + deleteAfterTime + deleteAfterVersions + spanPolicies DirectoryInfoBody() { clear(); } uint16_t packedSize() const { uint16_t _size = 0; _size += 1; // version _size += 8; // deleteAfterTime - _size += 1; // deleteAfterVersions + _size += 2; // 
deleteAfterVersions _size += spanPolicies.packedSize(); // spanPolicies return _size; } diff --git a/cpp/ShardDB.cpp b/cpp/ShardDB.cpp index 40a692a4..53cea26a 100644 --- a/cpp/ShardDB.cpp +++ b/cpp/ShardDB.cpp @@ -320,7 +320,7 @@ struct ShardDBImpl { } }; - { + if (_shid == ROOT_DIR_INODE_ID.shard()) { auto k = InodeIdKey::Static(ROOT_DIR_INODE_ID); if (!keyExists(_directoriesCf, k.toSlice())) { LOG_INFO(_env, "creating root directory, since it does not exist"); @@ -637,13 +637,14 @@ struct ShardDBImpl { return NO_ERROR; } - EggsError _visitDirectories(const VisitDirectoriesReq& req, VisitDirectoriesResp& resp) { + template + EggsError _visitInodes(rocksdb::ColumnFamilyHandle* cf, const Req& req, Resp& resp) { resp.nextId = NULL_INODE_ID; - int budget = UDP_MTU - ShardResponseHeader::STATIC_SIZE - VisitDirectoriesResp::STATIC_SIZE; + int budget = UDP_MTU - ShardResponseHeader::STATIC_SIZE - Resp::STATIC_SIZE; int maxIds = (budget/8) + 1; // include next inode { - WrappedIterator it(_db->NewIterator({}, _directoriesCf)); + WrappedIterator it(_db->NewIterator({}, cf)); auto beginKey = InodeIdKey::Static(req.beginId); for ( it->Seek(beginKey.toSlice()); @@ -662,6 +663,10 @@ struct ShardDBImpl { return NO_ERROR; } + + EggsError _visitDirectories(const VisitDirectoriesReq& req, VisitDirectoriesResp& resp) { + return _visitInodes(_directoriesCf, req, resp); + } EggsError _fileSpans(const FileSpansReq& req, FileSpansResp& resp) { // snapshot probably not strictly needed -- it's for the possible lookup if we @@ -779,6 +784,10 @@ struct ShardDBImpl { return NO_ERROR; } + EggsError _visitFiles(const VisitFilesReq& req, VisitFilesResp& resp) { + return _visitInodes(_filesCf, req, resp); + } + EggsError read(const ShardReqContainer& req, ShardRespContainer& resp) { LOG_DEBUG(_env, "processing read-only request of kind %s", req.kind()); @@ -817,7 +826,8 @@ struct ShardDBImpl { err = _blockServiceFiles(req.getBlockServiceFiles(), resp.setBlockServiceFiles()); break; case ShardMessageKind::VISIT_FILES: - throw EGGS_EXCEPTION("UNIMPLEMENTED %s", req.kind()); + err = _visitFiles(req.getVisitFiles(), resp.setVisitFiles()); + break; default: throw EGGS_EXCEPTION("bad read-only shard message kind %s", req.kind()); } @@ -877,6 +887,9 @@ struct ShardDBImpl { if (req.dirId.type() != InodeType::DIRECTORY) { return EggsError::TYPE_IS_NOT_DIRECTORY; } + if (req.oldName == req.newName) { + return EggsError::SAME_SOURCE_AND_DESTINATION; + } if (!validName(req.newName.ref())) { return EggsError::BAD_NAME; } @@ -1729,6 +1742,7 @@ struct ShardDBImpl { dir().setMtime(time); dir().setHashMode(HashMode::XXH3_63); dir().setInfoInherited(entry.info.inherited); + LOG_DEBUG(_env, "inherited: %s, hasInfo: %s, body: %s", entry.info.inherited, dir().mustHaveInfo(), entry.info.body.ref()); dir().setInfo(entry.info.body.ref()); ROCKS_DB_CHECKED(batch.Put(_directoriesCf, dirKey.toSlice(), dir.toSlice())); } @@ -2016,7 +2030,9 @@ struct ShardDBImpl { std::string dirValue; ExternalValue dir; { - EggsError err = _initiateDirectoryModification(time, false, batch, entry.dirId, dirValue, dir); + // allowSnapshot=true since we might want to influence deletion policies for already deleted + // directories. + EggsError err = _initiateDirectoryModification(time, true, batch, entry.dirId, dirValue, dir); if (err != NO_ERROR) { return err; } @@ -2054,12 +2070,25 @@ struct ShardDBImpl { nameHash = computeHash(dir().hashMode(), entry.name.ref()); } + // We check that edge is still not owned -- otherwise we might orphan a file. 
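+        // Below we re-read the snapshot edge before deleting it: if it is gone we treat the request as already applied (NO_ERROR), and if it has become owned in the meantime we bail out with EDGE_NOT_FOUND instead of deleting it.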
{ StaticValue k; k().setDirIdWithCurrent(entry.dirId, false); // snapshot (current=false), we're deleting a non owned snapshot edge k().setNameHash(nameHash); k().setName(entry.name.ref()); k().setCreationTime(entry.creationTime); + std::string edgeValue; + auto status = _db->Get({}, _edgesCf, k.toSlice(), &edgeValue); + if (status.IsNotFound()) { + return NO_ERROR; // make the client's life easier + } + ROCKS_DB_CHECKED(status); + ExternalValue edge(edgeValue); + if (edge().targetIdWithOwned().extra()) { + // TODO better error here? + return EggsError::EDGE_NOT_FOUND; // unexpectedly owned + } + // we can go ahead and safely delete ROCKS_DB_CHECKED(batch.Delete(_edgesCf, k.toSlice())); } @@ -2099,13 +2128,27 @@ struct ShardDBImpl { nameHash = computeHash(dir().hashMode(), entry.name.ref()); } - // remove edge + // We need to check that the edge is still there, and that it still owns the + // file. Maybe the file was re-owned by someone else in the meantime, in which case + // we can't proceed making the file transient. { StaticValue k; + // current=false since we can only delete k().setDirIdWithCurrent(entry.ownerId, false); k().setNameHash(nameHash); k().setName(entry.name.ref()); k().setCreationTime(entry.creationTime); + std::string edgeValue; + auto status = _db->Get({}, _edgesCf, k.toSlice(), &edgeValue); + if (status.IsNotFound()) { + return EggsError::EDGE_NOT_FOUND; // can't return NO_ERROR, since the transient file still exists + } + ROCKS_DB_CHECKED(status); + ExternalValue edge(edgeValue); + if (!edge().targetIdWithOwned().extra()) { // not owned + return EggsError::EDGE_NOT_FOUND; + } + // we can proceed ROCKS_DB_CHECKED(batch.Delete(_edgesCf, k.toSlice())); } @@ -2135,6 +2178,15 @@ struct ShardDBImpl { } } + // Exit early if file is empty. Crucial to do this with the size, + // otherwise we might spend a lot of time poring through the SSTs + // making sure there are no spans. 
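+        // With this early return in place, failing to find a span for a non-empty file below can only mean a corrupted database, which is why the old FILE_EMPTY returns around SeekForPrev have been turned into ALWAYS_ASSERTs.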
+ if (file().fileSize() == 0) { + return EggsError::FILE_EMPTY; + } + + LOG_DEBUG(_env, "deleting span from file %s of size %s", entry.fileId, file().fileSize()); + // Fetch the last span WrappedIterator spanIt(_db->NewIterator({}, _spansCf)); ExternalValue spanKey; @@ -2144,14 +2196,10 @@ struct ShardDBImpl { endKey().setFileId(entry.fileId); endKey().setOffset(file().fileSize()); spanIt->SeekForPrev(endKey.toSlice()); - if (!spanIt->Valid()) { - return EggsError::FILE_EMPTY; - } ROCKS_DB_CHECKED(spanIt->status()); + ALWAYS_ASSERT(spanIt->Valid()); // we know the file isn't empty, we must have a span spanKey = ExternalValue::FromSlice(spanIt->key()); - if (spanKey().fileId() != entry.fileId) { - return EggsError::FILE_EMPTY; - } + ALWAYS_ASSERT(spanKey().fileId() == entry.fileId); // again, we know the file isn't empty span = ExternalValue::FromSlice(spanIt->value()); } ALWAYS_ASSERT(span().storageClass() != EMPTY_STORAGE); @@ -2443,7 +2491,7 @@ struct ShardDBImpl { ROCKS_DB_CHECKED(status); ExternalValue span(spanValue); // "Is the span still there" - if (file().size() > entry.byteOffset+span().spanSize()) { + if (file().fileSize() > entry.byteOffset+span().spanSize()) { return NO_ERROR; // already certified (we're past it) } if (file().lastSpanState() == SpanState::CLEAN) { @@ -2781,7 +2829,9 @@ struct ShardDBImpl { return NO_ERROR; } - EggsError _initiateTransientFileModification(EggsTime time, bool allowPastDeadline, rocksdb::WriteBatch& batch, InodeId id, std::string& tfValue, ExternalValue& tf) { + EggsError _initiateTransientFileModification( + EggsTime time, bool allowPastDeadline, rocksdb::WriteBatch& batch, InodeId id, std::string& tfValue, ExternalValue& tf + ) { ExternalValue tmpTf; EggsError err = _getTransientFile({}, time, allowPastDeadline, id, tfValue, tmpTf); if (err != NO_ERROR) { @@ -2812,8 +2862,9 @@ struct ShardDBImpl { BincodeBytes defaultDirectoryInfo() { char buf[255]; DirectoryInfoBody info; + info.version = 0; // delete after 30 days - info.deleteAfterTime = 30ull /*days*/ * 24 /*hours*/ * 60 /*minutes*/ * 60 /*seconds*/ * 1'000'000'000 /*ns*/; + info.deleteAfterTime = (30ull /*days*/ * 24 /*hours*/ * 60 /*minutes*/ * 60 /*seconds*/ * 1'000'000'000 /*ns*/) | (1ull<<63); // do not delete after N versions info.deleteAfterVersions = 0; // up to 1MiB flash, up to 10MiB HDD (should be 100MiB really, it's nicer to test with smaller diff --git a/cpp/ShardDBData.hpp b/cpp/ShardDBData.hpp index 310f376b..3356e899 100644 --- a/cpp/ShardDBData.hpp +++ b/cpp/ShardDBData.hpp @@ -298,7 +298,7 @@ struct DirectoryBody { sizeof(InodeId) + // ownerId sizeof(EggsTime) + // mtime sizeof(HashMode) + // hashMode - sizeof(bool)+ // infoInherited + sizeof(bool) + // infoInherited sizeof(uint8_t); // infoLength static constexpr size_t MAX_SIZE = MIN_SIZE + 255; @@ -316,17 +316,17 @@ struct DirectoryBody { ALWAYS_ASSERT(sz == size()); } - bool hasInfo() const { - return ownerId() == NULL_INODE_ID || !infoInherited(); + bool mustHaveInfo() const { + return !infoInherited(); } BincodeBytesRef info() const { - ALWAYS_ASSERT(hasInfo() == (infoUnchecked().size() > 0)); + ALWAYS_ASSERT(!mustHaveInfo() || (infoUnchecked().size() > 0)); return infoUnchecked(); } void setInfo(const BincodeBytesRef& bytes) { - ALWAYS_ASSERT(hasInfo() == (bytes.size() > 0)); + ALWAYS_ASSERT(!mustHaveInfo() || (bytes.size() > 0)); setInfoUnchecked(bytes); } }; diff --git a/go/bincodegen/bincodegen.go b/go/bincodegen/bincodegen.go index 1b025d81..c6cefc08 100644 --- a/go/bincodegen/bincodegen.go +++ 
b/go/bincodegen/bincodegen.go @@ -233,6 +233,17 @@ func generateGoMsgKind(out io.Writer, typeName string, funName string, reqResps fmt.Fprintf(out, "\t}\n") fmt.Fprintf(out, "}\n\n") + fmt.Fprintf(out, "func (k %s) String() string {\n", typeName) + fmt.Fprintf(out, "\tswitch k {\n") + for _, reqResp := range reqResps { + fmt.Fprintf(out, "\tcase %v:\n", reqResp.kind) + fmt.Fprintf(out, "\t\treturn \"%s\"\n", reqRespEnum(reqResp)) + } + fmt.Fprintf(out, "\tdefault:\n") + fmt.Fprintf(out, "\t\treturn fmt.Sprintf(\"%s(%%d)\", k)\n", typeName) + fmt.Fprintf(out, "\t}\n") + fmt.Fprintf(out, "}\n\n") + fmt.Fprintf(out, "\n") fmt.Fprintf(out, "const (\n") @@ -1168,6 +1179,8 @@ func main() { "BAD_DIRECTORY_INFO", "CREATION_TIME_TOO_RECENT", "DEADLINE_NOT_PASSED", + "SAME_SOURCE_AND_DESTINATION", + "SAME_DIRECTORIES", } shardReqResps := []reqRespType{ diff --git a/go/eggs/cdcreq.go b/go/eggs/cdcreq.go index 65eb0a18..51bdb895 100644 --- a/go/eggs/cdcreq.go +++ b/go/eggs/cdcreq.go @@ -102,12 +102,15 @@ func CDCRequest( // are made regarding the contents of `respBody`. respBody bincode.Unpackable, ) error { + if msgs.GetCDCMessageKind(reqBody) != msgs.GetCDCMessageKind(respBody) { + panic(fmt.Errorf("mismatching req %T and resp %T", reqBody, respBody)) + } req := cdcRequest{ RequestId: requestId, Body: reqBody, } buffer := make([]byte, msgs.UDP_MTU) - logger.Debug("about to send request %T to CDC", reqBody) + // logger.Debug("about to send request %T to CDC", reqBody) reqBytes := buffer bincode.PackIntoBytes(&reqBytes, &req) written, err := writer.Write(reqBytes) diff --git a/go/eggs/client.go b/go/eggs/client.go index 907a2638..c6184af2 100644 --- a/go/eggs/client.go +++ b/go/eggs/client.go @@ -11,7 +11,54 @@ import ( type Client interface { ShardRequest(log LogLevels, shid msgs.ShardId, req bincode.Packable, resp bincode.Unpackable) error CDCRequest(log LogLevels, req bincode.Packable, resp bincode.Unpackable) error - Close() error +} + +// Holds sockets to all 256 shards +type AllShardsClient struct { + timeout time.Duration + shardSocks []*net.UDPConn + cdcSock *net.UDPConn +} + +func NewAllShardsClient() (*AllShardsClient, error) { + var err error + c := AllShardsClient{ + timeout: 10 * time.Second, + } + c.shardSocks = make([]*net.UDPConn, 256) + for i := 0; i < 256; i++ { + c.shardSocks[msgs.ShardId(i)], err = ShardSocket(msgs.ShardId(i)) + if err != nil { + return nil, err + } + } + c.cdcSock, err = CDCSocket() + if err != nil { + return nil, err + } + return &c, nil +} + +// TODO probably convert these errors to stderr, we can't do much with them usually +// but they'd be worth knowing about +func (c *AllShardsClient) Close() error { + for _, sock := range c.shardSocks { + if err := sock.Close(); err != nil { + return err + } + } + if err := c.cdcSock.Close(); err != nil { + return err + } + return nil +} + +func (c *AllShardsClient) ShardRequest(log LogLevels, shid msgs.ShardId, req bincode.Packable, resp bincode.Unpackable) error { + return ShardRequestSocket(log, nil, c.shardSocks[shid], c.timeout, req, resp) +} + +func (c *AllShardsClient) CDCRequest(log LogLevels, req bincode.Packable, resp bincode.Unpackable) error { + return CDCRequestSocket(log, c.cdcSock, c.timeout, req, resp) } // For when you almost always do requests to a single shard (e.g. in GC). @@ -70,6 +117,7 @@ func (c *ShardSpecificClient) CDCRequest(log LogLevels, req bincode.Packable, re return CDCRequestSocket(log, c.cdcSock, c.timeout, req, resp) } +// nil if the directory has no directory info (i.e. 
if it is inherited) func GetDirectoryInfo(log LogLevels, c Client, id msgs.InodeId) (*msgs.DirectoryInfoBody, error) { req := msgs.StatDirectoryReq{ Id: id, @@ -83,7 +131,7 @@ func GetDirectoryInfo(log LogLevels, c Client, id msgs.InodeId) (*msgs.Directory return nil, err } if len(resp.Info) == 0 { - panic(fmt.Errorf("empty info")) + return nil, nil } return &info, nil } diff --git a/go/eggs/gc.go b/go/eggs/gc.go index 7d23fdcd..8afecd98 100644 --- a/go/eggs/gc.go +++ b/go/eggs/gc.go @@ -2,7 +2,6 @@ package eggs import ( "fmt" - "time" "xtx/eggsfs/bincode" "xtx/eggsfs/msgs" ) @@ -86,6 +85,31 @@ func DestructFile( return nil } +func destructFilesInternal( + log LogLevels, client Client, shid msgs.ShardId, stats *DestructionStats, blockServicesKeys map[msgs.BlockServiceId][16]byte, +) error { + req := msgs.VisitTransientFilesReq{} + resp := msgs.VisitTransientFilesResp{} + for { + log.Debug("visiting files with %+v", req) + err := client.ShardRequest(log, shid, &req, &resp) + if err != nil { + return fmt.Errorf("could not visit transient files: %w", err) + } + for ix := range resp.Files { + file := &resp.Files[ix] + if err := DestructFile(log, client, blockServicesKeys, stats, file.Id, file.DeadlineTime, file.Cookie); err != nil { + return fmt.Errorf("%+v: error while destructing file: %w", file, err) + } + } + req.BeginId = resp.NextId + if resp.NextId == 0 { + break + } + } + return nil +} + // Collects dead transient files, and expunges them. Stops when // all files have been traversed. Useful for testing a single iteration. // @@ -101,33 +125,32 @@ func DestructFiles( } defer client.Close() stats := DestructionStats{} - req := msgs.VisitTransientFilesReq{} - resp := msgs.VisitTransientFilesResp{} - for { - /* - if stats.VisitedFiles%100 == 0 { - log.Info("%v visited files, %v destructed files, %v destructed spans, %v destructed blocks", stats.VisitedFiles, stats.DestructedFiles, stats.DestructedSpans, stats.DestructedBlocks) - } - */ - err := client.ShardRequest(log, shid, &req, &resp) - if err != nil { - return fmt.Errorf("could not visit transient files: %w", err) - } - for ix := range resp.Files { - file := &resp.Files[ix] - if err := DestructFile(log, client, blockServicesKeys, &stats, file.Id, file.DeadlineTime, file.Cookie); err != nil { - return fmt.Errorf("%+v: error while destructing file: %w", file, err) - } - } - req.BeginId = resp.NextId - if resp.NextId == 0 { - break - } + if err := destructFilesInternal(log, client, shid, &stats, blockServicesKeys); err != nil { + return err } log.Info("stats after one destruct files iteration: %+v", stats) return nil } +func DestructFilesInAllShards( + log LogLevels, blockServicesKeys map[msgs.BlockServiceId][16]byte, +) error { + client, err := NewAllShardsClient() + if err != nil { + return err + } + defer client.Close() + stats := DestructionStats{} + for i := 0; i < 256; i++ { + shid := msgs.ShardId(i) + if err := destructFilesInternal(log, client, shid, &stats, blockServicesKeys); err != nil { + return err + } + } + log.Info("stats after one destruct files iteration in all shards: %+v", stats) + return nil +} + type CollectStats struct { VisitedDirectories uint64 VisitedEdges uint64 @@ -141,8 +164,8 @@ func applyPolicy( dirId msgs.InodeId, dirInfo *msgs.DirectoryInfoBody, edges []msgs.Edge, ) (bool, error) { policy := SnapshotPolicy{ - DeleteAfterTime: time.Duration(dirInfo.DeleteAfterTime), - DeleteAfterVersions: int(dirInfo.DeleteAfterVersions), + DeleteAfterTime: dirInfo.DeleteAfterTime, + DeleteAfterVersions: 
dirInfo.DeleteAfterVersions, } log.Debug("%v: about to apply policy %+v for name %s", dirId, policy, edges[0].Name) stats.VisitedEdges = stats.VisitedEdges + uint64(len(edges)) @@ -314,13 +337,7 @@ func CollectDirectory(log LogLevels, client Client, dirInfoCache *DirInfoCache, return nil } -func CollectDirectories(log LogLevels, shid msgs.ShardId) error { - client, err := NewShardSpecificClient(shid) - if err != nil { - return err - } - defer client.Close() - stats := CollectStats{} +func collectDirectoriesInternal(log LogLevels, client Client, stats *CollectStats, shid msgs.ShardId) error { dirInfoCache := NewDirInfoCache() req := msgs.VisitDirectoriesReq{} resp := msgs.VisitDirectoriesResp{} @@ -333,7 +350,10 @@ func CollectDirectories(log LogLevels, shid msgs.ShardId) error { if id.Type() != msgs.DIRECTORY { panic(fmt.Errorf("bad directory inode %v", id)) } - if err := CollectDirectory(log, client, dirInfoCache, &stats, id); err != nil { + if id.Shard() != shid { + panic("bad shard") + } + if err := CollectDirectory(log, client, dirInfoCache, stats, id); err != nil { return fmt.Errorf("error while collecting inode %v: %w", id, err) } } @@ -342,6 +362,35 @@ func CollectDirectories(log LogLevels, shid msgs.ShardId) error { break } } + return nil +} + +func CollectDirectories(log LogLevels, shid msgs.ShardId) error { + client, err := NewShardSpecificClient(shid) + if err != nil { + return err + } + defer client.Close() + stats := CollectStats{} + if err := collectDirectoriesInternal(log, client, &stats, shid); err != nil { + return err + } log.Info("stats after one collect directories iteration: %+v", stats) return nil } + +func CollectDirectoriesInAllShards(log LogLevels) error { + client, err := NewAllShardsClient() + if err != nil { + return err + } + defer client.Close() + stats := CollectStats{} + for i := 0; i < 256; i++ { + if err := collectDirectoriesInternal(log, client, &stats, msgs.ShardId(i)); err != nil { + return err + } + } + log.Info("stats after one all shards collect directories iteration: %+v", stats) + return nil +} diff --git a/go/eggs/managedprocess.go b/go/eggs/managedprocess.go index 16dd28bf..8b83b8f3 100644 --- a/go/eggs/managedprocess.go +++ b/go/eggs/managedprocess.go @@ -467,8 +467,8 @@ func WaitForShard(shid msgs.ShardId, timeout time.Duration) { nil, sock, time.Second, - &msgs.StatDirectoryReq{Id: msgs.ROOT_DIR_INODE_ID}, - &msgs.StatDirectoryResp{}, + &msgs.VisitDirectoriesReq{}, + &msgs.VisitDirectoriesResp{}, ) sock.Close() if err != nil { diff --git a/go/eggs/policy.go b/go/eggs/policy.go index b9bf403a..d595fa05 100644 --- a/go/eggs/policy.go +++ b/go/eggs/policy.go @@ -1,28 +1,28 @@ package eggs import ( - "time" "xtx/eggsfs/msgs" ) // If multiple policies are present, the file will be deleted if -// any of the policies are not respected. If neither policy is present, -// snapshots will never be deleted. +// any of the policies are not respected. +// +// If neither policies are active, then snapshots will be kept forever. +// +// Also note that you can use either policy to delete all snapshots, by +// setting either to zero. type SnapshotPolicy struct { - // Keep all files/directories versions with a certain name within this time window. - // If zero, this kind of policy is inactive. - DeleteAfterTime time.Duration - // Keep last N file/directory versions with a certain name. If 0, this - // kind of policy is inactive. 
-	DeleteAfterVersions int
+	DeleteAfterTime     msgs.DeleteAfterTime
+	DeleteAfterVersions msgs.DeleteAfterVersions
 }
 
-// The edges are the entirety of the edges for a certain file name in a certain dir.
-// Oldest edge first.
+// Returns how many edges to remove according to the policy (as a prefix of the input).
+//
+// `edges` should be all the snapshot edges for a certain directory, oldest edge first.
 //
 // It is assumed that every delete in the input will be preceded by a non-delete.
 //
-// If it returns N, edges[N:] will be well formed too.
+// If it returns N, edges[N:] will be well formed too in the sense above.
 func (policy *SnapshotPolicy) edgesToRemove(now msgs.EggsTime, edges []msgs.Edge) int {
 	if len(edges) == 0 {
 		return 0
@@ -30,40 +30,39 @@
 	// Index dividing edges, so that all edges[i] i < firstGoodEdgeVersions should be
 	// removed, while all edges[i] i >= firstGoodEdgeVersions should be kept.
 	firstGoodEdgeVersions := 0
-	// Note that DeleteAfterVersions is a bit tricky: we don't know here if there even
-	// is a current edge. So depending on whether there is or not, results might differ.
+	// Note that DeleteAfterVersions only affects the snapshot edges: we don't look at the
+	// current edge at all.
 	//
-	// We delete conservative (assuming that there _is_ a current edge). This prevents us from
-	// using policy.DeleteAfterVersions=1 to immediately purge deleted versions, we might
-	// want to revise this.
-	if policy.DeleteAfterVersions > 0 {
+	// This means, for example, that if we have DeleteAfterVersions=5, after applying the policy
+	// there might be 5 or 6 edges with a certain name, depending on whether a current
+	// edge for it exists.
+	if policy.DeleteAfterVersions.Active() {
 		versionNumber := 0
 		for firstGoodEdgeVersions = len(edges) - 1; firstGoodEdgeVersions >= 0; firstGoodEdgeVersions-- {
 			// ignore deletes, we just want to keep the last N versions.
 			if edges[firstGoodEdgeVersions].TargetId.Id() == msgs.NULL_INODE_ID {
 				continue
 			}
-			versionNumber++
-			// the latest version number is the latest to keep
-			if versionNumber >= policy.DeleteAfterVersions {
+			if versionNumber >= int(policy.DeleteAfterVersions.Versions()) {
+				firstGoodEdgeVersions++
 				break
 			}
+			versionNumber++
 		}
 	}
 	firstGoodEdgeTime := 0
-	if policy.DeleteAfterTime > time.Duration(0) {
+	if policy.DeleteAfterTime.Active() {
 		for firstGoodEdgeTime = len(edges) - 1; firstGoodEdgeTime >= 0; firstGoodEdgeTime-- {
-			// if this file was created before the cutoff, then it is the last one to
-			// matter.
 			creationTime := edges[firstGoodEdgeTime].CreationTime.Time()
-			if now.Time().Sub(creationTime) > policy.DeleteAfterTime {
+			if now.Time().Sub(creationTime) > policy.DeleteAfterTime.Time() {
+				firstGoodEdgeTime++
 				break
 			}
 		}
 	}
 	firstGoodEdge := Max(0, Max(firstGoodEdgeVersions, firstGoodEdgeTime))
 	// if the last edge is a delete, remove that too (we can't keep a delete hanging)
-	if edges[firstGoodEdge].TargetId.Id() == msgs.NULL_INODE_ID {
+	if firstGoodEdge < len(edges) && edges[firstGoodEdge].TargetId.Id() == msgs.NULL_INODE_ID {
 		firstGoodEdge++
 	}
 	return firstGoodEdge
diff --git a/go/eggs/policy_test.go b/go/eggs/policy_test.go
index 0d43ecc0..fb35e490 100644
--- a/go/eggs/policy_test.go
+++ b/go/eggs/policy_test.go
@@ -16,6 +16,25 @@ func date(day int) msgs.EggsTime {
 	return msgs.MakeEggsTime(time.Date(2021, time.January, day, 0, 0, 0, 0, time.UTC))
 }
 
+func TestDeleteAll(t *testing.T) {
+	edges := []msgs.Edge{
+		{
+			TargetId:     inodeId(1),
+			NameHash:     0, // unneeded
+			Name:         "f",
+			CreationTime: date(1),
+		},
+		{
+			TargetId:     msgs.InodeIdExtra(msgs.NULL_INODE_ID),
+			NameHash:     0, // unneeded
+			Name:         "f",
+			CreationTime: date(2),
+		},
+	}
+	assert.Equal(t, 2, (&SnapshotPolicy{DeleteAfterVersions: msgs.ActiveDeleteAfterVersions(0)}).edgesToRemove(date(3), edges))
+	assert.Equal(t, 2, (&SnapshotPolicy{DeleteAfterTime: msgs.ActiveDeleteAfterTime(0)}).edgesToRemove(date(3), edges))
+}
+
 func TestKeepWithin(t *testing.T) {
 	edges := []msgs.Edge{
 		{
@@ -43,17 +62,17 @@ func TestKeepWithin(t *testing.T) {
 			CreationTime: date(7),
 		},
 	}
-	assert.Equal(t, 0, (&SnapshotPolicy{DeleteAfterVersions: 10}).edgesToRemove(msgs.Now(), edges))
-	assert.Equal(t, 3, (&SnapshotPolicy{DeleteAfterVersions: 1}).edgesToRemove(msgs.Now(), edges))
-	assert.Equal(t, 1, (&SnapshotPolicy{DeleteAfterVersions: 2}).edgesToRemove(msgs.Now(), edges)) // delete does not count
+	assert.Equal(t, 0, (&SnapshotPolicy{DeleteAfterVersions: msgs.ActiveDeleteAfterVersions(10)}).edgesToRemove(msgs.Now(), edges))
+	assert.Equal(t, 3, (&SnapshotPolicy{DeleteAfterVersions: msgs.ActiveDeleteAfterVersions(1)}).edgesToRemove(msgs.Now(), edges))
+	assert.Equal(t, 1, (&SnapshotPolicy{DeleteAfterVersions: msgs.ActiveDeleteAfterVersions(2)}).edgesToRemove(msgs.Now(), edges)) // delete does not count
 	// this falls on day 6, so between the delete and the last create. everything
 	// apart from the last create should be deleted.
-	assert.Equal(t, 3, (&SnapshotPolicy{DeleteAfterTime: 48 * time.Hour}).edgesToRemove(date(8), edges))
+	assert.Equal(t, 3, (&SnapshotPolicy{DeleteAfterTime: msgs.ActiveDeleteAfterTime(48 * time.Hour)}).edgesToRemove(date(8), edges))
 	// this falls on day 4, so between the second create and the delete.
only // the first create should be removed - assert.Equal(t, 1, (&SnapshotPolicy{DeleteAfterTime: 48 * time.Hour * 2}).edgesToRemove(date(8), edges)) + assert.Equal(t, 1, (&SnapshotPolicy{DeleteAfterTime: msgs.ActiveDeleteAfterTime(48 * time.Hour * 2)}).edgesToRemove(date(8), edges)) // this is well before anything - assert.Equal(t, 0, (&SnapshotPolicy{DeleteAfterTime: 48 * time.Hour * 10}).edgesToRemove(date(8), edges)) + assert.Equal(t, 0, (&SnapshotPolicy{DeleteAfterTime: msgs.ActiveDeleteAfterTime(48 * time.Hour * 10)}).edgesToRemove(date(8), edges)) // doesn't crash with empty edges - assert.Equal(t, 0, (&SnapshotPolicy{DeleteAfterTime: 48 * time.Hour * 10}).edgesToRemove(date(8), make([]msgs.Edge, 0))) + assert.Equal(t, 0, (&SnapshotPolicy{DeleteAfterTime: msgs.ActiveDeleteAfterTime(48 * time.Hour * 10)}).edgesToRemove(date(8), make([]msgs.Edge, 0))) } diff --git a/go/eggs/shardreq.go b/go/eggs/shardreq.go index c8ad6cc5..84fbfb4f 100644 --- a/go/eggs/shardreq.go +++ b/go/eggs/shardreq.go @@ -117,12 +117,16 @@ func ShardRequest( // are made regarding the contents of `respBody`. respBody bincode.Unpackable, ) error { + if msgs.GetShardMessageKind(reqBody) != msgs.GetShardMessageKind(respBody) { + panic(fmt.Errorf("mismatching req %T and resp %T", reqBody, respBody)) + } req := shardRequest{ requestId: requestId, body: reqBody, } buffer := make([]byte, msgs.UDP_MTU) reqBytes := buffer + t0 := time.Now() logger.Debug("about to send request %T to shard", reqBody) packShardRequest(&reqBytes, &req, cdcKey) written, err := writer.Write(reqBytes) @@ -157,10 +161,10 @@ func ShardRequest( } // we managed to decode, we just need to check that it's not an error if resp.Error != nil { - logger.Debug("got error %v from shard", resp.Error) + logger.Debug("got error %v from shard (took %v)", resp.Error, time.Since(t0)) return resp.Error } - logger.Debug("got response %T from shard", respBody) + logger.Debug("got response %T from shard (took %v)", respBody, time.Since(t0)) return nil } } diff --git a/go/integrationtest/cleanup.go b/go/integrationtest/cleanup.go new file mode 100644 index 00000000..d69a2dd5 --- /dev/null +++ b/go/integrationtest/cleanup.go @@ -0,0 +1,121 @@ +package main + +import ( + "fmt" + "xtx/eggsfs/eggs" + "xtx/eggsfs/msgs" +) + +func deleteDir(log eggs.LogLevels, client eggs.Client, ownerId msgs.InodeId, name string, dirId msgs.InodeId) { + readDirReq := msgs.ReadDirReq{ + DirId: dirId, + } + readDirResp := msgs.ReadDirResp{} + for { + if err := client.ShardRequest(log, dirId.Shard(), &readDirReq, &readDirResp); err != nil { + panic(err) + } + for _, res := range readDirResp.Results { + if res.TargetId.Type() == msgs.DIRECTORY { + deleteDir(log, client, dirId, res.Name, res.TargetId) + } else { + if err := client.ShardRequest( + log, res.TargetId.Shard(), &msgs.SoftUnlinkFileReq{OwnerId: dirId, FileId: res.TargetId, Name: res.Name}, &msgs.SoftUnlinkFileResp{}, + ); err != nil { + panic(err) + } + } + } + if readDirResp.NextHash == 0 { + break + } + } + if ownerId != msgs.NULL_INODE_ID { + if err := client.CDCRequest( + log, &msgs.SoftUnlinkDirectoryReq{OwnerId: ownerId, TargetId: dirId, Name: name}, &msgs.SoftUnlinkDirectoryResp{}, + ); err != nil { + panic(err) + } + } +} + +func cleanupAfterTest( + blockServicesKeys map[msgs.BlockServiceId][16]byte, +) { + log := &eggs.LogToStdout{} + client, err := eggs.NewAllShardsClient() + if err != nil { + panic(err) + } + defer client.Close() + // Delete all current things + deleteDir(log, client, msgs.NULL_INODE_ID, "", 
msgs.ROOT_DIR_INODE_ID) + // Make all historical stuff die immediately for all directories + for i := 0; i < 256; i++ { + shid := msgs.ShardId(i) + visitDirsReq := msgs.VisitDirectoriesReq{} + visitDirsResp := msgs.VisitDirectoriesResp{} + for { + if err := client.ShardRequest(log, shid, &visitDirsReq, &visitDirsResp); err != nil { + panic(err) + } + for _, dirId := range visitDirsResp.Ids { + dirInfo, err := eggs.GetDirectoryInfo(log, client, dirId) + if err != nil { + panic(err) + } + if dirInfo == nil { // inherited + continue + } + dirInfo.DeleteAfterVersions = msgs.ActiveDeleteAfterVersions(0) + if err := eggs.SetDirectoryInfo(log, client, dirId, false, dirInfo); err != nil { + panic(fmt.Errorf("could not set directory info for dir %v, shard %v: %v", dirId, dirId.Shard(), err)) + } + } + if visitDirsResp.NextId == 0 { + break + } + } + } + // Collect everything + if err := eggs.CollectDirectoriesInAllShards(log); err != nil { + panic(err) + } + if err := eggs.DestructFilesInAllShards(log, blockServicesKeys); err != nil { + panic(err) + } + // Make sure nothing is left + for i := 0; i < 256; i++ { + shid := msgs.ShardId(i) + // No dirs + visitDirsResp := msgs.VisitDirectoriesResp{} + if err := client.ShardRequest(log, shid, &msgs.VisitDirectoriesReq{}, &visitDirsResp); err != nil { + panic(err) + } + if len(visitDirsResp.Ids) > 0 && !(len(visitDirsResp.Ids) == 1 && visitDirsResp.Ids[0] == msgs.ROOT_DIR_INODE_ID) { + panic(err) + } + // No files + visitFilesResp := msgs.VisitFilesResp{} + if err := client.ShardRequest(log, shid, &msgs.VisitFilesReq{}, &visitFilesResp); err != nil { + panic(err) + } + if len(visitFilesResp.Ids) > 0 { + panic(fmt.Errorf("%v: unexpected files (%v) after cleanup", shid, len(visitFilesResp.Ids))) + } + // No transient files + visitTransientFilesResp := msgs.VisitTransientFilesResp{} + if err := client.ShardRequest(log, shid, &msgs.VisitTransientFilesReq{}, &visitTransientFilesResp); err != nil { + panic(fmt.Errorf("%v: unexpected transient files (%v) after cleanup", shid, len(visitTransientFilesResp.Files))) + } + } + // Nothing in root dir + fullReadDirResp := msgs.FullReadDirResp{} + if err := client.ShardRequest(log, msgs.ROOT_DIR_INODE_ID.Shard(), &msgs.FullReadDirReq{DirId: msgs.ROOT_DIR_INODE_ID}, &fullReadDirResp); err != nil { + panic(err) + } + if len(fullReadDirResp.Results) != 0 { + panic(fmt.Errorf("unexpected stuff in root directory")) + } + +} diff --git a/go/integrationtest/filehistory.go b/go/integrationtest/filehistory.go new file mode 100644 index 00000000..9c060237 --- /dev/null +++ b/go/integrationtest/filehistory.go @@ -0,0 +1,339 @@ +// A simple tests creating files and also looking at the history +package main + +import ( + "fmt" + "math/rand" + "strings" + "sync" + "xtx/eggsfs/eggs" + "xtx/eggsfs/msgs" +) + +// actions + +type createFile struct { + name string + size uint64 +} + +type deleteFile struct { + name string +} + +type renameFile struct { + oldName string + newName string +} + +// trace + +type createdFile struct { + name string + id msgs.InodeId +} + +type checkpoint struct { + time msgs.EggsTime +} + +type files struct { + names []string + ids []msgs.InodeId + byName map[string]int +} + +func (files *files) addFile(name string, id msgs.InodeId) { + if _, wasPresent := files.byName[name]; wasPresent { + panic(fmt.Errorf("unexpected overwrite of %s", name)) + } + files.names = append(files.names, name) + files.ids = append(files.ids, id) + files.byName[name] = len(files.ids) - 1 +} + +func (files *files) id(name string) 
msgs.InodeId { + ix, present := files.byName[name] + if !present { + panic(fmt.Errorf("name not found %v", name)) + } + return files.ids[ix] +} + +func (files *files) deleteFile(name string) { + if _, wasPresent := files.byName[name]; !wasPresent { + panic(fmt.Errorf("name not found %v", name)) + } + ix := files.byName[name] + lastIx := len(files.ids) - 1 + if ix != lastIx { + files.ids[ix] = files.ids[lastIx] + files.names[ix] = files.names[lastIx] + files.byName[files.names[ix]] = ix + } + files.ids = files.ids[:lastIx] + files.names = files.names[:lastIx] + delete(files.byName, name) +} + +func genCreateFile(filePrefix string, rand *rand.Rand, files *files) createFile { + var name string + for { + name = fmt.Sprintf("%s%x", filePrefix, rand.Uint32()) + _, wasPresent := files.byName[name] + if !wasPresent { + delete(files.byName, name) + size := rand.Uint64() % (uint64(100) << 20) // up to 20MiB + return createFile{ + name: name, + size: size, + } + } + } +} + +func genDeleteFile(filePrefix string, rand *rand.Rand, files *files) deleteFile { + return deleteFile{name: files.names[int(rand.Uint32())%len(files.names)]} +} + +func genRenameFile(filePrefix string, rand *rand.Rand, files *files) renameFile { + oldName := genDeleteFile(filePrefix, rand, files).name + newName := genCreateFile(filePrefix, rand, files).name + return renameFile{ + oldName: oldName, + newName: newName, + } +} + +func genRenameOverrideFile(filePrefix string, rand *rand.Rand, files *files) renameFile { + oldName := genDeleteFile(filePrefix, rand, files).name + newName := genDeleteFile(filePrefix, rand, files).name + if oldName == newName { + return genRenameFile(filePrefix, rand, files) + } + return renameFile{ + oldName: oldName, + newName: newName, + } +} + +func checkCheckpoint(prefix string, files *files, allEdges []edge) { + edges := []edge{} + for _, edge := range allEdges { + if !strings.HasPrefix(edge.name, prefix) { + continue + } + edges = append(edges, edge) + } + if len(edges) != len(files.names) { + panic(fmt.Errorf("expected %d edges, got %d", len(files.names), len(edges))) + } + for _, edge := range edges { + id := files.id(edge.name) + if id != edge.targetId { + panic(fmt.Errorf("expected targetId %v for edge %v, but got %v", id, edge.name, id)) + } + } +} + +func runCheckpoint(harness *harness, prefix string, files *files) checkpoint { + edges := harness.readDir(msgs.ROOT_DIR_INODE_ID) + checkCheckpoint(prefix, files, edges) + resp := msgs.StatDirectoryResp{} + harness.shardReq(msgs.ROOT_DIR_INODE_ID.Shard(), &msgs.StatDirectoryReq{Id: msgs.ROOT_DIR_INODE_ID}, &resp) + return checkpoint{ + time: resp.Mtime, + } +} + +func runStep(harness *harness, files *files, stepAny any) any { + switch step := stepAny.(type) { + case createFile: + id := harness.createFile(msgs.ROOT_DIR_INODE_ID, step.name, step.size) + files.addFile(step.name, id) + return createdFile{ + name: step.name, + id: id, + } + case deleteFile: + fileId := files.id(step.name) + harness.shardReq(msgs.ROOT_DIR_INODE_ID.Shard(), &msgs.SoftUnlinkFileReq{OwnerId: msgs.ROOT_DIR_INODE_ID, FileId: fileId, Name: step.name}, &msgs.SoftUnlinkFileResp{}) + files.deleteFile(step.name) + return step + case renameFile: + targetId := files.id(step.oldName) + harness.shardReq( + msgs.ROOT_DIR_INODE_ID.Shard(), + &msgs.SameDirectoryRenameReq{DirId: msgs.ROOT_DIR_INODE_ID, TargetId: targetId, OldName: step.oldName, NewName: step.newName}, + &msgs.SameDirectoryRenameResp{}, + ) + files.deleteFile(step.oldName) + if _, wasPresent := files.byName[step.newName]; 
wasPresent { + // overwrite + files.deleteFile(step.newName) + } + files.addFile(step.newName, targetId) + return step + default: + panic(fmt.Errorf("bad step %T", stepAny)) + } +} + +func edgesAsOf(allEdges []fullEdge, t msgs.EggsTime) []edge { + namesToEdge := make(map[string]fullEdge) + + for _, fullEdge := range allEdges { + if fullEdge.creationTime > t { + continue + } + namesToEdge[fullEdge.name] = fullEdge + } + + edges := []edge{} + for _, fullEdge := range namesToEdge { + if fullEdge.targetId == msgs.NULL_INODE_ID { + continue + } + edges = append(edges, edge{ + name: fullEdge.name, + targetId: fullEdge.targetId, + }) + } + + return edges +} + +func replayCheckpoint(prefix string, files *files, fullEdges []fullEdge, t msgs.EggsTime) { + edges := edgesAsOf(fullEdges, t) + checkCheckpoint(prefix, files, edges) +} + +func replayStep(prefix string, files *files, fullEdges []fullEdge, stepAny any) { + switch step := stepAny.(type) { + case createdFile: + files.addFile(step.name, step.id) + case deleteFile: + files.deleteFile(step.name) + case renameFile: + targetId := files.id(step.oldName) + files.deleteFile(step.oldName) + if _, wasPresent := files.byName[step.newName]; wasPresent { + // overwrite + files.deleteFile(step.newName) + } + files.addFile(step.newName, targetId) + case checkpoint: + replayCheckpoint(prefix, files, fullEdges, step.time) + default: + panic(fmt.Errorf("bad step %T", stepAny)) + } +} + +func fileHistoryStepSingle(opts *fileHistoryTestOpts, harness *harness, seed int64, filePrefix string) { + // loop for n steps. at every step: + // * if we have never reached the target files, then just create a file. + // * if we have, create/delete/rename/rename with override at random. + // * every checkpointEvery steps, use readDir to make sure that we have the files that we expect. + // + // after we've finished, re-run through everything and re-check at ever checkpointSteps using + // fullReadDir. 
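+	// Every applied action (and every checkpoint) is appended to `trace`; the replay pass at the end rebuilds the expected file set from the trace and, at each recorded checkpoint, checks it against the directory contents reconstructed from fullReadDir via edgesAsOf.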
+ trace := []any{} + + reachedTargetFiles := false + fls := files{ + names: []string{}, + ids: []msgs.InodeId{}, + byName: make(map[string]int), + } + source := rand.NewSource(seed) + rand := rand.New(source) + for stepIx := 0; stepIx < opts.steps; stepIx++ { + if stepIx%opts.checkpointEvery == 0 { + checkpoint := runCheckpoint(harness, filePrefix, &fls) + trace = append(trace, checkpoint) + } + var step any + if len(fls.names) < opts.lowFiles { + reachedTargetFiles = false + } + if !reachedTargetFiles { + step = genCreateFile(filePrefix, rand, &fls) + } else { + which := rand.Uint32() % 4 + switch which { + case 0: + step = genCreateFile(filePrefix, rand, &fls) + case 1: + step = genDeleteFile(filePrefix, rand, &fls) + case 2: + step = genRenameFile(filePrefix, rand, &fls) + case 3: + step = genRenameOverrideFile(filePrefix, rand, &fls) + default: + panic(fmt.Errorf("bad which %d", which)) + } + } + reachedTargetFiles = reachedTargetFiles || len(fls.names) >= opts.targetFiles + trace = append(trace, runStep(harness, &fls, step)) + } + fls = files{ + names: []string{}, + ids: []msgs.InodeId{}, + byName: make(map[string]int), + } + fullEdges := harness.fullReadDir(msgs.ROOT_DIR_INODE_ID) + for _, step := range trace { + replayStep(filePrefix, &fls, fullEdges, step) + } +} + +type fileHistoryTestOpts struct { + steps int // how many actions to perform + checkpointEvery int // how often to run the historical check + targetFiles int // how many files we want + lowFiles int + threads int // how many tests to run at once +} + +func fileHistoryTest( + log eggs.LogLevels, + opts *fileHistoryTestOpts, + stats *harnessStats, + blockServicesKeys map[msgs.BlockServiceId][16]byte, +) { + terminateChan := make(chan any, 1) + + go func() { + defer func() { handleRecover(terminateChan, recover()) }() + numTests := opts.threads + if numTests > 15 { + panic(fmt.Errorf("numTests %d too big for one-digit prefix", numTests)) + } + var wait sync.WaitGroup + wait.Add(numTests) + for i := 0; i < numTests; i++ { + prefix := fmt.Sprintf("%x", i) + seed := int64(i) + go func() { + defer func() { handleRecover(terminateChan, recover()) }() + client, err := eggs.NewShardSpecificClient(msgs.ShardId(0)) + if err != nil { + panic(err) + } + defer client.Close() + harness := newHarness(log, client, stats, blockServicesKeys) + fileHistoryStepSingle(opts, harness, seed, prefix) + wait.Done() + }() + } + wait.Wait() + + terminateChan <- nil + }() + + err := <-terminateChan + + if err != nil { + panic(err) + } +} diff --git a/go/integrationtest/fstest.go b/go/integrationtest/fstest.go new file mode 100644 index 00000000..9499a46c --- /dev/null +++ b/go/integrationtest/fstest.go @@ -0,0 +1,213 @@ +// Very simple test creating some directory tree and reading it back +package main + +import ( + "fmt" + "math" + "math/rand" + "strconv" + "xtx/eggsfs/eggs" + "xtx/eggsfs/msgs" +) + +type fsTestOpts struct { + numDirs int // how many dirs (in total) to create + numFiles int // how many files (in total) to create + maxDepth int // max directory tree depth +} + +type fsTestDir struct { + id msgs.InodeId + children fsTestChildren +} + +// We always use integers as names +type fsTestChildren struct { + files map[int]msgs.InodeId + directories map[int]*fsTestDir +} + +func newFsTestDir(id msgs.InodeId) *fsTestDir { + return &fsTestDir{ + id: id, + children: fsTestChildren{ + files: make(map[int]msgs.InodeId), + directories: make(map[int]*fsTestDir), + }, + } +} + +type fsTestState struct { + totalDirs int + totalFiles int + rootDir 
*fsTestDir +} + +func (s *fsTestDir) dir(path []int) *fsTestDir { + if len(path) == 0 { + return s + } + child, childFound := s.children.directories[path[0]] + if !childFound { + panic("dir not found") + } + return child.dir(path[1:]) +} + +func (s *fsTestState) dir(path []int) *fsTestDir { + return s.rootDir.dir(path) +} + +/* +func (s *fsTestChildren) id(path []int) msgs.InodeId { + fileId, fileFound := s.files[path[0]] + if fileFound { + if len(path) > 1 { + panic("bad path for file") + } + return fileId + } + dir, dirFound := s.directories[path[0]] + if dirFound { + if len(path) == 1 { + return dir.id + } else { + return dir.children.id(path[1:]) + } + } + panic("could not find path") +} + +func (s *fsTestState) id(path []int) msgs.InodeId { + + if len(path) == 0 { + return msgs.ROOT_DIR_INODE_ID + } + return s.rootDir.id(path) +} +*/ + +func (state *fsTestState) makeDir(log eggs.LogLevels, harness *harness, opts *fsTestOpts, parent []int, name int) []int { + if state.totalDirs >= opts.numDirs { + panic("ran out of dirs!") + } + dir := state.dir(parent) + _, dirExists := dir.children.directories[name] + if dirExists { + panic("conflicting name (dir)") + } + _, fileExists := dir.children.files[name] + if fileExists { + panic("conflicting name (files)") + } + parentId := dir.id + req := msgs.MakeDirectoryReq{ + OwnerId: parentId, + Name: strconv.Itoa(name), + Info: msgs.SetDirectoryInfo{Inherited: true}, + } + resp := msgs.MakeDirectoryResp{} + harness.cdcReq(&req, &resp) + dir.children.directories[name] = newFsTestDir(resp.Id) + state.totalDirs++ + path := append(parent, name) + return path +} + +func (state *fsTestState) makeFile(log eggs.LogLevels, harness *harness, opts *fsTestOpts, rand *rand.Rand, dirPath []int, name int) { + if state.totalFiles >= opts.numFiles { + panic("ran out of files!") + } + dir := state.dir(dirPath) + _, dirExists := dir.children.directories[name] + if dirExists { + panic("conflicting name (dir)") + } + _, fileExists := dir.children.files[name] + if fileExists { + panic("conflicting name (files)") + } + size := rand.Uint64() % (uint64(100) << 20) // up to 20MiB + id := harness.createFile(dir.id, strconv.Itoa(name), size) + state.totalFiles++ + dir.children.files[name] = id +} + +func (d *fsTestDir) check(log eggs.LogLevels, harness *harness) { + edges := harness.readDir(d.id) + if len(edges) != len(d.children.files)+len(d.children.directories) { + panic(fmt.Errorf("bad number of edges -- got %v, expected %v + %v", len(edges), len(d.children.files), len(d.children.files))) + } + for _, edge := range edges { + name, err := strconv.Atoi(edge.name) + if err != nil { + panic(err) + } + if edge.targetId.Type() == msgs.DIRECTORY { + _, present := d.children.directories[name] + if !present { + panic(fmt.Errorf("directory %v not found", name)) + } + } else { + _, present := d.children.files[name] + if !present { + panic(fmt.Errorf("file %v not found", name)) + } + } + } + // recurse down + for _, dir := range d.children.directories { + dir.check(log, harness) + } +} + +func fsTest( + log eggs.LogLevels, + opts *fsTestOpts, + stats *harnessStats, + blockServicesKeys map[msgs.BlockServiceId][16]byte, +) { + client, err := eggs.NewAllShardsClient() + if err != nil { + panic(err) + } + defer client.Close() + harness := newHarness(log, client, stats, blockServicesKeys) + state := fsTestState{ + rootDir: newFsTestDir(msgs.ROOT_DIR_INODE_ID), + } + branching := int(math.Log(float64(opts.numDirs)) / math.Log(float64(opts.maxDepth))) + // Create directories by first creating 
the first n-1 levels according to branching above + allDirs := [][]int{} + lastLevelDirs := [][]int{} + for depth := 1; depth <= opts.maxDepth; depth++ { + depthDirs := int(math.Pow(float64(branching), float64(depth))) + for i := 0; i < depthDirs; i++ { + parentPath := []int{} + j := i + for len(parentPath)+1 != depth { + j = j / branching + parentPath = append([]int{j}, parentPath...) + } + path := state.makeDir(log, harness, opts, parentPath, i) + allDirs = append(allDirs, path) + if depth == opts.maxDepth { + lastLevelDirs = append(lastLevelDirs, path) + } + } + } + // then create the leaves at random + source := rand.NewSource(0) + rand := rand.New(source) + for state.totalDirs < opts.numDirs { + parentPath := lastLevelDirs[int(rand.Uint32())%len(lastLevelDirs)] + state.makeDir(log, harness, opts, parentPath, state.totalDirs) + } + // now create files, random locations + for state.totalFiles < opts.numFiles { + dir := allDirs[int(rand.Uint32())%len(lastLevelDirs)] + state.makeFile(log, harness, opts, rand, dir, state.totalDirs+state.totalFiles) + } + // finally, check that our view of the world is the real view of the world + state.rootDir.check(log, harness) +} diff --git a/go/integrationtest/harness.go b/go/integrationtest/harness.go new file mode 100644 index 00000000..84207a01 --- /dev/null +++ b/go/integrationtest/harness.go @@ -0,0 +1,191 @@ +package main + +import ( + "fmt" + "sync/atomic" + "time" + "xtx/eggsfs/bincode" + "xtx/eggsfs/eggs" + "xtx/eggsfs/msgs" +) + +type edge struct { + name string + targetId msgs.InodeId +} + +type fullEdge struct { + current bool + name string + targetId msgs.InodeId + creationTime msgs.EggsTime +} + +type harnessStats struct { + // these arrays are indexed by req type + shardReqsCounts [256]int64 + shardReqsNanos [256]int64 + cdcReqsCounts [256]int64 + cdcReqsNanos [256]int64 +} + +type harness struct { + log eggs.LogLevels + client eggs.Client + stats *harnessStats + blockServicesKeys map[msgs.BlockServiceId][16]byte +} + +func (h *harness) shardReq( + shid msgs.ShardId, + reqBody bincode.Packable, + respBody bincode.Unpackable, +) { + msgKind := msgs.GetShardMessageKind(reqBody) + atomic.AddInt64(&h.stats.shardReqsCounts[msgKind], 1) + t0 := time.Now() + err := h.client.ShardRequest(h.log, shid, reqBody, respBody) + if err != nil { + panic(err) + } + elapsed := time.Since(t0) + atomic.AddInt64(&h.stats.shardReqsNanos[msgKind], elapsed.Nanoseconds()) +} + +func (h *harness) cdcReq( + reqBody bincode.Packable, + respBody bincode.Unpackable, +) { + msgKind := msgs.GetCDCMessageKind(reqBody) + atomic.AddInt64(&h.stats.cdcReqsCounts[msgKind], 1) + t0 := time.Now() + err := h.client.CDCRequest(h.log, reqBody, respBody) + if err != nil { + panic(err) + } + elapsed := time.Since(t0) + atomic.AddInt64(&h.stats.cdcReqsNanos[msgKind], elapsed.Nanoseconds()) +} + +func (h *harness) createFile(dirId msgs.InodeId, name string, size uint64) msgs.InodeId { + // construct + constructReq := msgs.ConstructFileReq{ + Type: msgs.FILE, + Note: "", + } + constructResp := msgs.ConstructFileResp{} + h.shardReq(dirId.Shard(), &constructReq, &constructResp) + // add spans + spanSize := uint64(10) << 20 // 10MiB + for offset := uint64(0); offset < size; offset += spanSize { + thisSpanSize := eggs.Min(spanSize, size-offset) + addSpanReq := msgs.AddSpanInitiateReq{ + FileId: constructResp.Id, + Cookie: constructResp.Cookie, + ByteOffset: offset, + StorageClass: 2, + Parity: msgs.MkParity(1, 1), + Crc32: [4]byte{0, 0, 0, 0}, + Size: thisSpanSize, + BlockSize: 
thisSpanSize, + BodyBlocks: []msgs.NewBlockInfo{ + {Crc32: [4]byte{0, 0, 0, 0}}, + {Crc32: [4]byte{0, 0, 0, 0}}, + }, + } + addSpanResp := msgs.AddSpanInitiateResp{} + h.shardReq(dirId.Shard(), &addSpanReq, &addSpanResp) + block0 := &addSpanResp.Blocks[0] + block1 := &addSpanResp.Blocks[1] + blockServiceKey0, blockServiceFound0 := h.blockServicesKeys[block0.BlockServiceId] + if !blockServiceFound0 { + panic(fmt.Errorf("could not find block service %v", block0.BlockServiceId)) + } + blockServiceKey1, blockServiceFound1 := h.blockServicesKeys[block1.BlockServiceId] + if !blockServiceFound1 { + panic(fmt.Errorf("could not find block service %v", block1.BlockServiceId)) + } + certifySpanReq := msgs.AddSpanCertifyReq{ + FileId: constructResp.Id, + Cookie: constructResp.Cookie, + ByteOffset: offset, + Proofs: []msgs.BlockProof{ + { + BlockId: block0.BlockId, + Proof: eggs.BlockAddProof(block0.BlockServiceId, block0.BlockId, blockServiceKey0), + }, + { + BlockId: block1.BlockId, + Proof: eggs.BlockAddProof(block1.BlockServiceId, block1.BlockId, blockServiceKey1), + }, + }, + } + certifySpanResp := msgs.AddSpanCertifyResp{} + h.shardReq(dirId.Shard(), &certifySpanReq, &certifySpanResp) + } + // link + linkReq := msgs.LinkFileReq{ + FileId: constructResp.Id, + Cookie: constructResp.Cookie, + OwnerId: dirId, + Name: name, + } + h.shardReq(dirId.Shard(), &linkReq, &msgs.LinkFileResp{}) + return constructResp.Id +} + +func (h *harness) readDir(dir msgs.InodeId) []edge { + req := msgs.ReadDirReq{ + DirId: dir, + StartHash: 0, + } + resp := msgs.ReadDirResp{} + edges := []edge{} + for { + h.shardReq(dir.Shard(), &req, &resp) + for _, result := range resp.Results { + edges = append(edges, edge{ + name: result.Name, + targetId: result.TargetId, + }) + } + req.StartHash = resp.NextHash + if req.StartHash == 0 { + break + } + } + return edges +} + +func (h *harness) fullReadDir(dirId msgs.InodeId) []fullEdge { + req := msgs.FullReadDirReq{ + DirId: msgs.ROOT_DIR_INODE_ID, + } + resp := msgs.FullReadDirResp{} + edges := []fullEdge{} + for { + h.shardReq(dirId.Shard(), &req, &resp) + for _, result := range resp.Results { + edges = append(edges, fullEdge{ + name: result.Name, + targetId: result.TargetId.Id(), + creationTime: result.CreationTime, + current: result.Current, + }) + } + req.Cursor = resp.Next + if req.Cursor.StartHash == 0 { + break + } + } + return edges +} + +func newHarness(log eggs.LogLevels, client eggs.Client, stats *harnessStats, blockServicesKeys map[msgs.BlockServiceId][16]byte) *harness { + return &harness{ + log: log, + client: client, + stats: stats, + blockServicesKeys: blockServicesKeys, + } +} diff --git a/go/integrationtest/integrationtest.go b/go/integrationtest/integrationtest.go index 2a4f531c..2a224012 100644 --- a/go/integrationtest/integrationtest.go +++ b/go/integrationtest/integrationtest.go @@ -4,522 +4,14 @@ import ( "encoding/hex" "flag" "fmt" - "math/rand" "os" "path" "runtime/debug" - "strings" - "sync" - "sync/atomic" "time" - "xtx/eggsfs/bincode" "xtx/eggsfs/eggs" "xtx/eggsfs/msgs" ) -type edge struct { - name string - targetId msgs.InodeId -} - -type fullEdge struct { - current bool - name string - targetId msgs.InodeId - creationTime msgs.EggsTime -} - -// The idea here is that we'd run some parts of the test also directly hitting the filesystem. 
-type harness interface { - createFile(name string, size uint64) msgs.InodeId - deleteFile(name string, id msgs.InodeId) - renameFile(targetId msgs.InodeId, oldName string, newName string) - statDir() msgs.EggsTime - readDir() []edge - fullReadDir() []fullEdge -} - -type harnessStats struct { - reqs int64 - /* - fileCreates int64 - fileConstructNanos int64 - fileLinkNanos int64 - */ - fileDeletes int64 - fileDeletesNanos int64 - fileRenames int64 - fileRenamesNanos int64 - statDirs int64 - statDirsNanos int64 - readDirs int64 - readDirsNanos int64 - fullReadDirs int64 - fullReadDirsNanos int64 -} - -type directHarness struct { - shid msgs.ShardId - client eggs.Client - stats *harnessStats - blockServicesKeys map[msgs.BlockServiceId][16]byte -} - -func (h *directHarness) shardReq( - reqBody bincode.Packable, - respBody bincode.Unpackable, -) { - atomic.AddInt64(&h.stats.reqs, 1) - err := h.client.ShardRequest(&eggs.LogToStdout{}, shid, reqBody, respBody) - if err != nil { - panic(err) - } -} - -func (h *directHarness) createFile(name string, size uint64) msgs.InodeId { - // construct - constructReq := msgs.ConstructFileReq{ - Type: msgs.FILE, - Note: "", - } - constructResp := msgs.ConstructFileResp{} - h.shardReq(&constructReq, &constructResp) - // add spans - spanSize := uint64(10) << 20 // 10MiB - for offset := uint64(0); offset < size; offset += spanSize { - thisSpanSize := eggs.Min(spanSize, size-offset) - addSpanReq := msgs.AddSpanInitiateReq{ - FileId: constructResp.Id, - Cookie: constructResp.Cookie, - ByteOffset: offset, - StorageClass: 2, - Parity: msgs.MkParity(1, 1), - Crc32: [4]byte{0, 0, 0, 0}, - Size: thisSpanSize, - BlockSize: thisSpanSize, - BodyBlocks: []msgs.NewBlockInfo{ - {Crc32: [4]byte{0, 0, 0, 0}}, - {Crc32: [4]byte{0, 0, 0, 0}}, - }, - } - addSpanResp := msgs.AddSpanInitiateResp{} - h.shardReq(&addSpanReq, &addSpanResp) - block0 := &addSpanResp.Blocks[0] - block1 := &addSpanResp.Blocks[1] - blockServiceKey0, blockServiceFound0 := h.blockServicesKeys[block0.BlockServiceId] - if !blockServiceFound0 { - panic(fmt.Errorf("could not find block service %v", block0.BlockServiceId)) - } - blockServiceKey1, blockServiceFound1 := h.blockServicesKeys[block1.BlockServiceId] - if !blockServiceFound1 { - panic(fmt.Errorf("could not find block service %v", block1.BlockServiceId)) - } - certifySpanReq := msgs.AddSpanCertifyReq{ - FileId: constructResp.Id, - Cookie: constructResp.Cookie, - ByteOffset: offset, - Proofs: []msgs.BlockProof{ - { - BlockId: block0.BlockId, - Proof: eggs.BlockAddProof(block0.BlockServiceId, block0.BlockId, blockServiceKey0), - }, - { - BlockId: block1.BlockId, - Proof: eggs.BlockAddProof(block1.BlockServiceId, block1.BlockId, blockServiceKey1), - }, - }, - } - certifySpanResp := msgs.AddSpanCertifyResp{} - h.shardReq(&certifySpanReq, &certifySpanResp) - } - // link - linkReq := msgs.LinkFileReq{ - FileId: constructResp.Id, - Cookie: constructResp.Cookie, - OwnerId: msgs.ROOT_DIR_INODE_ID, - Name: name, - } - h.shardReq(&linkReq, &msgs.LinkFileResp{}) - return constructResp.Id -} - -func (h *directHarness) deleteFile(name string, id msgs.InodeId) { - atomic.AddInt64(&h.stats.fileDeletes, 1) - deleteReq := msgs.SoftUnlinkFileReq{ - OwnerId: msgs.ROOT_DIR_INODE_ID, - FileId: id, - Name: name, - } - t0 := time.Now() - h.shardReq(&deleteReq, &msgs.SoftUnlinkFileResp{}) - atomic.AddInt64(&h.stats.fileDeletesNanos, time.Since(t0).Nanoseconds()) -} - -func (h *directHarness) renameFile(targetId msgs.InodeId, oldName string, newName string) { - 
atomic.AddInt64(&h.stats.fileRenames, 1) - renameReq := msgs.SameDirectoryRenameReq{ - TargetId: targetId, - DirId: msgs.ROOT_DIR_INODE_ID, - OldName: oldName, - NewName: newName, - } - t0 := time.Now() - h.shardReq(&renameReq, &msgs.SameDirectoryRenameResp{}) - atomic.AddInt64(&h.stats.fileRenamesNanos, time.Since(t0).Nanoseconds()) -} - -func (h *directHarness) statDir() msgs.EggsTime { - atomic.AddInt64(&h.stats.statDirs, 1) - statReq := msgs.StatDirectoryReq{ - Id: msgs.ROOT_DIR_INODE_ID, - } - statResp := msgs.StatDirectoryResp{} - t0 := time.Now() - h.shardReq(&statReq, &statResp) - atomic.AddInt64(&h.stats.statDirsNanos, time.Since(t0).Nanoseconds()) - return statResp.Mtime -} - -func (h *directHarness) readDir() []edge { - req := msgs.ReadDirReq{ - DirId: msgs.ROOT_DIR_INODE_ID, - StartHash: 0, - } - resp := msgs.ReadDirResp{} - edges := []edge{} - for { - atomic.AddInt64(&h.stats.readDirs, 1) - t0 := time.Now() - h.shardReq(&req, &resp) - atomic.AddInt64(&h.stats.readDirsNanos, time.Since(t0).Nanoseconds()) - for _, result := range resp.Results { - edges = append(edges, edge{ - name: result.Name, - targetId: result.TargetId, - }) - } - req.StartHash = resp.NextHash - if req.StartHash == 0 { - break - } - } - return edges -} - -func (h *directHarness) fullReadDir() []fullEdge { - req := msgs.FullReadDirReq{ - DirId: msgs.ROOT_DIR_INODE_ID, - } - resp := msgs.FullReadDirResp{} - edges := []fullEdge{} - for { - atomic.AddInt64(&h.stats.fullReadDirs, 1) - t0 := time.Now() - h.shardReq(&req, &resp) - atomic.AddInt64(&h.stats.fullReadDirsNanos, time.Since(t0).Nanoseconds()) - for _, result := range resp.Results { - edges = append(edges, fullEdge{ - name: result.Name, - targetId: result.TargetId.Id(), - creationTime: result.CreationTime, - current: result.Current, - }) - } - req.Cursor = resp.Next - if req.Cursor.StartHash == 0 { - break - } - } - return edges -} - -func newDirectHarness(shid msgs.ShardId, stats *harnessStats, blockServicesKeys map[msgs.BlockServiceId][16]byte) *directHarness { - client, err := eggs.NewShardSpecificClient(shid) - if err != nil { - panic(err) - } - return &directHarness{ - shid: shid, - client: client, - stats: stats, - blockServicesKeys: blockServicesKeys, - } -} - -const shid = msgs.ShardId(0) // hardcoded for now - -// actions - -type createFile struct { - name string - size uint64 -} - -type createdFile struct { - name string - id msgs.InodeId -} - -type deleteFile struct { - name string -} - -type renameFile struct { - oldName string - newName string -} - -// trace - -type checkpoint struct { - time msgs.EggsTime -} - -type files struct { - names []string - ids []msgs.InodeId - byName map[string]int -} - -func (files *files) addFile(name string, id msgs.InodeId) { - if _, wasPresent := files.byName[name]; wasPresent { - panic(fmt.Errorf("unexpected overwrite of %s", name)) - } - files.names = append(files.names, name) - files.ids = append(files.ids, id) - files.byName[name] = len(files.ids) - 1 -} - -func (files *files) id(name string) msgs.InodeId { - ix, present := files.byName[name] - if !present { - panic(fmt.Errorf("name not found %v", name)) - } - return files.ids[ix] -} - -func (files *files) deleteFile(name string) { - if _, wasPresent := files.byName[name]; !wasPresent { - panic(fmt.Errorf("name not found %v", name)) - } - ix := files.byName[name] - lastIx := len(files.ids) - 1 - if ix != lastIx { - files.ids[ix] = files.ids[lastIx] - files.names[ix] = files.names[lastIx] - files.byName[files.names[ix]] = ix - } - files.ids = 
files.ids[:lastIx] - files.names = files.names[:lastIx] - delete(files.byName, name) -} - -func genCreateFile(filePrefix string, rand *rand.Rand, files *files) createFile { - var name string - for { - name = fmt.Sprintf("%s%x", filePrefix, rand.Uint32()) - _, wasPresent := files.byName[name] - if !wasPresent { - delete(files.byName, name) - size := rand.Uint64() % (uint64(100) << 20) // up to 20MiB - return createFile{ - name: name, - size: size, - } - } - } -} - -func genDeleteFile(filePrefix string, rand *rand.Rand, files *files) deleteFile { - return deleteFile{name: files.names[int(rand.Uint32())%len(files.names)]} -} - -func genRenameFile(filePrefix string, rand *rand.Rand, files *files) renameFile { - oldName := genDeleteFile(filePrefix, rand, files).name - newName := genCreateFile(filePrefix, rand, files).name - return renameFile{ - oldName: oldName, - newName: newName, - } -} - -func genRenameOverrideFile(filePrefix string, rand *rand.Rand, files *files) renameFile { - oldName := genDeleteFile(filePrefix, rand, files).name - newName := genDeleteFile(filePrefix, rand, files).name - return renameFile{ - oldName: oldName, - newName: newName, - } -} - -func checkCheckpoint(prefix string, files *files, allEdges []edge) { - edges := []edge{} - for _, edge := range allEdges { - if !strings.HasPrefix(edge.name, prefix) { - continue - } - edges = append(edges, edge) - } - if len(edges) != len(files.names) { - panic(fmt.Errorf("expected %d edges, got %d", len(files.names), len(edges))) - } - for _, edge := range edges { - id := files.id(edge.name) - if id != edge.targetId { - panic(fmt.Errorf("expected targetId %v for edge %v, but got %v", id, edge.name, id)) - } - } -} - -func runCheckpoint(harness harness, prefix string, files *files) checkpoint { - edges := harness.readDir() - checkCheckpoint(prefix, files, edges) - return checkpoint{ - time: harness.statDir(), - } -} - -func runStep(harness harness, files *files, stepAny any) any { - switch step := stepAny.(type) { - case createFile: - id := harness.createFile(step.name, step.size) - files.addFile(step.name, id) - return createdFile{ - name: step.name, - id: id, - } - case deleteFile: - harness.deleteFile(step.name, files.id(step.name)) - files.deleteFile(step.name) - return step - case renameFile: - targetId := files.id(step.oldName) - harness.renameFile(targetId, step.oldName, step.newName) - files.deleteFile(step.oldName) - if _, wasPresent := files.byName[step.newName]; wasPresent { - // overwrite - files.deleteFile(step.newName) - } - files.addFile(step.newName, targetId) - return step - default: - panic(fmt.Errorf("bad step %T", stepAny)) - } -} - -func edgesAsOf(allEdges []fullEdge, t msgs.EggsTime) []edge { - namesToEdge := make(map[string]fullEdge) - - for _, fullEdge := range allEdges { - if fullEdge.creationTime > t { - continue - } - namesToEdge[fullEdge.name] = fullEdge - } - - edges := []edge{} - for _, fullEdge := range namesToEdge { - if fullEdge.targetId == msgs.NULL_INODE_ID { - continue - } - edges = append(edges, edge{ - name: fullEdge.name, - targetId: fullEdge.targetId, - }) - } - - return edges -} - -func replayCheckpoint(prefix string, files *files, fullEdges []fullEdge, t msgs.EggsTime) { - edges := edgesAsOf(fullEdges, t) - checkCheckpoint(prefix, files, edges) -} - -func replayStep(prefix string, files *files, fullEdges []fullEdge, stepAny any) { - switch step := stepAny.(type) { - case createdFile: - files.addFile(step.name, step.id) - case deleteFile: - files.deleteFile(step.name) - case renameFile: - 
targetId := files.id(step.oldName) - files.deleteFile(step.oldName) - if _, wasPresent := files.byName[step.newName]; wasPresent { - // overwrite - files.deleteFile(step.newName) - } - files.addFile(step.newName, targetId) - case checkpoint: - replayCheckpoint(prefix, files, fullEdges, step.time) - default: - panic(fmt.Errorf("bad step %T", stepAny)) - } -} - -func runTestSingle(harness harness, seed int64, filePrefix string) { - steps := 10 * 1000 // perform 10k actions - checkpointEvery := 100 // get times every 100 actions - targetFiles := 1000 // how many files we want - lowFiles := 500 - - // loop for n steps. at every step: - // * if we have never reached the target files, then just create a file. - // * if we have, create/delete/rename/rename with override at random. - // * every checkpointEvery steps, use readDir to make sure that we have the files that we expect. - // - // after we've finished, re-run through everything and re-check at ever checkpointSteps using - // fullReadDir. - trace := []any{} - - reachedTargetFiles := false - fls := files{ - names: []string{}, - ids: []msgs.InodeId{}, - byName: make(map[string]int), - } - source := rand.NewSource(seed) - rand := rand.New(source) - for stepIx := 0; stepIx < steps; stepIx++ { - if stepIx%checkpointEvery == 0 { - checkpoint := runCheckpoint(harness, filePrefix, &fls) - trace = append(trace, checkpoint) - } - var step any - if len(fls.names) < lowFiles { - reachedTargetFiles = false - } - if !reachedTargetFiles { - step = genCreateFile(filePrefix, rand, &fls) - } else { - which := rand.Uint32() % 4 - switch which { - case 0: - step = genCreateFile(filePrefix, rand, &fls) - case 1: - step = genDeleteFile(filePrefix, rand, &fls) - case 2: - step = genRenameFile(filePrefix, rand, &fls) - case 3: - step = genRenameOverrideFile(filePrefix, rand, &fls) - default: - panic(fmt.Errorf("bad which %d", which)) - } - } - reachedTargetFiles = reachedTargetFiles || len(fls.names) >= targetFiles - trace = append(trace, runStep(harness, &fls, step)) - } - fls = files{ - names: []string{}, - ids: []msgs.InodeId{}, - byName: make(map[string]int), - } - fullEdges := harness.fullReadDir() - for _, step := range trace { - replayStep(filePrefix, &fls, fullEdges, step) - } -} - func handleRecover(terminateChan chan any, err any) { if err != nil { fmt.Printf("PANIC %v. 
Stacktrace:\n", err) @@ -528,7 +20,52 @@ func handleRecover(terminateChan chan any, err any) { } } -func spawnTests(terminateChan chan any, blockServices []eggs.BlockService) { +func runTest(blockServicesKeys map[msgs.BlockServiceId][16]byte, what string, run func(stats *harnessStats)) { + stats := harnessStats{} + + fmt.Printf("running %s\n", what) + t0 := time.Now() + run(&stats) + elapsed := time.Since(t0) + totalShardRequests := int64(0) + totalShardNanos := int64(0) + for i := 0; i < 256; i++ { + totalShardRequests += stats.shardReqsCounts[i] + totalShardNanos += stats.shardReqsNanos[i] + } + totalCDCRequests := int64(0) + totalCDCNanos := int64(0) + for i := 0; i < 256; i++ { + totalCDCRequests += stats.cdcReqsCounts[i] + totalCDCNanos += stats.cdcReqsNanos[i] + } + + fmt.Printf(" ran test in %v, %v shard requests performed, %v CDC requests performed\n", elapsed, totalShardRequests, totalCDCRequests) + if totalShardRequests > 0 { + fmt.Printf(" shard reqs avg times:\n") + for i := 0; i < 256; i++ { + if stats.shardReqsCounts[i] == 0 { + continue + } + fmt.Printf(" %-30v %10v %10v\n", msgs.ShardMessageKind(i), stats.shardReqsCounts[i], time.Duration(stats.shardReqsNanos[i]/stats.shardReqsCounts[i])) + } + } + if totalCDCRequests > 0 { + fmt.Printf(" CDC reqs avg times:\n") + for i := 0; i < 256; i++ { + if stats.cdcReqsCounts[i] == 0 { + continue + } + fmt.Printf(" %-30v %10v %10v\n", msgs.CDCMessageKind(i), stats.cdcReqsCounts[i], time.Duration(stats.cdcReqsNanos[i]/stats.cdcReqsCounts[i])) + } + } + + cleanupAfterTest(blockServicesKeys) +} + +func runTests(terminateChan chan any, log eggs.LogLevels, blockServices []eggs.BlockService) { + defer func() { handleRecover(terminateChan, recover()) }() + blockServicesKeys := make(map[msgs.BlockServiceId][16]byte) for _, blockService := range blockServices { key, err := hex.DecodeString(blockService.SecretKey) @@ -543,61 +80,35 @@ func spawnTests(terminateChan chan any, blockServices []eggs.BlockService) { blockServicesKeys[blockService.Id] = fixedKey } - go func() { - defer func() { handleRecover(terminateChan, recover()) }() - numTests := uint8(5) - if numTests > 15 { - panic(fmt.Errorf("numTests %d too big for one-digit prefix", numTests)) - } - var wait sync.WaitGroup - wait.Add(int(numTests)) - fmt.Printf("running simple file creation/renaming/remove test with %d threads\n", numTests) - t0 := time.Now() - stats := harnessStats{} - for i := 0; i < int(numTests); i++ { - prefix := fmt.Sprintf("%x", i) - seed := int64(i) - go func() { - defer func() { handleRecover(terminateChan, recover()) }() - harness := newDirectHarness(shid, &stats, blockServicesKeys) - defer harness.client.Close() - runTestSingle(harness, seed, prefix) - wait.Done() - }() - } - wait.Wait() - elapsed := time.Since(t0) - fmt.Printf("ran test in %.2fs, %v requests performed, %.2f reqs/sec\n", float32(elapsed.Milliseconds())/100.0, stats.reqs, 1000.0*float32(stats.reqs/elapsed.Milliseconds())) - // fmt.Printf("fileCreates:\t%v\t%v\n", stats.fileCreates, time.Duration(stats.fileLinkNanos/stats.fileCreates)) - fmt.Printf("fileDeletes:\t%v\t%v\n", stats.fileDeletes, time.Duration(stats.fileDeletesNanos/stats.fileDeletes)) - fmt.Printf("fileRenames:\t%v\t%v\n", stats.fileRenames, time.Duration(stats.fileRenamesNanos/stats.fileRenames)) - fmt.Printf("statDirs:\t%v\t%v\n", stats.statDirs, time.Duration(stats.statDirsNanos/stats.statDirs)) - fmt.Printf("readDirs:\t%v\t%v\n", stats.readDirs, time.Duration(stats.readDirsNanos/stats.readDirs)) - 
fmt.Printf("fullReadDirs:\t%v\t%v\n", stats.fullReadDirs, time.Duration(stats.fullReadDirsNanos/stats.fullReadDirs)) + fileHistoryOpts := fileHistoryTestOpts{ + steps: 10 * 1000, // perform 10k actions + checkpointEvery: 100, // get times every 100 actions + targetFiles: 1000, // how many files we want + lowFiles: 500, + threads: 5, + } + runTest( + blockServicesKeys, + fmt.Sprintf("file history test, %v threads, %v steps", fileHistoryOpts.threads, fileHistoryOpts.steps), + func(stats *harnessStats) { + fileHistoryTest(log, &fileHistoryOpts, stats, blockServicesKeys) + }, + ) - client, err := eggs.NewShardSpecificClient(shid) - if err != nil { - panic(err) - } - dirInfo, err := eggs.GetDirectoryInfo(&eggs.LogToStdout{}, client, msgs.ROOT_DIR_INODE_ID) - if err != nil { - panic(err) - } - dirInfo.DeleteAfterVersions = 1 - if err := eggs.SetDirectoryInfo(&eggs.LogToStdout{}, client, msgs.ROOT_DIR_INODE_ID, false, dirInfo); err != nil { - panic(err) - } - err = eggs.CollectDirectories(&eggs.LogToStdout{}, shid) - if err != nil { - panic(err) - } - err = eggs.DestructFiles(&eggs.LogToStdout{}, shid, blockServicesKeys) - if err != nil { - panic(err) - } + fsTestOpts := fsTestOpts{ + numDirs: 1 * 1000, + numFiles: 100 * 1000, // around 100 files per dir + maxDepth: 4, + } + runTest( + blockServicesKeys, + fmt.Sprintf("simple fs test, %v dirs, %v files, %v depth", fsTestOpts.numDirs, fsTestOpts.numFiles, fsTestOpts.maxDepth), + func(stats *harnessStats) { + fsTest(log, &fsTestOpts, stats, blockServicesKeys) + }, + ) - terminateChan <- nil - }() + terminateChan <- nil } func main() { @@ -620,8 +131,12 @@ func main() { Debug: *debug, Coverage: *coverage, } - shardExe := eggs.BuildShardExe(&eggs.LogToStdout{}, &cppBuildOpts) - shuckleExe := eggs.BuildShuckleExe(&eggs.LogToStdout{}) + + log := &eggs.LogToStdout{} + + shardExe := eggs.BuildShardExe(log, &cppBuildOpts) + cdcExe := eggs.BuildCDCExe(log, &cppBuildOpts) + shuckleExe := eggs.BuildShuckleExe(log) cleanupDbDir := false tmpDataDir := *dataDir == "" @@ -677,21 +192,41 @@ func main() { }) } - blockServices := eggs.WaitForShuckle(fmt.Sprintf("localhost:%v", shucklePort), hddBlockServices+flashBlockServices, 10*time.Second) - - // Start shard - procs.StartShard(&eggs.ShardOpts{ - Exe: shardExe, - Dir: path.Join(*dataDir, "shard"), - Verbose: *verbose, - Shid: shid, - Valgrind: *valgrind, - WaitForShuckle: true, + // Start CDC + procs.StartCDC(&eggs.CDCOpts{ + Exe: cdcExe, + Dir: path.Join(*dataDir, "cdc"), + Verbose: *verbose, + Valgrind: *valgrind, }) - eggs.WaitForShard(shid, 10*time.Second) + waitShuckleFor := 10 * time.Second + fmt.Printf("waiting for shuckle for %v...\n", waitShuckleFor) + blockServices := eggs.WaitForShuckle(fmt.Sprintf("localhost:%v", shucklePort), hddBlockServices+flashBlockServices, waitShuckleFor) - spawnTests(terminateChan, blockServices) + // Start shards + for i := 0; i < 256; i++ { + shid := msgs.ShardId(i) + procs.StartShard(&eggs.ShardOpts{ + Exe: shardExe, + Dir: path.Join(*dataDir, fmt.Sprintf("shard_%03d", i)), + Verbose: *verbose, + Shid: shid, + Valgrind: *valgrind, + WaitForShuckle: true, + }) + } + + waitShardFor := 20 * time.Second + fmt.Printf("waiting for shards for %v...\n", waitShardFor) + for i := 0; i < 256; i++ { + eggs.WaitForShard(msgs.ShardId(i), waitShardFor) + } + + fmt.Printf("operational 🤖\n") + + // start tests + go func() { runTests(terminateChan, log, blockServices) }() // wait for things to finish err := <-terminateChan diff --git a/go/msgs/msgs.go b/go/msgs/msgs.go index 
1b435436..8c7bbfb8 100644 --- a/go/msgs/msgs.go +++ b/go/msgs/msgs.go @@ -789,6 +789,50 @@ type SpanPolicy struct { Parity Parity } +// MSB: whether this policy is active or not. After: nanoseconds. +type DeleteAfterTime uint64 + +func (dat DeleteAfterTime) Active() bool { + return (uint64(dat) >> 63) != 0 +} + +func (dat DeleteAfterTime) Time() time.Duration { + return time.Duration(uint64(dat) & ^(uint64(1) << 63)) +} + +func InactiveDeleteAfterTime() DeleteAfterTime { + return 0 +} + +func ActiveDeleteAfterTime(duration time.Duration) DeleteAfterTime { + if duration.Nanoseconds() < 0 { + panic(fmt.Errorf("negative duration in DeleteAfterTime: %v", duration)) + } + return DeleteAfterTime((uint64(1) << 63) | uint64(duration.Nanoseconds())) +} + +// MSB: whether this policy is active or not. After: number of versions. +type DeleteAfterVersions uint16 + +func (dav DeleteAfterVersions) Active() bool { + return (uint16(dav) >> 15) != 0 +} + +func (dav DeleteAfterVersions) Versions() uint16 { + return uint16(dav) & ^(uint16(1) << 15) +} + +func InactiveDeleteAfterVersions() DeleteAfterVersions { + return 0 +} + +func ActiveDeleteAfterVersions(versions int16) DeleteAfterVersions { + if versions < 0 { + panic(fmt.Errorf("negative versions: %v", versions)) + } + return DeleteAfterVersions((uint16(1) << 15) | uint16(versions)) +} + // See SnapshotPolicy for the meaning of `DeleteAfterTime` and // `DeleteAfterVersions` type DirectoryInfoBody struct { @@ -796,8 +840,8 @@ type DirectoryInfoBody struct { // since it is opaque to the server and therefore we might want // to evolve it separatedly. Right now it's 1 for this data structure. Version uint8 - DeleteAfterTime uint64 // nanoseconds - DeleteAfterVersions uint8 + DeleteAfterTime DeleteAfterTime + DeleteAfterVersions DeleteAfterVersions // Sorted by MaxSize. There's always an implicit policy for inline // spans (max size 255). Which means that the first `MaxSize` // must be > 255. 
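Aside on the new MSB-tagged fields above: both DeleteAfterTime and DeleteAfterVersions carry the "active" flag in their top bit and the payload in the remaining bits, so a zero value means inactive and callers should only build values through the Active*/Inactive* constructors. A minimal round-trip sketch using just the constructors and accessors added in this hunk (the import path xtx/eggsfs/msgs is taken from the harness above); this is illustrative, not part of the change itself:

package main

import (
	"fmt"
	"time"

	"xtx/eggsfs/msgs"
)

func main() {
	// Zero value: MSB clear, policy disabled.
	dat := msgs.InactiveDeleteAfterTime()
	fmt.Println(dat.Active()) // false

	// Active value: MSB set, nanoseconds in the low 63 bits.
	dat = msgs.ActiveDeleteAfterTime(24 * time.Hour)
	fmt.Println(dat.Active(), dat.Time()) // true 24h0m0s

	// Same scheme for versions, with the flag in bit 15.
	dav := msgs.ActiveDeleteAfterVersions(3)
	fmt.Println(dav.Active(), dav.Versions()) // true 3
}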
diff --git a/go/msgs/msgs_bincode.go b/go/msgs/msgs_bincode.go index 29de4248..a6020fe1 100644 --- a/go/msgs/msgs_bincode.go +++ b/go/msgs/msgs_bincode.go @@ -54,6 +54,8 @@ const ( BAD_DIRECTORY_INFO ErrCode = 55 CREATION_TIME_TOO_RECENT ErrCode = 56 DEADLINE_NOT_PASSED ErrCode = 57 + SAME_SOURCE_AND_DESTINATION ErrCode = 58 + SAME_DIRECTORIES ErrCode = 59 ) func (err ErrCode) String() string { @@ -154,6 +156,10 @@ func (err ErrCode) String() string { return "CREATION_TIME_TOO_RECENT" case 57: return "DEADLINE_NOT_PASSED" + case 58: + return "SAME_SOURCE_AND_DESTINATION" + case 59: + return "SAME_DIRECTORIES" default: return fmt.Sprintf("ErrCode(%d)", err) } @@ -232,6 +238,77 @@ func GetShardMessageKind(body any) ShardMessageKind { } } +func (k ShardMessageKind) String() string { + switch k { + case 1: + return "LOOKUP" + case 2: + return "STAT_FILE" + case 10: + return "STAT_TRANSIENT_FILE" + case 8: + return "STAT_DIRECTORY" + case 3: + return "READ_DIR" + case 4: + return "CONSTRUCT_FILE" + case 5: + return "ADD_SPAN_INITIATE" + case 6: + return "ADD_SPAN_CERTIFY" + case 7: + return "LINK_FILE" + case 12: + return "SOFT_UNLINK_FILE" + case 13: + return "FILE_SPANS" + case 14: + return "SAME_DIRECTORY_RENAME" + case 15: + return "SET_DIRECTORY_INFO" + case 21: + return "VISIT_DIRECTORIES" + case 32: + return "VISIT_FILES" + case 22: + return "VISIT_TRANSIENT_FILES" + case 33: + return "FULL_READ_DIR" + case 23: + return "REMOVE_NON_OWNED_EDGE" + case 24: + return "INTRA_SHARD_HARD_FILE_UNLINK" + case 25: + return "REMOVE_SPAN_INITIATE" + case 26: + return "REMOVE_SPAN_CERTIFY" + case 34: + return "SWAP_BLOCKS" + case 35: + return "BLOCK_SERVICE_FILES" + case 36: + return "REMOVE_INODE" + case 128: + return "CREATE_DIRECTORY_INODE" + case 129: + return "SET_DIRECTORY_OWNER" + case 137: + return "REMOVE_DIRECTORY_OWNER" + case 130: + return "CREATE_LOCKED_CURRENT_EDGE" + case 131: + return "LOCK_CURRENT_EDGE" + case 132: + return "UNLOCK_CURRENT_EDGE" + case 134: + return "REMOVE_OWNED_SNAPSHOT_FILE_EDGE" + case 135: + return "MAKE_FILE_TRANSIENT" + default: + return fmt.Sprintf("ShardMessageKind(%d)", k) + } +} + const ( LOOKUP ShardMessageKind = 0x1 @@ -289,6 +366,25 @@ func GetCDCMessageKind(body any) CDCMessageKind { } } +func (k CDCMessageKind) String() string { + switch k { + case 1: + return "MAKE_DIRECTORY" + case 2: + return "RENAME_FILE" + case 3: + return "SOFT_UNLINK_DIRECTORY" + case 4: + return "RENAME_DIRECTORY" + case 5: + return "HARD_UNLINK_DIRECTORY" + case 6: + return "HARD_UNLINK_FILE" + default: + return fmt.Sprintf("CDCMessageKind(%d)", k) + } +} + const ( MAKE_DIRECTORY CDCMessageKind = 0x1 @@ -1770,7 +1866,7 @@ func (v *SpanPolicy) Unpack(buf *bincode.Buf) error { func (v *DirectoryInfoBody) Pack(buf *bincode.Buf) { buf.PackU8(uint8(v.Version)) buf.PackU64(uint64(v.DeleteAfterTime)) - buf.PackU8(uint8(v.DeleteAfterVersions)) + buf.PackU16(uint16(v.DeleteAfterVersions)) len1 := len(v.SpanPolicies) buf.PackLength(len1) for i := 0; i < len1; i++ { @@ -1785,7 +1881,7 @@ func (v *DirectoryInfoBody) Unpack(buf *bincode.Buf) error { if err := buf.UnpackU64((*uint64)(&v.DeleteAfterTime)); err != nil { return err } - if err := buf.UnpackU8((*uint8)(&v.DeleteAfterVersions)); err != nil { + if err := buf.UnpackU16((*uint16)(&v.DeleteAfterVersions)); err != nil { return err } var len1 int diff --git a/python/basic_client.py b/python/basic_client.py index e36b71bb..d42f65bd 100755 --- a/python/basic_client.py +++ b/python/basic_client.py @@ -573,6 +573,20 @@ def 
block_service_files(block_service_id: int): for f in fs: print(f'- 0x{f:016X}') +@command('visit_files') +def visit_files(): + for shard in range(256): + begin_id = 0 + while True: + resp = send_shard_request_or_raise(shard, VisitFilesReq(begin_id)) + assert isinstance(resp, VisitFilesResp) + for i in resp.ids: + print(f'inode id: 0x{i:016X}') + print(f'shard: {inode_id_shard(i)}') + begin_id = resp.next_id + if begin_id == 0: + break + def main() -> None: args = sys.argv[1:] diff --git a/python/msgs.py b/python/msgs.py index c25ae8ab..b7f7fb23 100644 --- a/python/msgs.py +++ b/python/msgs.py @@ -56,6 +56,8 @@ class ErrCode(enum.IntEnum): BAD_DIRECTORY_INFO = 55 CREATION_TIME_TOO_RECENT = 56 DEADLINE_NOT_PASSED = 57 + SAME_SOURCE_AND_DESTINATION = 58 + SAME_DIRECTORIES = 59 class ShardMessageKind(enum.IntEnum): LOOKUP = 0x1 @@ -381,7 +383,7 @@ class SpanPolicy(bincode.Packable): @dataclass class DirectoryInfoBody(bincode.Packable): - STATIC_SIZE: ClassVar[int] = 1 + 8 + 1 + 2 # version + delete_after_time + delete_after_versions + len(span_policies) + STATIC_SIZE: ClassVar[int] = 1 + 8 + 2 + 2 # version + delete_after_time + delete_after_versions + len(span_policies) version: int delete_after_time: int delete_after_versions: int @@ -390,7 +392,7 @@ class DirectoryInfoBody(bincode.Packable): def pack_into(self, b: bytearray) -> None: bincode.pack_u8_into(self.version, b) bincode.pack_u64_into(self.delete_after_time, b) - bincode.pack_u8_into(self.delete_after_versions, b) + bincode.pack_u16_into(self.delete_after_versions, b) bincode.pack_u16_into(len(self.span_policies), b) for i in range(len(self.span_policies)): self.span_policies[i].pack_into(b) @@ -400,7 +402,7 @@ class DirectoryInfoBody(bincode.Packable): def unpack(u: bincode.UnpackWrapper) -> 'DirectoryInfoBody': version = bincode.unpack_u8(u) delete_after_time = bincode.unpack_u64(u) - delete_after_versions = bincode.unpack_u8(u) + delete_after_versions = bincode.unpack_u16(u) span_policies: List[Any] = [None]*bincode.unpack_u16(u) for i in range(len(span_policies)): span_policies[i] = SpanPolicy.unpack(u) @@ -410,7 +412,7 @@ class DirectoryInfoBody(bincode.Packable): _size = 0 _size += 1 # version _size += 8 # delete_after_time - _size += 1 # delete_after_versions + _size += 2 # delete_after_versions _size += 2 # len(span_policies) for i in range(len(self.span_policies)): _size += self.span_policies[i].calc_packed_size() # span_policies[i]
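Note that widening delete_after_versions to u16 is more than a wire-format change: with the MSB now carrying the active flag, a caller that used to write a bare count (the removed spawnTests epilogue set dirInfo.DeleteAfterVersions = 1 before collecting directories) has to wrap it in ActiveDeleteAfterVersions, or the policy decodes as inactive. A hedged sketch of such a per-shard cleanup pass follows; it reuses the eggs helpers from the removed code, assumes their signatures are unchanged and that they accept any eggs.LogLevels, and is not the cleanupAfterTest referenced in runTest above (whose body is not part of this diff):

// Hypothetical per-shard cleanup pass; helper signatures assumed to match the
// removed spawnTests epilogue, not confirmed by this diff.
func cleanupShard(log eggs.LogLevels, shid msgs.ShardId, blockServicesKeys map[msgs.BlockServiceId][16]byte) {
	client, err := eggs.NewShardSpecificClient(shid)
	if err != nil {
		panic(err)
	}
	defer client.Close()
	dirInfo, err := eggs.GetDirectoryInfo(log, client, msgs.ROOT_DIR_INODE_ID)
	if err != nil {
		panic(err)
	}
	// A bare 1 would leave the MSB clear and read back as inactive;
	// the constructor sets the flag bit explicitly.
	dirInfo.DeleteAfterVersions = msgs.ActiveDeleteAfterVersions(1)
	if err := eggs.SetDirectoryInfo(log, client, msgs.ROOT_DIR_INODE_ID, false, dirInfo); err != nil {
		panic(err)
	}
	if err := eggs.CollectDirectories(log, shid); err != nil {
		panic(err)
	}
	if err := eggs.DestructFiles(log, shid, blockServicesKeys); err != nil {
		panic(err)
	}
}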