diff --git a/.vscode/settings.json b/.vscode/settings.json index 7a5473ff..aa7bae7e 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -11,6 +11,5 @@ "go/runeggs/runeggs": true, "go/gcdaemon/gcdaemon": true, "go/cli/cli": true, - }, - "cmake.configureOnOpen": false + } } \ No newline at end of file diff --git a/cpp/cdc/eggscdc.cpp b/cpp/cdc/eggscdc.cpp index b4eedcb6..d2f13e3b 100644 --- a/cpp/cdc/eggscdc.cpp +++ b/cpp/cdc/eggscdc.cpp @@ -79,7 +79,9 @@ int main(int argc, char** argv) { options.level = std::min(LogLevel::LOG_DEBUG, options.level); } else if (arg == "-log-level") { std::string logLevel = getNextArg(); - if (logLevel == "debug") { + if (logLevel == "trace") { + options.level = LogLevel::LOG_TRACE; + } else if (logLevel == "debug") { options.level = LogLevel::LOG_DEBUG; } else if (logLevel == "info") { options.level = LogLevel::LOG_INFO; @@ -107,8 +109,8 @@ int main(int argc, char** argv) { } #ifndef EGGS_DEBUG - if (options.level <= LogLevel::LOG_DEBUG) { - die("Cannot use -verbose for non-debug builds (it won't work)."); + if (options.level <= LogLevel::LOG_TRACE) { + die("Cannot use log level trace for non-debug builds (it won't work)."); } #endif diff --git a/cpp/core/CMakeLists.txt b/cpp/core/CMakeLists.txt index abc24c90..1168023b 100644 --- a/cpp/core/CMakeLists.txt +++ b/cpp/core/CMakeLists.txt @@ -3,4 +3,4 @@ file(GLOB core_headers CONFIGURE_DEPENDS "*.hpp") add_library(core ${core_sources} ${core_headers}) add_dependencies(core thirdparty) -target_link_libraries(core PRIVATE z lzma dwarf elf rocksdb lz4 zstd uring unwind xxhash) +target_link_libraries(core PRIVATE rocksdb lz4 zstd uring xxhash) diff --git a/cpp/core/Env.hpp b/cpp/core/Env.hpp index 0693c092..9cc51cc8 100644 --- a/cpp/core/Env.hpp +++ b/cpp/core/Env.hpp @@ -9,9 +9,10 @@ #include "Time.hpp" enum class LogLevel : uint32_t { - LOG_DEBUG = 0, - LOG_INFO = 1, - LOG_ERROR = 2, + LOG_TRACE = 0, + LOG_DEBUG = 1, + LOG_INFO = 2, + LOG_ERROR = 3, }; std::ostream& operator<<(std::ostream& out, LogLevel ll); @@ -26,10 +27,6 @@ public: template void _log(LogLevel level, const std::string& prefix, const char* fmt, Args&&... args) { - if (level < _logLevel) { - return; - } - std::scoped_lock lock(_mutex); std::stringstream ss; format_pack(ss, fmt, args...); @@ -40,6 +37,10 @@ public: } } + bool _shouldLog(LogLevel level) { + return level >= _logLevel; + } + void flush() { std::scoped_lock lock(_mutex); _out.flush(); @@ -63,28 +64,45 @@ public: _log(LogLevel::LOG_ERROR, fmt, std::forward(args)...); } + bool _shouldLog(LogLevel level) { + return _logger._shouldLog(level); + } + void flush() { _logger.flush(); } }; -#ifdef EGGS_DEBUG - #define LOG_DEBUG(env, ...) \ +#ifdef EGGS_TRACE + #define LOG_TRACE(env, ...) \ do { \ - (env)._log(LogLevel::LOG_DEBUG, VALIDATE_FORMAT(__VA_ARGS__)); \ + if (unlikely((env)._shouldLog(LogLevel::LOG_TRACE))) { \ + (env)._log(LogLevel::LOG_TRACE, VALIDATE_FORMAT(__VA_ARGS__)); \ + } \ } while (false) #else - #define LOG_TRACE(env, ...) do {} while (false) + #define LOG_TRACE(env, ...) do {} while (false) #endif +#define LOG_DEBUG(env, ...) \ + do { \ + if (unlikely((env)._shouldLog(LogLevel::LOG_DEBUG))) { \ + (env)._log(LogLevel::LOG_DEBUG, VALIDATE_FORMAT(__VA_ARGS__)); \ + } \ + } while (false) + #define LOG_INFO(env, ...) 
\ do { \ - (env)._log(LogLevel::LOG_INFO, VALIDATE_FORMAT(__VA_ARGS__)); \ + if (likely((env)._shouldLog(LogLevel::LOG_INFO))) { \ + (env)._log(LogLevel::LOG_INFO, VALIDATE_FORMAT(__VA_ARGS__)); \ + } \ } while (false) #define LOG_ERROR(env, ...) \ do { \ - (env)._log(LogLevel::LOG_ERROR, VALIDATE_FORMAT(__VA_ARGS__)); \ + if (likely((env)._shouldLog(LogLevel::LOG_ERROR))) { \ + (env)._log(LogLevel::LOG_ERROR, VALIDATE_FORMAT(__VA_ARGS__)); \ + } \ } while (false) // The interface for this will be different -- we want some kind of alert object diff --git a/cpp/core/MsgsGen.cpp b/cpp/core/MsgsGen.cpp index 28ce3805..4e069eb5 100644 --- a/cpp/core/MsgsGen.cpp +++ b/cpp/core/MsgsGen.cpp @@ -626,6 +626,7 @@ void BlockServiceInfo::pack(BincodeBuf& buf) const { buf.packScalar(availableBytes); buf.packScalar(blocks); buf.packBytes(path); + lastSeen.pack(buf); } void BlockServiceInfo::unpack(BincodeBuf& buf) { id = buf.unpackScalar(); @@ -640,6 +641,7 @@ void BlockServiceInfo::unpack(BincodeBuf& buf) { availableBytes = buf.unpackScalar(); blocks = buf.unpackScalar(); buf.unpackBytes(path); + lastSeen.unpack(buf); } void BlockServiceInfo::clear() { id = uint64_t(0); @@ -654,6 +656,7 @@ void BlockServiceInfo::clear() { availableBytes = uint64_t(0); blocks = uint64_t(0); path.clear(); + lastSeen = EggsTime(); } bool BlockServiceInfo::operator==(const BlockServiceInfo& rhs) const { if ((uint64_t)this->id != (uint64_t)rhs.id) { return false; }; @@ -668,32 +671,37 @@ bool BlockServiceInfo::operator==(const BlockServiceInfo& rhs) const { if ((uint64_t)this->availableBytes != (uint64_t)rhs.availableBytes) { return false; }; if ((uint64_t)this->blocks != (uint64_t)rhs.blocks) { return false; }; if (path != rhs.path) { return false; }; + if ((EggsTime)this->lastSeen != (EggsTime)rhs.lastSeen) { return false; }; return true; } std::ostream& operator<<(std::ostream& out, const BlockServiceInfo& x) { - out << "BlockServiceInfo(" << "Id=" << x.id << ", " << "Ip1=" << x.ip1 << ", " << "Port1=" << x.port1 << ", " << "Ip2=" << x.ip2 << ", " << "Port2=" << x.port2 << ", " << "StorageClass=" << (int)x.storageClass << ", " << "FailureDomain=" << x.failureDomain << ", " << "SecretKey=" << x.secretKey << ", " << "CapacityBytes=" << x.capacityBytes << ", " << "AvailableBytes=" << x.availableBytes << ", " << "Blocks=" << x.blocks << ", " << "Path=" << x.path << ")"; + out << "BlockServiceInfo(" << "Id=" << x.id << ", " << "Ip1=" << x.ip1 << ", " << "Port1=" << x.port1 << ", " << "Ip2=" << x.ip2 << ", " << "Port2=" << x.port2 << ", " << "StorageClass=" << (int)x.storageClass << ", " << "FailureDomain=" << x.failureDomain << ", " << "SecretKey=" << x.secretKey << ", " << "CapacityBytes=" << x.capacityBytes << ", " << "AvailableBytes=" << x.availableBytes << ", " << "Blocks=" << x.blocks << ", " << "Path=" << x.path << ", " << "LastSeen=" << x.lastSeen << ")"; return out; } void ShardInfo::pack(BincodeBuf& buf) const { buf.packFixedBytes<4>(ip); buf.packScalar(port); + lastSeen.pack(buf); } void ShardInfo::unpack(BincodeBuf& buf) { buf.unpackFixedBytes<4>(ip); port = buf.unpackScalar(); + lastSeen.unpack(buf); } void ShardInfo::clear() { ip.clear(); port = uint16_t(0); + lastSeen = EggsTime(); } bool ShardInfo::operator==(const ShardInfo& rhs) const { if (ip != rhs.ip) { return false; }; if ((uint16_t)this->port != (uint16_t)rhs.port) { return false; }; + if ((EggsTime)this->lastSeen != (EggsTime)rhs.lastSeen) { return false; }; return true; } std::ostream& operator<<(std::ostream& out, const ShardInfo& x) { - out << 
"ShardInfo(" << "Ip=" << x.ip << ", " << "Port=" << x.port << ")"; + out << "ShardInfo(" << "Ip=" << x.ip << ", " << "Port=" << x.port << ", " << "LastSeen=" << x.lastSeen << ")"; return out; } @@ -2682,22 +2690,26 @@ std::ostream& operator<<(std::ostream& out, const CdcReq& x) { void CdcResp::pack(BincodeBuf& buf) const { buf.packFixedBytes<4>(ip); buf.packScalar(port); + lastSeen.pack(buf); } void CdcResp::unpack(BincodeBuf& buf) { buf.unpackFixedBytes<4>(ip); port = buf.unpackScalar(); + lastSeen.unpack(buf); } void CdcResp::clear() { ip.clear(); port = uint16_t(0); + lastSeen = EggsTime(); } bool CdcResp::operator==(const CdcResp& rhs) const { if (ip != rhs.ip) { return false; }; if ((uint16_t)this->port != (uint16_t)rhs.port) { return false; }; + if ((EggsTime)this->lastSeen != (EggsTime)rhs.lastSeen) { return false; }; return true; } std::ostream& operator<<(std::ostream& out, const CdcResp& x) { - out << "CdcResp(" << "Ip=" << x.ip << ", " << "Port=" << x.port << ")"; + out << "CdcResp(" << "Ip=" << x.ip << ", " << "Port=" << x.port << ", " << "LastSeen=" << x.lastSeen << ")"; return out; } diff --git a/cpp/core/MsgsGen.hpp b/cpp/core/MsgsGen.hpp index c4054db0..66ddd1fe 100644 --- a/cpp/core/MsgsGen.hpp +++ b/cpp/core/MsgsGen.hpp @@ -457,8 +457,9 @@ struct BlockServiceInfo { uint64_t availableBytes; uint64_t blocks; BincodeBytes path; + EggsTime lastSeen; - static constexpr uint16_t STATIC_SIZE = 8 + BincodeFixedBytes<4>::STATIC_SIZE + 2 + BincodeFixedBytes<4>::STATIC_SIZE + 2 + 1 + BincodeFixedBytes<16>::STATIC_SIZE + BincodeFixedBytes<16>::STATIC_SIZE + 8 + 8 + 8 + BincodeBytes::STATIC_SIZE; // id + ip1 + port1 + ip2 + port2 + storageClass + failureDomain + secretKey + capacityBytes + availableBytes + blocks + path + static constexpr uint16_t STATIC_SIZE = 8 + BincodeFixedBytes<4>::STATIC_SIZE + 2 + BincodeFixedBytes<4>::STATIC_SIZE + 2 + 1 + BincodeFixedBytes<16>::STATIC_SIZE + BincodeFixedBytes<16>::STATIC_SIZE + 8 + 8 + 8 + BincodeBytes::STATIC_SIZE + 8; // id + ip1 + port1 + ip2 + port2 + storageClass + failureDomain + secretKey + capacityBytes + availableBytes + blocks + path + lastSeen BlockServiceInfo() { clear(); } uint16_t packedSize() const { @@ -475,6 +476,7 @@ struct BlockServiceInfo { _size += 8; // availableBytes _size += 8; // blocks _size += path.packedSize(); // path + _size += 8; // lastSeen return _size; } void pack(BincodeBuf& buf) const; @@ -488,14 +490,16 @@ std::ostream& operator<<(std::ostream& out, const BlockServiceInfo& x); struct ShardInfo { BincodeFixedBytes<4> ip; uint16_t port; + EggsTime lastSeen; - static constexpr uint16_t STATIC_SIZE = BincodeFixedBytes<4>::STATIC_SIZE + 2; // ip + port + static constexpr uint16_t STATIC_SIZE = BincodeFixedBytes<4>::STATIC_SIZE + 2 + 8; // ip + port + lastSeen ShardInfo() { clear(); } uint16_t packedSize() const { uint16_t _size = 0; _size += BincodeFixedBytes<4>::STATIC_SIZE; // ip _size += 2; // port + _size += 8; // lastSeen return _size; } void pack(BincodeBuf& buf) const; @@ -2430,14 +2434,16 @@ std::ostream& operator<<(std::ostream& out, const CdcReq& x); struct CdcResp { BincodeFixedBytes<4> ip; uint16_t port; + EggsTime lastSeen; - static constexpr uint16_t STATIC_SIZE = BincodeFixedBytes<4>::STATIC_SIZE + 2; // ip + port + static constexpr uint16_t STATIC_SIZE = BincodeFixedBytes<4>::STATIC_SIZE + 2 + 8; // ip + port + lastSeen CdcResp() { clear(); } uint16_t packedSize() const { uint16_t _size = 0; _size += BincodeFixedBytes<4>::STATIC_SIZE; // ip _size += 2; // port + _size += 8; // lastSeen return 
_size; } void pack(BincodeBuf& buf) const; diff --git a/cpp/core/Undertaker.hpp b/cpp/core/Undertaker.hpp index 4139b7b4..660973c7 100644 --- a/cpp/core/Undertaker.hpp +++ b/cpp/core/Undertaker.hpp @@ -4,6 +4,7 @@ #include #include #include +#include #include "SBRMUnix.hpp" diff --git a/cpp/shard/ShardDB.cpp b/cpp/shard/ShardDB.cpp index cb1d9d5c..47410381 100644 --- a/cpp/shard/ShardDB.cpp +++ b/cpp/shard/ShardDB.cpp @@ -1525,7 +1525,8 @@ struct ShardDBImpl { } if (err == NO_ERROR) { - LOG_DEBUG(_env, "prepared log entry of kind %s, for request of kind %s: %s", logEntryBody.kind(), req.kind(), logEntryBody); + LOG_DEBUG(_env, "prepared log entry of kind %s, for request of kind %s", logEntryBody.kind(), req.kind()); + LOG_TRACE(_env, "log entry body: %s", logEntryBody); } else { LOG_INFO(_env, "could not prepare log entry for request of kind %s: %s", req.kind(), err); } @@ -2954,7 +2955,7 @@ struct ShardDBImpl { EggsTime time = logEntry.time; const auto& logEntryBody = logEntry.body; - LOG_DEBUG(_env, "about to apply log entry %s", logEntryBody); + LOG_TRACE(_env, "about to apply log entry %s", logEntryBody); switch (logEntryBody.kind()) { case ShardLogEntryKind::CONSTRUCT_FILE: diff --git a/cpp/shard/eggsshard.cpp b/cpp/shard/eggsshard.cpp index e9ae8685..3104d261 100644 --- a/cpp/shard/eggsshard.cpp +++ b/cpp/shard/eggsshard.cpp @@ -103,7 +103,9 @@ int main(int argc, char** argv) { options.level = std::min(LogLevel::LOG_DEBUG, options.level); } else if (arg == "-log-level") { std::string logLevel = getNextArg(); - if (logLevel == "debug") { + if (logLevel == "trace") { + options.level = LogLevel::LOG_TRACE; + } else if (logLevel == "debug") { options.level = LogLevel::LOG_DEBUG; } else if (logLevel == "info") { options.level = LogLevel::LOG_INFO; @@ -135,8 +137,8 @@ int main(int argc, char** argv) { } #ifndef EGGS_DEBUG - if (options.level <= LogLevel::LOG_DEBUG) { - die("Cannot use -verbose for non-debug builds (it won't work)."); + if (options.level <= LogLevel::LOG_TRACE) { + die("Cannot use trace for non-debug builds (it won't work)."); } #endif diff --git a/go/bincodegen/bincodegen.go b/go/bincodegen/bincodegen.go index da8a2aec..37a5cd58 100644 --- a/go/bincodegen/bincodegen.go +++ b/go/bincodegen/bincodegen.go @@ -217,21 +217,21 @@ func reqRespEnum(rr reqRespType) string { return reqEnum } -func generateGoMsgKind(out io.Writer, typeName string, reqResps []reqRespType) { +func generateGoMsgKind(out io.Writer, kindTypeName string, reqInterface string, respInterface string, mkName string, reqResps []reqRespType) { seenKinds := map[uint8]bool{} - fmt.Fprintf(out, "func (k %s) String() string {\n", typeName) + fmt.Fprintf(out, "func (k %s) String() string {\n", kindTypeName) fmt.Fprintf(out, "\tswitch k {\n") for _, reqResp := range reqResps { present := seenKinds[reqResp.kind] if present { - panic(fmt.Errorf("duplicate kind %d for %s", reqResp.kind, typeName)) + panic(fmt.Errorf("duplicate kind %d for %s", reqResp.kind, kindTypeName)) } fmt.Fprintf(out, "\tcase %v:\n", reqResp.kind) fmt.Fprintf(out, "\t\treturn \"%s\"\n", reqRespEnum(reqResp)) } fmt.Fprintf(out, "\tdefault:\n") - fmt.Fprintf(out, "\t\treturn fmt.Sprintf(\"%s(%%d)\", k)\n", typeName) + fmt.Fprintf(out, "\t\treturn fmt.Sprintf(\"%s(%%d)\", k)\n", kindTypeName) fmt.Fprintf(out, "\t}\n") fmt.Fprintf(out, "}\n\n") @@ -239,9 +239,20 @@ func generateGoMsgKind(out io.Writer, typeName string, reqResps []reqRespType) { fmt.Fprintf(out, "const (\n") for _, reqResp := range reqResps { - fmt.Fprintf(out, "\t%s %s = 0x%X\n", 
reqRespEnum(reqResp), typeName, reqResp.kind) + fmt.Fprintf(out, "\t%s %s = 0x%X\n", reqRespEnum(reqResp), kindTypeName, reqResp.kind) } fmt.Fprintf(out, ")\n\n") + + fmt.Fprintf(out, "func %s(k string) (%s, %s) {\n", mkName, reqInterface, respInterface) + fmt.Fprintf(out, "\tswitch {\n") + for _, reqResp := range reqResps { + fmt.Fprintf(out, "\tcase k == %q:\n", reqRespEnum(reqResp)) + fmt.Fprintf(out, "\t\treturn &%v{}, &%v{}\n", reqResp.req.Name(), reqResp.resp.Name()) + } + fmt.Fprintf(out, "\tdefault:\n") + fmt.Fprintf(out, "\t\tpanic(fmt.Errorf(\"bad kind string %%s\", k))\n") + fmt.Fprintf(out, "\t}\n") + fmt.Fprintf(out, "}\n\n") } type reqRespType struct { @@ -293,9 +304,9 @@ func generateGo(errors []string, shardReqResps []reqRespType, cdcReqResps []reqR generateGoErrorCodes(out, errors) - generateGoMsgKind(out, "ShardMessageKind", shardReqResps) - generateGoMsgKind(out, "CDCMessageKind", cdcReqResps) - generateGoMsgKind(out, "ShuckleMessageKind", shuckleReqResps) + generateGoMsgKind(out, "ShardMessageKind", "ShardRequest", "ShardResponse", "MkShardMessage", shardReqResps) + generateGoMsgKind(out, "CDCMessageKind", "CDCRequest", "CDCResponse", "MkCDCMessage", cdcReqResps) + generateGoMsgKind(out, "ShuckleMessageKind", "ShuckleRequest", "ShuckleResponse", "MkShuckleMessage", shuckleReqResps) for _, reqResp := range shardReqResps { generateGoReqResp(out, reqResp, "ShardMessageKind", "ShardRequestKind", "ShardResponseKind") diff --git a/go/eggs/blockservicereq.go b/go/eggs/blockservicereq.go index 6d31d772..6633190f 100644 --- a/go/eggs/blockservicereq.go +++ b/go/eggs/blockservicereq.go @@ -15,7 +15,7 @@ import ( // The first ip1/port1 cannot be zeroed, the second one can. One of them // will be tried at random. -func BlockServiceConnection(log LogLevels, id msgs.BlockServiceId, ip1 [4]byte, port1 uint16, ip2 [4]byte, port2 uint16) (*net.TCPConn, error) { +func BlockServiceConnection(log *Logger, id msgs.BlockServiceId, ip1 [4]byte, port1 uint16, ip2 [4]byte, port2 uint16) (*net.TCPConn, error) { if port1 == 0 { panic(fmt.Errorf("ip1/port1 must be provided")) } @@ -98,7 +98,7 @@ func bsExpect[V uint8 | uint32 | uint64](what string, sock io.Reader, x V) error } func WriteBlock( - logger LogLevels, + logger *Logger, conn interface { io.ReaderFrom io.Reader @@ -142,7 +142,7 @@ func WriteBlock( // Won't actually fetch the block -- it'll be readable from `conn` as this function terminates. func FetchBlock( - logger LogLevels, + logger *Logger, conn interface { io.Reader io.Writer @@ -171,7 +171,7 @@ func FetchBlock( } func EraseBlock( - logger LogLevels, + logger *Logger, conn interface { io.Writer io.Reader @@ -200,7 +200,7 @@ func EraseBlock( // returns the write proof func CopyBlock( - logger LogLevels, + logger *Logger, sourceConn interface { io.Reader io.Writer diff --git a/go/eggs/cdcreq.go b/go/eggs/cdcreq.go index 3d815695..ca34fb79 100644 --- a/go/eggs/cdcreq.go +++ b/go/eggs/cdcreq.go @@ -70,7 +70,7 @@ const maxCDCSingleTimeout = 500 * time.Millisecond const cdcMaxElapsed = 25 * time.Second // TODO these are a bit ridicolous now because of the valgrind test, adjust func (c *Client) checkRepeatedCDCRequestError( - logger LogLevels, + logger *Logger, // these are already filled in by now req cdcRequest, resp msgs.CDCResponse, @@ -128,7 +128,7 @@ func (c *Client) checkRepeatedCDCRequestError( } func (c *Client) CDCRequest( - logger LogLevels, + logger *Logger, reqBody msgs.CDCRequest, // Result will be written in here. 
If an error is returned, no guarantees // are made regarding the contents of `respBody`. @@ -164,7 +164,8 @@ func (c *Client) CDCRequest( body: reqBody, } reqBytes := bincode.Pack(&req) - logger.Debug("about to send request id %v (%T, %+v) to CDC, after %v attempts", requestId, reqBody, reqBody, attempts) + logger.Debug("about to send request id %v (type %T) to CDC, after %v attempts", requestId, reqBody, attempts) + logger.Trace("reqBody %+v", reqBody) written, err := sock.Write(reqBytes) if err != nil { return fmt.Errorf("couldn't send request: %w", err) diff --git a/go/eggs/client.go b/go/eggs/client.go index 46618dfb..4c172d0c 100644 --- a/go/eggs/client.go +++ b/go/eggs/client.go @@ -56,7 +56,7 @@ type Client struct { // is that there won't be much contention otherwise you might as well create // a socket each time. TODO not sure this is the best way forward func NewClient( - log LogLevels, + log *Logger, shuckleAddress string, shid *msgs.ShardId, counters *ClientCounters, @@ -69,7 +69,7 @@ func NewClient( { resp, err := ShuckleRequest(log, shuckleAddress, &msgs.ShardsReq{}) if err != nil { - return nil, err + return nil, fmt.Errorf("could not request shards from shuckle: %w", err) } shards := resp.(*msgs.ShardsResp) for i, shard := range shards.Shards { @@ -81,7 +81,7 @@ func NewClient( } resp, err = ShuckleRequest(log, shuckleAddress, &msgs.CdcReq{}) if err != nil { - return nil, err + return nil, fmt.Errorf("could not request CDC from shuckle: %w", err) } cdc := resp.(*msgs.CdcResp) if cdc.Port == 0 { @@ -94,7 +94,7 @@ func NewClient( } func NewClientDirect( - log LogLevels, + log *Logger, shid *msgs.ShardId, counters *ClientCounters, cdcKey cipher.Block, @@ -263,7 +263,7 @@ func (c *shardSpecificFactory) releaseShardSocket(shid msgs.ShardId, sock *net.U } // nil if the directory has no directory info (i.e. if it is inherited) -func GetDirectoryInfo(log LogLevels, c *Client, id msgs.InodeId) (*msgs.DirectoryInfoBody, error) { +func GetDirectoryInfo(log *Logger, c *Client, id msgs.InodeId) (*msgs.DirectoryInfoBody, error) { req := msgs.StatDirectoryReq{ Id: id, } @@ -281,7 +281,7 @@ func GetDirectoryInfo(log LogLevels, c *Client, id msgs.InodeId) (*msgs.Director return &info, nil } -func SetDirectoryInfo(log LogLevels, c *Client, id msgs.InodeId, inherited bool, info *msgs.DirectoryInfoBody) error { +func SetDirectoryInfo(log *Logger, c *Client, id msgs.InodeId, inherited bool, info *msgs.DirectoryInfoBody) error { var buf []byte if inherited { if info != nil { @@ -306,7 +306,7 @@ func SetDirectoryInfo(log LogLevels, c *Client, id msgs.InodeId, inherited bool, } func ResolveDirectoryInfo( - log LogLevels, + log *Logger, client *Client, dirInfoCache *DirInfoCache, dirId msgs.InodeId, @@ -321,7 +321,7 @@ func ResolveDirectoryInfo( } func resolveDirectoryInfo( - log LogLevels, + log *Logger, client *Client, dirInfoCache *DirInfoCache, dirId msgs.InodeId, diff --git a/go/eggs/gc.go b/go/eggs/gc.go index 16f2121e..142fe077 100644 --- a/go/eggs/gc.go +++ b/go/eggs/gc.go @@ -13,7 +13,7 @@ type DestructionStats struct { } func DestructFile( - log LogLevels, + log *Logger, client *Client, mbs MockableBlockServices, stats *DestructionStats, @@ -83,7 +83,7 @@ func DestructFile( } func destructFilesInternal( - log LogLevels, + log *Logger, client *Client, shid msgs.ShardId, stats *DestructionStats, @@ -114,7 +114,7 @@ func destructFilesInternal( // Collects dead transient files, and expunges them. Stops when // all files have been traversed. Useful for testing a single iteration. 
func DestructFiles( - log LogLevels, shuckleAddress string, counters *ClientCounters, shid msgs.ShardId, blockService MockableBlockServices, + log *Logger, shuckleAddress string, counters *ClientCounters, shid msgs.ShardId, blockService MockableBlockServices, ) error { client, err := NewClient(log, shuckleAddress, &shid, counters, nil) if err != nil { @@ -130,7 +130,7 @@ func DestructFiles( } func DestructFilesInAllShards( - log LogLevels, + log *Logger, shuckleAddress string, counters *ClientCounters, blockService MockableBlockServices, @@ -160,7 +160,7 @@ type CollectStats struct { // returns whether all the edges were removed func applyPolicy( - log LogLevels, client *Client, stats *CollectStats, + log *Logger, client *Client, stats *CollectStats, dirId msgs.InodeId, dirInfo *msgs.DirectoryInfoBody, edges []msgs.Edge, ) (bool, error) { policy := SnapshotPolicy{ @@ -219,7 +219,7 @@ func applyPolicy( return toCollect == len(edges), nil } -func CollectDirectory(log LogLevels, client *Client, dirInfoCache *DirInfoCache, stats *CollectStats, dirId msgs.InodeId) error { +func CollectDirectory(log *Logger, client *Client, dirInfoCache *DirInfoCache, stats *CollectStats, dirId msgs.InodeId) error { log.Debug("%v: collecting", dirId) stats.VisitedDirectories++ @@ -296,7 +296,7 @@ func CollectDirectory(log LogLevels, client *Client, dirInfoCache *DirInfoCache, return nil } -func collectDirectoriesInternal(log LogLevels, client *Client, stats *CollectStats, shid msgs.ShardId) error { +func collectDirectoriesInternal(log *Logger, client *Client, stats *CollectStats, shid msgs.ShardId) error { dirInfoCache := NewDirInfoCache() req := msgs.VisitDirectoriesReq{} resp := msgs.VisitDirectoriesResp{} @@ -324,7 +324,7 @@ func collectDirectoriesInternal(log LogLevels, client *Client, stats *CollectSta return nil } -func CollectDirectories(log LogLevels, shuckleAddress string, counters *ClientCounters, shid msgs.ShardId) error { +func CollectDirectories(log *Logger, shuckleAddress string, counters *ClientCounters, shid msgs.ShardId) error { client, err := NewClient(log, shuckleAddress, &shid, counters, nil) if err != nil { return err @@ -338,7 +338,7 @@ func CollectDirectories(log LogLevels, shuckleAddress string, counters *ClientCo return nil } -func CollectDirectoriesInAllShards(log LogLevels, shuckleAddress string, counters *ClientCounters) error { +func CollectDirectoriesInAllShards(log *Logger, shuckleAddress string, counters *ClientCounters) error { client, err := NewClient(log, shuckleAddress, nil, counters, nil) if err != nil { return err diff --git a/go/eggs/loglevels.go b/go/eggs/loglevels.go index 86627fbf..e361cd93 100644 --- a/go/eggs/loglevels.go +++ b/go/eggs/loglevels.go @@ -6,93 +6,66 @@ import ( "log" ) -type LogLevels interface { - Info(format string, v ...any) - InfoStack(calldepth int, format string, v ...any) - Debug(format string, v ...any) - DebugStack(calldepth int, format string, v ...any) - RaiseAlert(err any) - RaiseAlertStack(calldepth int, err any) +type LogLevel uint8 + +const TRACE LogLevel = 0 +const DEBUG LogLevel = 1 +const INFO LogLevel = 2 +const ERROR LogLevel = 3 + +type Logger struct { + logger *log.Logger + level LogLevel } -func Log(log LogLevels, debug bool, format string, v ...any) { - if debug { - log.Debug(format, v...) +func NewLogger(verbose bool, out io.Writer) *Logger { + l := Logger{ + logger: log.New(out, "", log.Ldate|log.Ltime|log.Lmicroseconds|log.Lshortfile), + } + if verbose { + l.level = DEBUG } else { - log.Info(format, v...) 
+ l.level = INFO + } + return &l +} + +func (l *Logger) shouldLog(level LogLevel) bool { + return level >= l.level +} + +func (l *Logger) Log(level LogLevel, format string, v ...any) { + if l.shouldLog(level) { + l.logger.Output(2, fmt.Sprintf(format, v...)) } } -type LogToStdout struct { - Verbose bool -} - -func (*LogToStdout) Info(format string, v ...any) { - fmt.Printf(format, v...) - fmt.Println() -} - -func (*LogToStdout) InfoStack(calldepth int, format string, v ...any) { - fmt.Printf(format, v...) - fmt.Println() -} - -func (s *LogToStdout) Debug(format string, v ...any) { - if s.Verbose { - fmt.Printf(format, v...) - fmt.Println() +func (l *Logger) LogStack(calldepth int, level LogLevel, format string, v ...any) { + if l.shouldLog(level) { + l.logger.Output(2+calldepth, fmt.Sprintf(format, v...)) } } -func (s *LogToStdout) DebugStack(calldepth int, format string, v ...any) { - if s.Verbose { - fmt.Printf(format, v...) - fmt.Println() - } +func (l *Logger) Trace(format string, v ...any) { + l.LogStack(1, TRACE, format, v...) } -func (s *LogToStdout) RaiseAlert(err any) { - fmt.Printf("ALERT %v\n", err) +func (l *Logger) Debug(format string, v ...any) { + l.LogStack(1, DEBUG, format, v...) } -func (s *LogToStdout) RaiseAlertStack(calldepth int, err any) { - s.RaiseAlert(err) +func (l *Logger) Info(format string, v ...any) { + l.LogStack(1, INFO, format, v...) } -// Creates a logger with the formatting we want -func NewLogger(out io.Writer) *log.Logger { - return log.New(out, "", log.Ldate|log.Ltime|log.Lmicroseconds|log.Lshortfile) +func (l *Logger) Error(format string, v ...any) { + l.LogStack(1, ERROR, format, v...) } -type LogLogger struct { - Verbose bool - Logger *log.Logger +func (l *Logger) RaiseAlert(err any) { + l.logger.Output(2, fmt.Sprintf("ALERT %v\n", err)) } -func (l *LogLogger) Info(format string, v ...any) { - l.Logger.Output(2, fmt.Sprintf(format+"\n", v...)) -} - -func (l *LogLogger) InfoStack(calldepth int, format string, v ...any) { - l.Logger.Output(2+calldepth, fmt.Sprintf(format+"\n", v...)) -} - -func (l *LogLogger) Debug(format string, v ...any) { - if l.Verbose { - l.Logger.Output(2, fmt.Sprintf(format+"\n", v...)) - } -} - -func (l *LogLogger) DebugStack(calldepth int, format string, v ...any) { - if l.Verbose { - l.Logger.Output(2+calldepth, fmt.Sprintf(format+"\n", v...)) - } -} - -func (l *LogLogger) RaiseAlert(err any) { - l.Logger.Output(2, fmt.Sprintf("ALERT %v\n", err)) -} - -func (l *LogLogger) RaiseAlertStack(calldepth int, err any) { - l.Logger.Output(2+calldepth, fmt.Sprintf("ALERT %v\n", err)) +func (l *Logger) RaiseAlertStack(calldepth int, err any) { + l.logger.Output(2+calldepth, fmt.Sprintf("ALERT %v\n", err)) } diff --git a/go/eggs/managedprocess.go b/go/eggs/managedprocess.go index 683f2a50..fccb3b6c 100644 --- a/go/eggs/managedprocess.go +++ b/go/eggs/managedprocess.go @@ -131,7 +131,7 @@ func closeOut(out io.Writer) { } } -func (procs *ManagedProcesses) Start(ll LogLevels, args *ManagedProcessArgs) *ManagedProcess { +func (procs *ManagedProcesses) Start(ll *Logger, args *ManagedProcessArgs) *ManagedProcess { exitedChan := make(chan struct{}, 1) procs.processes = append(procs.processes, ManagedProcess{ @@ -244,7 +244,7 @@ func (procs *ManagedProcesses) installSignalHandlers() { } func (procs *ManagedProcesses) StartPythonScript( - ll LogLevels, name string, script string, args []string, mpArgs *ManagedProcessArgs, + ll *Logger, name string, script string, args []string, mpArgs *ManagedProcessArgs, ) { mpArgs.Name = name mpArgs.Exe = 
"python" @@ -276,7 +276,7 @@ func createDataDir(dir string) { } } -func (procs *ManagedProcesses) StartBlockService(ll LogLevels, opts *BlockServiceOpts) { +func (procs *ManagedProcesses) StartBlockService(ll *Logger, opts *BlockServiceOpts) { createDataDir(opts.Path) args := []string{ "-failure-domain", opts.FailureDomain, @@ -318,7 +318,7 @@ type FuseOpts struct { Profile bool } -func (procs *ManagedProcesses) StartFuse(ll LogLevels, opts *FuseOpts) string { +func (procs *ManagedProcesses) StartFuse(ll *Logger, opts *FuseOpts) string { createDataDir(opts.Path) mountPoint := path.Join(opts.Path, "mnt") createDataDir(mountPoint) @@ -363,7 +363,7 @@ type ShuckleOpts struct { HttpPort uint16 } -func (procs *ManagedProcesses) StartShuckle(ll LogLevels, opts *ShuckleOpts) { +func (procs *ManagedProcesses) StartShuckle(ll *Logger, opts *ShuckleOpts) { createDataDir(opts.Dir) args := []string{ "-bincode-port", fmt.Sprintf("%d", opts.BincodePort), @@ -383,7 +383,7 @@ func (procs *ManagedProcesses) StartShuckle(ll LogLevels, opts *ShuckleOpts) { }) } -func BuildShuckleExe(ll LogLevels) string { +func BuildShuckleExe(ll *Logger) string { buildCmd := exec.Command("go", "build", ".") buildCmd.Dir = path.Join(goDir(), "eggsshuckle") ll.Info("building shuckle") @@ -395,7 +395,7 @@ func BuildShuckleExe(ll LogLevels) string { return path.Join(buildCmd.Dir, "eggsshuckle") } -func BuildBlockServiceExe(ll LogLevels) string { +func BuildBlockServiceExe(ll *Logger) string { buildCmd := exec.Command("go", "build", ".") buildCmd.Dir = path.Join(goDir(), "eggsblocks") ll.Info("building block service") @@ -407,7 +407,7 @@ func BuildBlockServiceExe(ll LogLevels) string { return path.Join(buildCmd.Dir, "eggsblocks") } -func BuildEggsFuseExe(ll LogLevels) string { +func BuildEggsFuseExe(ll *Logger) string { buildCmd := exec.Command("go", "build", ".") buildCmd.Dir = path.Join(goDir(), "eggsfuse") ll.Info("building eggsfuse") @@ -433,7 +433,7 @@ type ShardOpts struct { Port uint16 } -func (procs *ManagedProcesses) StartShard(ll LogLevels, opts *ShardOpts) { +func (procs *ManagedProcesses) StartShard(ll *Logger, opts *ShardOpts) { if opts.Valgrind && opts.Perf { panic(fmt.Errorf("cannot do valgrind and perf together")) } @@ -503,7 +503,7 @@ type CDCOpts struct { Port uint16 } -func (procs *ManagedProcesses) StartCDC(ll LogLevels, opts *CDCOpts) { +func (procs *ManagedProcesses) StartCDC(ll *Logger, opts *CDCOpts) { if opts.Valgrind && opts.Perf { panic(fmt.Errorf("cannot do valgrind and perf together")) } @@ -557,7 +557,7 @@ func (procs *ManagedProcesses) StartCDC(ll LogLevels, opts *CDCOpts) { procs.Start(ll, &mpArgs) } -func WaitForShard(log LogLevels, shuckleAddress string, shid msgs.ShardId, timeout time.Duration) { +func WaitForShard(log *Logger, shuckleAddress string, shid msgs.ShardId, timeout time.Duration) { t0 := time.Now() var err error var client *Client @@ -594,7 +594,7 @@ type BuildCppOpts struct { } // Returns build dir -func buildCpp(ll LogLevels, buildType string, targets []string) string { +func buildCpp(ll *Logger, buildType string, targets []string) string { cppDir := cppDir() buildArgs := append([]string{buildType}, targets...) buildCmd := exec.Command("./build.py", buildArgs...) 
@@ -613,7 +613,7 @@ type CppExes struct { CDCExe string } -func BuildCppExes(ll LogLevels, buildType string) *CppExes { +func BuildCppExes(ll *Logger, buildType string) *CppExes { buildDir := buildCpp(ll, buildType, []string{"shard/eggsshard", "cdc/eggscdc"}) return &CppExes{ ShardExe: path.Join(buildDir, "shard/eggsshard"), diff --git a/go/eggs/migrate.go b/go/eggs/migrate.go index b44162d8..0b9003ad 100644 --- a/go/eggs/migrate.go +++ b/go/eggs/migrate.go @@ -15,7 +15,7 @@ type scratchFile struct { size uint64 } -func ensureScratchFile(log LogLevels, client *Client, migratingIn msgs.InodeId, file *scratchFile) error { +func ensureScratchFile(log *Logger, client *Client, migratingIn msgs.InodeId, file *scratchFile) error { if file.id != msgs.NULL_INODE_ID { return nil } @@ -39,7 +39,7 @@ func ensureScratchFile(log LogLevels, client *Client, migratingIn msgs.InodeId, } func copyBlock( - log LogLevels, + log *Logger, client *Client, mbs MockableBlockServices, file *scratchFile, @@ -106,7 +106,7 @@ type keepScratchFileAlive struct { } func startToKeepScratchFileAlive( - log LogLevels, + log *Logger, client *Client, scratchFile *scratchFile, ) keepScratchFileAlive { @@ -165,7 +165,7 @@ func (k *keepScratchFileAlive) stop() { } func migrateBlocksInFileInternal( - log LogLevels, + log *Logger, client *Client, mbs MockableBlockServices, stats *MigrateStats, @@ -252,7 +252,7 @@ type MigrateStats struct { // If the source block service it's still healthy, it'll just copy the block over, otherwise // it'll be recovered from the other. If possible, anyway. func MigrateBlocksInFile( - log LogLevels, + log *Logger, client *Client, mbs MockableBlockServices, stats *MigrateStats, @@ -268,7 +268,7 @@ func MigrateBlocksInFile( // Tries to migrate as many blocks as possible from that block service in a certain // shard. 
func migrateBlocksInternal( - log LogLevels, + log *Logger, client *Client, mbs MockableBlockServices, stats *MigrateStats, @@ -302,7 +302,7 @@ func migrateBlocksInternal( } func MigrateBlocks( - log LogLevels, + log *Logger, client *Client, mbs MockableBlockServices, stats *MigrateStats, @@ -317,7 +317,7 @@ func MigrateBlocks( } func MigrateBlocksInAllShards( - log LogLevels, + log *Logger, client *Client, mbs MockableBlockServices, stats *MigrateStats, diff --git a/go/eggs/mockableblockservice.go b/go/eggs/mockableblockservice.go index e88f2943..fb6226aa 100644 --- a/go/eggs/mockableblockservice.go +++ b/go/eggs/mockableblockservice.go @@ -15,9 +15,9 @@ type MockableBlockServiceConn interface { } type MockableBlockServices interface { - BlockServiceConnection(log LogLevels, id msgs.BlockServiceId, ip1 [4]byte, port1 uint16, ip2 [4]byte, port2 uint16) (MockableBlockServiceConn, error) + BlockServiceConnection(log *Logger, id msgs.BlockServiceId, ip1 [4]byte, port1 uint16, ip2 [4]byte, port2 uint16) (MockableBlockServiceConn, error) WriteBlock( - logger LogLevels, + logger *Logger, conn MockableBlockServiceConn, block *msgs.BlockInfo, r io.Reader, @@ -25,7 +25,7 @@ type MockableBlockServices interface { crc [4]byte, ) ([8]byte, error) FetchBlock( - logger LogLevels, + logger *Logger, conn MockableBlockServiceConn, blockService *msgs.BlockService, blockId msgs.BlockId, @@ -34,12 +34,12 @@ type MockableBlockServices interface { count uint32, ) error EraseBlock( - logger LogLevels, + logger *Logger, conn MockableBlockServiceConn, block msgs.BlockInfo, ) ([8]byte, error) CopyBlock( - logger LogLevels, + logger *Logger, sourceConn MockableBlockServiceConn, sourceBlockService *msgs.BlockService, sourceBlockId msgs.BlockId, @@ -52,12 +52,12 @@ type MockableBlockServices interface { type RealBlockServices struct{} -func (RealBlockServices) BlockServiceConnection(log LogLevels, id msgs.BlockServiceId, ip1 [4]byte, port1 uint16, ip2 [4]byte, port2 uint16) (MockableBlockServiceConn, error) { +func (RealBlockServices) BlockServiceConnection(log *Logger, id msgs.BlockServiceId, ip1 [4]byte, port1 uint16, ip2 [4]byte, port2 uint16) (MockableBlockServiceConn, error) { return BlockServiceConnection(log, id, ip1, port1, ip2, port2) } func (RealBlockServices) WriteBlock( - logger LogLevels, + logger *Logger, conn MockableBlockServiceConn, block *msgs.BlockInfo, r io.Reader, @@ -68,7 +68,7 @@ func (RealBlockServices) WriteBlock( } func (RealBlockServices) FetchBlock( - logger LogLevels, + logger *Logger, conn MockableBlockServiceConn, blockService *msgs.BlockService, blockId msgs.BlockId, @@ -80,7 +80,7 @@ func (RealBlockServices) FetchBlock( } func (RealBlockServices) EraseBlock( - logger LogLevels, + logger *Logger, conn MockableBlockServiceConn, block msgs.BlockInfo, ) ([8]byte, error) { @@ -88,7 +88,7 @@ func (RealBlockServices) EraseBlock( } func (RealBlockServices) CopyBlock( - logger LogLevels, + logger *Logger, sourceConn MockableBlockServiceConn, sourceBlockService *msgs.BlockService, sourceBlockId msgs.BlockId, @@ -124,12 +124,12 @@ func (dummyConn) ReadFrom(r io.Reader) (n int64, err error) { return 0, nil } -func (mbs *MockedBlockServices) BlockServiceConnection(log LogLevels, id msgs.BlockServiceId, ip1 [4]byte, port1 uint16, ip2 [4]byte, port2 uint16) (MockableBlockServiceConn, error) { +func (mbs *MockedBlockServices) BlockServiceConnection(log *Logger, id msgs.BlockServiceId, ip1 [4]byte, port1 uint16, ip2 [4]byte, port2 uint16) (MockableBlockServiceConn, error) { return dummyConn{}, nil } 
func (mbs *MockedBlockServices) WriteBlock( - logger LogLevels, + logger *Logger, conn MockableBlockServiceConn, block *msgs.BlockInfo, r io.Reader, @@ -157,7 +157,7 @@ func (mbs *MockedBlockServices) WriteBlock( } func (mbs *MockedBlockServices) FetchBlock( - logger LogLevels, + logger *Logger, conn MockableBlockServiceConn, blockService *msgs.BlockService, blockId msgs.BlockId, @@ -169,7 +169,7 @@ func (mbs *MockedBlockServices) FetchBlock( } func (mbs *MockedBlockServices) EraseBlock( - logger LogLevels, + logger *Logger, conn MockableBlockServiceConn, block msgs.BlockInfo, ) ([8]byte, error) { @@ -181,7 +181,7 @@ func (mbs *MockedBlockServices) EraseBlock( } func (mbs *MockedBlockServices) CopyBlock( - logger LogLevels, + logger *Logger, sourceConn MockableBlockServiceConn, sourceBlockService *msgs.BlockService, sourceBlockId msgs.BlockId, diff --git a/go/eggs/shardreq.go b/go/eggs/shardreq.go index 9380eb11..8d8fca4e 100644 --- a/go/eggs/shardreq.go +++ b/go/eggs/shardreq.go @@ -85,7 +85,7 @@ func newRequestId() uint64 { } func (c *Client) checkDeletedEdge( - logger LogLevels, + logger *Logger, dirId msgs.InodeId, targetId msgs.InodeId, name string, @@ -117,7 +117,7 @@ func (c *Client) checkDeletedEdge( } func (c *Client) checkNewEdgeAfterRename( - logger LogLevels, + logger *Logger, dirId msgs.InodeId, targetId msgs.InodeId, name string, @@ -139,7 +139,7 @@ func (c *Client) checkNewEdgeAfterRename( } func (c *Client) checkRepeatedShardRequestError( - logger LogLevels, + logger *Logger, // these are already filled in by now req shardRequest, resp msgs.ShardResponse, @@ -179,7 +179,7 @@ func (c *Client) checkRepeatedShardRequestError( } func (c *Client) ShardRequest( - logger LogLevels, + logger *Logger, shid msgs.ShardId, reqBody msgs.ShardRequest, // Result will be written in here. 
If an error is returned, no guarantees @@ -216,7 +216,8 @@ func (c *Client) ShardRequest( body: reqBody, } reqBytes := packShardRequest(&req, c.cdcKey) - logger.Debug("about to send request id %v (%T, %+v) to shard %v, after %v attempts", requestId, reqBody, reqBody, shid, attempts) + logger.Debug("about to send request id %v (type %T) to shard %v using conn %v->%v, after %v attempts", requestId, reqBody, shid, sock.RemoteAddr(), sock.LocalAddr(), attempts) + logger.Trace("reqBody %+v", reqBody) written, err := sock.Write(reqBytes) if err != nil { return fmt.Errorf("couldn't send request to shard %v: %w", shid, err) diff --git a/go/eggs/shucklereq.go b/go/eggs/shucklereq.go index fae56208..8c3cc97e 100644 --- a/go/eggs/shucklereq.go +++ b/go/eggs/shucklereq.go @@ -12,7 +12,7 @@ import ( const DEFAULT_SHUCKLE_ADDRESS = "REDACTED" func ReadShuckleRequest( - log LogLevels, + log *Logger, r io.Reader, ) (msgs.ShuckleRequest, error) { var protocol uint32 @@ -56,7 +56,7 @@ func ReadShuckleRequest( return req, nil } -func WriteShuckleRequest(log LogLevels, w io.Writer, req msgs.ShuckleRequest) error { +func WriteShuckleRequest(log *Logger, w io.Writer, req msgs.ShuckleRequest) error { // serialize bytes := bincode.Pack(req) // write out @@ -76,7 +76,7 @@ func WriteShuckleRequest(log LogLevels, w io.Writer, req msgs.ShuckleRequest) er } func ReadShuckleResponse( - log LogLevels, + log *Logger, r io.Reader, ) (msgs.ShuckleResponse, error) { var protocol uint32 @@ -120,7 +120,7 @@ func ReadShuckleResponse( return resp, nil } -func WriteShuckleResponse(log LogLevels, w io.Writer, resp msgs.ShuckleResponse) error { +func WriteShuckleResponse(log *Logger, w io.Writer, resp msgs.ShuckleResponse) error { // serialize bytes := bincode.Pack(resp) // write out @@ -140,7 +140,7 @@ func WriteShuckleResponse(log LogLevels, w io.Writer, resp msgs.ShuckleResponse) } func ShuckleRequest( - log LogLevels, + log *Logger, shuckleAddress string, req msgs.ShuckleRequest, ) (msgs.ShuckleResponse, error) { diff --git a/go/eggs/waitshuckle.go b/go/eggs/waitshuckle.go index f97f2384..ac8f2c03 100644 --- a/go/eggs/waitshuckle.go +++ b/go/eggs/waitshuckle.go @@ -13,7 +13,7 @@ type ShuckleInfo struct { BlockServices []msgs.BlockServiceInfo } -func WaitForShuckle(ll LogLevels, shuckleAddress string, expectedBlockServices int, timeout time.Duration) *ShuckleInfo { +func WaitForShuckle(ll *Logger, shuckleAddress string, expectedBlockServices int, timeout time.Duration) *ShuckleInfo { info := ShuckleInfo{} t0 := time.Now() var err error diff --git a/go/eggsblocks/eggsblocks.go b/go/eggsblocks/eggsblocks.go index 236041e9..dac66efd 100644 --- a/go/eggsblocks/eggsblocks.go +++ b/go/eggsblocks/eggsblocks.go @@ -28,7 +28,7 @@ import ( var stacktraceLock sync.Mutex -func handleRecover(log eggs.LogLevels, terminateChan chan any, err any) { +func handleRecover(log *eggs.Logger, terminateChan chan any, err any) { if err != nil { log.RaiseAlert(err.(error)) stacktraceLock.Lock() @@ -80,7 +80,7 @@ func countBlocks(basePath string) uint64 { } func registerPeriodically( - log eggs.LogLevels, + log *eggs.Logger, ip1 [4]byte, port1 uint16, ip2 [4]byte, @@ -162,7 +162,7 @@ func blockIdToPath(basePath string, blockId msgs.BlockId) string { return path.Join(path.Join(basePath, dir), hex) } -func eraseBlock(log eggs.LogLevels, basePath string, blockId msgs.BlockId) { +func eraseBlock(log *eggs.Logger, basePath string, blockId msgs.BlockId) { blockPath := blockIdToPath(basePath, blockId) log.Debug("deleting block %v at path %v", blockId, 
blockPath) if err := os.Remove(blockPath); err != nil { @@ -199,7 +199,7 @@ func deserFetchBlock(r io.Reader) (blockId msgs.BlockId, offset uint32, count ui return blockId, offset, count, nil } -func sendFetchBlock(log eggs.LogLevels, blockServiceId msgs.BlockServiceId, basePath string, blockId msgs.BlockId, offset uint32, count uint32, conn *net.TCPConn) error { +func sendFetchBlock(log *eggs.Logger, blockServiceId msgs.BlockServiceId, basePath string, blockId msgs.BlockId, offset uint32, count uint32, conn *net.TCPConn) error { blockPath := blockIdToPath(basePath, blockId) log.Debug("fetching block id %v at path %v", blockId, blockPath) f, err := os.Open(blockPath) @@ -269,7 +269,7 @@ func deserWriteBlock(cipher cipher.Block, blockServiceId msgs.BlockServiceId, ki return blockId, crc, size, nil } -func writeBlock(log eggs.LogLevels, basePath string, blockId msgs.BlockId, expectedCrc [4]byte, size uint32, conn *net.TCPConn) error { +func writeBlock(log *eggs.Logger, basePath string, blockId msgs.BlockId, expectedCrc [4]byte, size uint32, conn *net.TCPConn) error { filePath := blockIdToPath(basePath, blockId) log.Debug("writing block %v at path %v", blockId, basePath) if err := os.Mkdir(path.Dir(filePath), 0777); err != nil && !os.IsExist(err) { @@ -322,7 +322,7 @@ func serWriteCert(blockServiceId msgs.BlockServiceId, key cipher.Block, blockId return nil } -func handleReqError(log eggs.LogLevels, err error) { +func handleReqError(log *eggs.Logger, err error) { log.RaiseAlertStack(1, fmt.Errorf("error while handling request: %w", err)) } @@ -333,7 +333,7 @@ const FUTURE_CUTOFF uint64 = ONE_HOUR_IN_NS * 2 const MAX_OBJECT_SIZE uint32 = 100e6 func handleRequest( - log eggs.LogLevels, + log *eggs.Logger, terminateChan chan any, blockServices map[msgs.BlockServiceId]blockService, conn *net.TCPConn, @@ -437,7 +437,7 @@ Options:` flag.PrintDefaults() } -func retrieveOrCreateKey(log eggs.LogLevels, dir string) [16]byte { +func retrieveOrCreateKey(log *eggs.Logger, dir string) [16]byte { var err error var keyFile *os.File keyFilePath := path.Join(dir, "secret.key") @@ -575,10 +575,7 @@ func main() { } defer logOut.Close() } - log := &eggs.LogLogger{ - Verbose: *verbose, - Logger: eggs.NewLogger(logOut), - } + log := eggs.NewLogger(*verbose, logOut) blockServices := make(map[msgs.BlockServiceId]blockService) for i := 0; i < flag.NArg(); i += 2 { diff --git a/go/eggscli/eggscli.go b/go/eggscli/eggscli.go index a2a61781..65af8e35 100644 --- a/go/eggscli/eggscli.go +++ b/go/eggscli/eggscli.go @@ -41,7 +41,7 @@ func main() { shuckleAddress := flag.String("shuckle", eggs.DEFAULT_SHUCKLE_ADDRESS, "Shuckle address (host:port).") verbose := flag.Bool("verbose", false, "") - var log eggs.LogLevels + var log *eggs.Logger commands = make(map[string]commandSpec) @@ -148,6 +148,36 @@ func main() { run: migrateRun, } + shardReqCmd := flag.NewFlagSet("shard-req", flag.ExitOnError) + shardReqShard := shardReqCmd.Uint("shard", 0, "Shard to send the req to") + shardReqKind := shardReqCmd.String("kind", "", "") + shardReqReq := shardReqCmd.String("req", "", "Request body, in JSON") + shardReqRun := func() { + req, resp := msgs.MkShardMessage(*shardReqKind) + if err := json.Unmarshal([]byte(*shardReqReq), &req); err != nil { + panic(fmt.Errorf("could not decode shard req: %w", err)) + } + shard := msgs.ShardId(*shardReqShard) + client, err := eggs.NewClient(log, *shuckleAddress, &shard, nil, nil) + if err != nil { + panic(err) + } + defer client.Close() + if err := client.ShardRequest(log, shard, req, resp); err != 
nil { + panic(err) + } + out, err := json.MarshalIndent(resp, "", " ") + if err != nil { + panic(fmt.Errorf("could not encode response %+v to json: %w", resp, err)) + } + os.Stdout.Write(out) + fmt.Println() + } + commands["shard-req"] = commandSpec{ + flags: shardReqCmd, + run: shardReqRun, + } + setPolicyCmd := flag.NewFlagSet("set-policy", flag.ExitOnError) setPolicyIdU64 := setPolicyCmd.Uint64("id", 0, "InodeId for the directory to set the policy of.") setPolicyPolicy := setPolicyCmd.String("policy", "", "Policy, in JSON") @@ -224,10 +254,7 @@ func main() { os.Exit(2) } - log = &eggs.LogLogger{ - Logger: eggs.NewLogger(os.Stdout), - Verbose: *verbose, - } + log = eggs.NewLogger(*verbose, os.Stdout) spec, found := commands[flag.Args()[0]] if !found { diff --git a/go/eggsfuse/eggsfuse.go b/go/eggsfuse/eggsfuse.go index 51f81872..43dd8e22 100644 --- a/go/eggsfuse/eggsfuse.go +++ b/go/eggsfuse/eggsfuse.go @@ -20,7 +20,7 @@ import ( ) var client *eggs.Client -var log eggs.LogLevels +var log *eggs.Logger var dirInfoCache *eggs.DirInfoCache func eggsErrToErrno(err error) syscall.Errno { @@ -826,11 +826,7 @@ func main() { } defer logOut.Close() } - logger := eggs.NewLogger(logOut) - log = &eggs.LogLogger{ - Verbose: *verbose, - Logger: logger, - } + log = eggs.NewLogger(*verbose, logOut) if *profileFile != "" { f, err := os.Create(*profileFile) @@ -852,7 +848,7 @@ func main() { root := eggsNode{ id: msgs.ROOT_DIR_INODE_ID, } - server, err := fs.Mount(mountPoint, &root, &fs.Options{Logger: logger}) + server, err := fs.Mount(mountPoint, &root, &fs.Options{}) if err != nil { fmt.Fprintf(os.Stderr, "Could not mount: %v", err) os.Exit(1) diff --git a/go/eggsrun/eggsrun.go b/go/eggsrun/eggsrun.go index c619aaaf..a834e7cd 100644 --- a/go/eggsrun/eggsrun.go +++ b/go/eggsrun/eggsrun.go @@ -75,10 +75,7 @@ func main() { } defer logOut.Close() } - log := &eggs.LogLogger{ - Verbose: *verbose, - Logger: eggs.NewLogger(logOut), - } + log := eggs.NewLogger(*verbose, logOut) fmt.Printf("building shard/cdc/blockservice/shuckle\n") diff --git a/go/eggsshuckle/eggsshuckle.go b/go/eggsshuckle/eggsshuckle.go index 8804c6cd..4ff3ac3f 100644 --- a/go/eggsshuckle/eggsshuckle.go +++ b/go/eggsshuckle/eggsshuckle.go @@ -51,6 +51,7 @@ type state struct { shards [256]msgs.ShardInfo cdcIp [4]byte cdcPort uint16 + cdcLastSeen msgs.EggsTime } func newState() *state { @@ -60,7 +61,7 @@ func newState() *state { } } -func handleBlockServicesForShard(ll eggs.LogLevels, s *state, w io.Writer, req *msgs.BlockServicesForShardReq) *msgs.BlockServicesForShardResp { +func handleBlockServicesForShard(ll *eggs.Logger, s *state, w io.Writer, req *msgs.BlockServicesForShardReq) *msgs.BlockServicesForShardResp { s.mutex.RLock() defer s.mutex.RUnlock() @@ -77,7 +78,7 @@ func handleBlockServicesForShard(ll eggs.LogLevels, s *state, w io.Writer, req * return &resp } -func handleAllBlockServicesReq(ll eggs.LogLevels, s *state, w io.Writer, req *msgs.AllBlockServicesReq) *msgs.AllBlockServicesResp { +func handleAllBlockServicesReq(ll *eggs.Logger, s *state, w io.Writer, req *msgs.AllBlockServicesReq) *msgs.AllBlockServicesResp { s.mutex.RLock() defer s.mutex.RUnlock() @@ -93,18 +94,20 @@ func handleAllBlockServicesReq(ll eggs.LogLevels, s *state, w io.Writer, req *ms return &resp } -func handleRegisterBlockServices(ll eggs.LogLevels, s *state, w io.Writer, req *msgs.RegisterBlockServicesReq) *msgs.RegisterBlockServicesResp { +func handleRegisterBlockServices(ll *eggs.Logger, s *state, w io.Writer, req *msgs.RegisterBlockServicesReq) 
*msgs.RegisterBlockServicesResp { s.mutex.Lock() defer s.mutex.Unlock() + now := msgs.Now() for _, bs := range req.BlockServices { + bs.LastSeen = now s.blockServices[bs.Id] = bs } return &msgs.RegisterBlockServicesResp{} } -func handleShards(ll eggs.LogLevels, s *state, w io.Writer, req *msgs.ShardsReq) *msgs.ShardsResp { +func handleShards(ll *eggs.Logger, s *state, w io.Writer, req *msgs.ShardsReq) *msgs.ShardsResp { s.mutex.RLock() defer s.mutex.RUnlock() @@ -114,16 +117,17 @@ func handleShards(ll eggs.LogLevels, s *state, w io.Writer, req *msgs.ShardsReq) return &resp } -func handleRegisterShard(ll eggs.LogLevels, s *state, w io.Writer, req *msgs.RegisterShardReq) *msgs.RegisterShardResp { +func handleRegisterShard(ll *eggs.Logger, s *state, w io.Writer, req *msgs.RegisterShardReq) *msgs.RegisterShardResp { s.mutex.Lock() defer s.mutex.Unlock() s.shards[req.Id] = req.Info + s.shards[req.Id].LastSeen = msgs.Now() return &msgs.RegisterShardResp{} } -func handleCdcReq(log eggs.LogLevels, s *state, w io.Writer, req *msgs.CdcReq) *msgs.CdcResp { +func handleCdcReq(log *eggs.Logger, s *state, w io.Writer, req *msgs.CdcReq) *msgs.CdcResp { s.mutex.RLock() defer s.mutex.RUnlock() @@ -134,17 +138,18 @@ func handleCdcReq(log eggs.LogLevels, s *state, w io.Writer, req *msgs.CdcReq) * return &resp } -func handleRegisterCdcReq(log eggs.LogLevels, s *state, w io.Writer, req *msgs.RegisterCdcReq) *msgs.RegisterCdcResp { +func handleRegisterCdcReq(log *eggs.Logger, s *state, w io.Writer, req *msgs.RegisterCdcReq) *msgs.RegisterCdcResp { s.mutex.Lock() defer s.mutex.Unlock() s.cdcIp = req.Ip s.cdcPort = req.Port + s.cdcLastSeen = msgs.Now() return &msgs.RegisterCdcResp{} } -func handleRequest(log eggs.LogLevels, s *state, conn *net.TCPConn) { +func handleRequest(log *eggs.Logger, s *state, conn *net.TCPConn) { conn.SetLinger(0) // poor man error handling for now defer conn.Close() @@ -222,10 +227,10 @@ func sendPage( } func handleWithRecover( - log eggs.LogLevels, + log *eggs.Logger, w http.ResponseWriter, r *http.Request, - handle func(log eggs.LogLevels, query url.Values) (io.ReadCloser, int64, int), + handle func(log *eggs.Logger, query url.Values) (io.ReadCloser, int64, int), ) { statusPtr := new(int) var content io.ReadCloser @@ -266,7 +271,7 @@ func handleWithRecover( } func handlePage( - log eggs.LogLevels, + log *eggs.Logger, w http.ResponseWriter, r *http.Request, // third result is status code @@ -274,7 +279,7 @@ func handlePage( ) { handleWithRecover( log, w, r, - func(log eggs.LogLevels, query url.Values) (io.ReadCloser, int64, int) { + func(log *eggs.Logger, query url.Values) (io.ReadCloser, int64, int) { return sendPage(page(query)) }, ) @@ -299,6 +304,12 @@ type indexBlockService struct { AvailableBytes string Path string Blocks uint64 + LastSeen string +} + +type indexShard struct { + Addr string + LastSeen string } type indexData struct { @@ -308,8 +319,9 @@ type indexData struct { TotalUsed string TotalUsedPercentage string CDCAddr string + CDCLastSeen string BlockServices []indexBlockService - ShardsAddrs []string + ShardsAddrs []indexShard Blocks uint64 } @@ -354,7 +366,32 @@ func formatPreciseSize(bytes uint64) string { var indexTemplate *template.Template -func handleIndex(ll eggs.LogLevels, state *state, w http.ResponseWriter, r *http.Request) { +func formatNanos(nanos uint64) string { + var amount float64 + var unit string + if nanos < 1e3 { + amount = float64(nanos) + unit = "ns" + } else if nanos < 1e6 { + amount = float64(nanos) / 1e3 + unit = "µs" + } else if nanos < 1e9 { 
+ amount = float64(nanos) / 1e6 + unit = "ms" + } else if nanos < 1e12 { + amount = float64(nanos) / 1e9 + unit = "s " + } else if nanos < 1e12*60 { + amount = float64(nanos) / (1e9 * 60.0) + unit = "m" + } else { + amount = float64(nanos) / (1e9 * 60.0 * 60.0) + unit = "h" + } + return fmt.Sprintf("%7.2f%s", amount, unit) +} + +func handleIndex(ll *eggs.Logger, state *state, w http.ResponseWriter, r *http.Request) { handlePage( ll, w, r, func(_ url.Values) (*template.Template, *pageData, int) { @@ -368,7 +405,12 @@ func handleIndex(ll eggs.LogLevels, state *state, w http.ResponseWriter, r *http data := indexData{ NumBlockServices: len(state.blockServices), } + now := msgs.Now() + formatLastSeen := func(t msgs.EggsTime) string { + return formatNanos(uint64(now) - uint64(t)) + } data.CDCAddr = fmt.Sprintf("%v:%v", net.IP(state.cdcIp[:]), state.cdcPort) + data.CDCLastSeen = formatLastSeen(state.cdcLastSeen) totalCapacityBytes := uint64(0) totalAvailableBytes := uint64(0) failureDomainsBytes := make(map[string]struct{}) @@ -383,6 +425,7 @@ func handleIndex(ll eggs.LogLevels, state *state, w http.ResponseWriter, r *http AvailableBytes: formatSize(bs.AvailableBytes), Path: bs.Path, Blocks: bs.Blocks, + LastSeen: formatLastSeen(bs.LastSeen), }) failureDomainsBytes[string(bs.FailureDomain[:])] = struct{}{} totalAvailableBytes += bs.AvailableBytes @@ -407,7 +450,10 @@ func handleIndex(ll eggs.LogLevels, state *state, w http.ResponseWriter, r *http }, ) for _, shard := range state.shards { - data.ShardsAddrs = append(data.ShardsAddrs, fmt.Sprintf("%v:%v", net.IP(shard.Ip[:]), shard.Port)) + data.ShardsAddrs = append(data.ShardsAddrs, indexShard{ + Addr: fmt.Sprintf("%v:%v", net.IP(shard.Ip[:]), shard.Port), + LastSeen: formatLastSeen(shard.LastSeen), + }) } data.TotalUsed = formatSize(totalCapacityBytes - totalAvailableBytes) data.TotalCapacity = formatSize(totalAvailableBytes) @@ -496,7 +542,7 @@ type directoryData struct { Info directoryInfo } -func newClient(log eggs.LogLevels, state *state) *eggs.Client { +func newClient(log *eggs.Logger, state *state) *eggs.Client { state.mutex.RLock() defer state.mutex.RUnlock() @@ -527,7 +573,7 @@ func normalizePath(path string) string { return newPath } -func lookup(log eggs.LogLevels, client *eggs.Client, path string) *msgs.InodeId { +func lookup(log *eggs.Logger, client *eggs.Client, path string) *msgs.InodeId { id := msgs.ROOT_DIR_INODE_ID if path == "" { return &id @@ -571,7 +617,7 @@ func pathSegments(path string) []pathSegment { return pathSegments } -func lookupDirectoryInfo(log eggs.LogLevels, client *eggs.Client, id msgs.InodeId) (msgs.InodeId, *msgs.DirectoryInfoBody) { +func lookupDirectoryInfo(log *eggs.Logger, client *eggs.Client, id msgs.InodeId) (msgs.InodeId, *msgs.DirectoryInfoBody) { req := msgs.StatDirectoryReq{Id: id} resp := msgs.StatDirectoryResp{} for { @@ -595,7 +641,7 @@ var fileTemplate *template.Template var directoryTemplate *template.Template func handleInode( - log eggs.LogLevels, + log *eggs.Logger, state *state, w http.ResponseWriter, r *http.Request, @@ -614,9 +660,15 @@ func handleInode( } id = msgs.InodeId(i) } - if id != msgs.NULL_INODE_ID && path != "" { + if id != msgs.NULL_INODE_ID && id != msgs.ROOT_DIR_INODE_ID && path == "/" { + path = "" // path not provided + } + if id != msgs.NULL_INODE_ID && id != msgs.ROOT_DIR_INODE_ID && path != "" { return errorPage(http.StatusBadRequest, "cannot specify both id and path") } + if id == msgs.ROOT_DIR_INODE_ID && path != "/" { + return errorPage(http.StatusBadRequest, "bad root 
inode id") + } client := newClient(log, state) if id == msgs.NULL_INODE_ID { mbId := lookup(log, client, path) @@ -776,10 +828,10 @@ func handleInode( ) } -func handleBlock(log eggs.LogLevels, st *state, w http.ResponseWriter, r *http.Request) { +func handleBlock(log *eggs.Logger, st *state, w http.ResponseWriter, r *http.Request) { handleWithRecover( log, w, r, - func(log eggs.LogLevels, query url.Values) (io.ReadCloser, int64, int) { + func(log *eggs.Logger, query url.Values) (io.ReadCloser, int64, int) { segments := strings.Split(r.URL.Path, "/")[1:] if segments[0] != "blocks" { panic(fmt.Errorf("bad path %v", r.URL.Path)) @@ -838,13 +890,13 @@ func handleBlock(log eggs.LogLevels, st *state, w http.ResponseWriter, r *http.R ) } -func setupRouting(log eggs.LogLevels, st *state) { +func setupRouting(log *eggs.Logger, st *state) { errorTemplate = parseTemplates( namedTemplate{name: "base", body: baseTemplateStr}, namedTemplate{name: "error", body: errorTemplateStr}, ) - setupPage := func(path string, handle func(ll eggs.LogLevels, state *state, w http.ResponseWriter, r *http.Request)) { + setupPage := func(path string, handle func(ll *eggs.Logger, state *state, w http.ResponseWriter, r *http.Request)) { http.HandleFunc( path, func(w http.ResponseWriter, r *http.Request) { handle(log, st, w, r) }, @@ -908,10 +960,7 @@ func main() { } } - ll := &eggs.LogLogger{ - Verbose: *verbose, - Logger: eggs.NewLogger(logOut), - } + ll := eggs.NewLogger(*verbose, logOut) bincodeListener, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%v", *bincodePort)) if err != nil { diff --git a/go/eggsshuckle/index.html b/go/eggsshuckle/index.html index 0ac89e55..47eb56d5 100644 --- a/go/eggsshuckle/index.html +++ b/go/eggsshuckle/index.html @@ -11,10 +11,12 @@ Address + LastSeen {{.CDCAddr}} + {{.CDCLastSeen}} @@ -24,12 +26,14 @@ Id Address + LastSeen - {{ range $id, $addr := .ShardsAddrs }} + {{ range $id, $shard := .ShardsAddrs }} {{printf "%03v" $id}} - {{$addr}} + {{$shard.Addr}} + {{$shard.LastSeen}} {{end}} @@ -47,6 +51,7 @@ Blocks Capacity Available + LastSeen {{ range .BlockServices }} @@ -60,6 +65,7 @@ {{.Blocks}} {{.CapacityBytes}} {{.AvailableBytes}} + {{.LastSeen}} {{end}} diff --git a/go/gcdaemon/gcdaemon.go b/go/gcdaemon/gcdaemon.go index f654dda9..e7945cfb 100644 --- a/go/gcdaemon/gcdaemon.go +++ b/go/gcdaemon/gcdaemon.go @@ -54,10 +54,7 @@ func main() { } defer logOut.Close() } - log := &eggs.LogLogger{ - Verbose: *verbose, - Logger: eggs.NewLogger(logOut), - } + log := eggs.NewLogger(*verbose, logOut) panicChan := make(chan error) finishedChan := make(chan struct{}) diff --git a/go/integrationtest/cleanup.go b/go/integrationtest/cleanup.go index 8f1e8eeb..7a98a9fb 100644 --- a/go/integrationtest/cleanup.go +++ b/go/integrationtest/cleanup.go @@ -6,7 +6,7 @@ import ( "xtx/eggsfs/msgs" ) -func deleteDir(log eggs.LogLevels, client *eggs.Client, ownerId msgs.InodeId, name string, creationTime msgs.EggsTime, dirId msgs.InodeId) { +func deleteDir(log *eggs.Logger, client *eggs.Client, ownerId msgs.InodeId, name string, creationTime msgs.EggsTime, dirId msgs.InodeId) { readDirReq := msgs.ReadDirReq{ DirId: dirId, } @@ -43,7 +43,7 @@ func deleteDir(log eggs.LogLevels, client *eggs.Client, ownerId msgs.InodeId, na } func cleanupAfterTest( - log eggs.LogLevels, + log *eggs.Logger, shuckleAddress string, counters *eggs.ClientCounters, mbs eggs.MockableBlockServices, diff --git a/go/integrationtest/filehistory.go b/go/integrationtest/filehistory.go index 6d301f82..14ec6563 100644 --- 
a/go/integrationtest/filehistory.go +++ b/go/integrationtest/filehistory.go @@ -161,7 +161,7 @@ func checkCheckpoint(prefix string, files *fileHistoryFiles, allEdges []edge) { } } -func runCheckpoint(log eggs.LogLevels, client *eggs.Client, prefix string, files *fileHistoryFiles) fileHistoryCheckpoint { +func runCheckpoint(log *eggs.Logger, client *eggs.Client, prefix string, files *fileHistoryFiles) fileHistoryCheckpoint { edges := readDir(log, client, msgs.ROOT_DIR_INODE_ID) checkCheckpoint(prefix, files, edges) resp := msgs.StatDirectoryResp{} @@ -171,7 +171,7 @@ func runCheckpoint(log eggs.LogLevels, client *eggs.Client, prefix string, files } } -func runStep(log eggs.LogLevels, client *eggs.Client, mbs eggs.MockableBlockServices, files *fileHistoryFiles, stepAny any) any { +func runStep(log *eggs.Logger, client *eggs.Client, mbs eggs.MockableBlockServices, files *fileHistoryFiles, stepAny any) any { switch step := stepAny.(type) { case fileHistoryCreateFile: // 10 MiB spans @@ -268,7 +268,7 @@ func replayStep(prefix string, files *fileHistoryFiles, fullEdges []fullEdge, st } } -func fileHistoryStepSingle(log eggs.LogLevels, client *eggs.Client, mbs *eggs.MockedBlockServices, opts *fileHistoryTestOpts, seed int64, filePrefix string) { +func fileHistoryStepSingle(log *eggs.Logger, client *eggs.Client, mbs *eggs.MockedBlockServices, opts *fileHistoryTestOpts, seed int64, filePrefix string) { // loop for n steps. at every step: // * if we have never reached the target files, then just create a file. // * if we have, create/delete/rename/rename with override at random. @@ -334,7 +334,7 @@ type fileHistoryTestOpts struct { } func fileHistoryTest( - log eggs.LogLevels, + log *eggs.Logger, shuckleAddress string, mbs0 eggs.MockableBlockServices, opts *fileHistoryTestOpts, diff --git a/go/integrationtest/fstest.go b/go/integrationtest/fstest.go index 8d843da3..b2c5935c 100644 --- a/go/integrationtest/fstest.go +++ b/go/integrationtest/fstest.go @@ -24,13 +24,13 @@ type fsTestOpts struct { } type fsTestHarness[Id comparable] interface { - createDirectory(log eggs.LogLevels, owner Id, name string) (Id, msgs.EggsTime) - rename(log eggs.LogLevels, targetId Id, oldOwner Id, oldCreationTime msgs.EggsTime, oldName string, newOwner Id, newName string) (Id, msgs.EggsTime) - createFile(log eggs.LogLevels, owner Id, spanSize uint64, name string, size uint64, genData func(size uint64) []byte) (Id, msgs.EggsTime) + createDirectory(log *eggs.Logger, owner Id, name string) (Id, msgs.EggsTime) + rename(log *eggs.Logger, targetId Id, oldOwner Id, oldCreationTime msgs.EggsTime, oldName string, newOwner Id, newName string) (Id, msgs.EggsTime) + createFile(log *eggs.Logger, owner Id, spanSize uint64, name string, size uint64, genData func(size uint64) []byte) (Id, msgs.EggsTime) // if false, the harness does not support reading files (e.g. 
we're mocking block services) - checkFileData(log eggs.LogLevels, id Id, size uint64, genData func(size uint64) []byte) + checkFileData(log *eggs.Logger, id Id, size uint64, genData func(size uint64) []byte) // files, directories - readDirectory(log eggs.LogLevels, dir Id) ([]string, []string) + readDirectory(log *eggs.Logger, dir Id) ([]string, []string) } type apiFsTestHarness struct { @@ -38,7 +38,7 @@ type apiFsTestHarness struct { mbs eggs.MockableBlockServices } -func (c *apiFsTestHarness) createDirectory(log eggs.LogLevels, owner msgs.InodeId, name string) (id msgs.InodeId, creationTime msgs.EggsTime) { +func (c *apiFsTestHarness) createDirectory(log *eggs.Logger, owner msgs.InodeId, name string) (id msgs.InodeId, creationTime msgs.EggsTime) { req := msgs.MakeDirectoryReq{ OwnerId: owner, Name: name, @@ -50,7 +50,7 @@ func (c *apiFsTestHarness) createDirectory(log eggs.LogLevels, owner msgs.InodeI } func (c *apiFsTestHarness) rename( - log eggs.LogLevels, + log *eggs.Logger, targetId msgs.InodeId, oldOwner msgs.InodeId, oldCreationTime msgs.EggsTime, @@ -97,12 +97,12 @@ func (c *apiFsTestHarness) rename( } func (c *apiFsTestHarness) createFile( - log eggs.LogLevels, owner msgs.InodeId, spanSize uint64, name string, size uint64, genData func(size uint64) []byte, + log *eggs.Logger, owner msgs.InodeId, spanSize uint64, name string, size uint64, genData func(size uint64) []byte, ) (msgs.InodeId, msgs.EggsTime) { return createFile(log, c.client, c.mbs, owner, spanSize, name, size, genData) } -func (c *apiFsTestHarness) readDirectory(log eggs.LogLevels, dir msgs.InodeId) (files []string, dirs []string) { +func (c *apiFsTestHarness) readDirectory(log *eggs.Logger, dir msgs.InodeId) (files []string, dirs []string) { edges := readDir(log, c.client, dir) for _, edge := range edges { if edge.targetId.Type() == msgs.DIRECTORY { @@ -133,7 +133,7 @@ func checkFileData(actualData []byte, expectedData []byte) { } -func (c *apiFsTestHarness) checkFileData(log eggs.LogLevels, id msgs.InodeId, size uint64, genData func(size uint64) []byte) { +func (c *apiFsTestHarness) checkFileData(log *eggs.Logger, id msgs.InodeId, size uint64, genData func(size uint64) []byte) { _, isFake := c.mbs.(*eggs.MockedBlockServices) if isFake { return @@ -147,9 +147,9 @@ var _ = (fsTestHarness[msgs.InodeId])((*apiFsTestHarness)(nil)) type posixFsTestHarness struct{} -func (*posixFsTestHarness) createDirectory(log eggs.LogLevels, owner string, name string) (fullPath string, creationTime msgs.EggsTime) { +func (*posixFsTestHarness) createDirectory(log *eggs.Logger, owner string, name string) (fullPath string, creationTime msgs.EggsTime) { fullPath = path.Join(owner, name) - log.DebugStack(1, "posix mkdir %v", fullPath) + log.LogStack(1, eggs.DEBUG, "posix mkdir %v", fullPath) if err := os.Mkdir(fullPath, 0777); err != nil { panic(err) } @@ -157,7 +157,7 @@ func (*posixFsTestHarness) createDirectory(log eggs.LogLevels, owner string, nam } func (*posixFsTestHarness) rename( - log eggs.LogLevels, + log *eggs.Logger, targetFullPath string, oldDir string, oldCreationTime msgs.EggsTime, @@ -169,7 +169,7 @@ func (*posixFsTestHarness) rename( panic(fmt.Errorf("mismatching %v and %v", targetFullPath, path.Join(oldDir, oldName))) } newFullPath := path.Join(newDir, newName) - log.DebugStack(1, "posix rename %v -> %v", targetFullPath, newFullPath) + log.LogStack(1, eggs.DEBUG, "posix rename %v -> %v", targetFullPath, newFullPath) if err := os.Rename(targetFullPath, path.Join(newDir, newName)); err != nil { panic(err) } @@ -177,19 
+177,19 @@ func (*posixFsTestHarness) rename( } func (c *posixFsTestHarness) createFile( - log eggs.LogLevels, dirFullPath string, spanSize uint64, name string, size uint64, genData func(size uint64) []byte, + log *eggs.Logger, dirFullPath string, spanSize uint64, name string, size uint64, genData func(size uint64) []byte, ) (fileFullPath string, t msgs.EggsTime) { data := genData(size) fileFullPath = path.Join(dirFullPath, name) - log.DebugStack(1, "posix create file %v", fileFullPath) + log.LogStack(1, eggs.DEBUG, "posix create file %v", fileFullPath) if err := os.WriteFile(fileFullPath, data, 0644); err != nil { panic(err) } return fileFullPath, 0 } -func (c *posixFsTestHarness) readDirectory(log eggs.LogLevels, dirFullPath string) (files []string, dirs []string) { - log.DebugStack(1, "posix readdir for %v", dirFullPath) +func (c *posixFsTestHarness) readDirectory(log *eggs.Logger, dirFullPath string) (files []string, dirs []string) { + log.LogStack(1, eggs.DEBUG, "posix readdir for %v", dirFullPath) fileInfo, err := ioutil.ReadDir(dirFullPath) if err != nil { panic(err) @@ -204,7 +204,7 @@ func (c *posixFsTestHarness) readDirectory(log eggs.LogLevels, dirFullPath strin return files, dirs } -func (c *posixFsTestHarness) checkFileData(log eggs.LogLevels, fullFilePath string, size uint64, genData func(size uint64) []byte) { +func (c *posixFsTestHarness) checkFileData(log *eggs.Logger, fullFilePath string, size uint64, genData func(size uint64) []byte) { allData := genData(size) fileData := make([]byte, len(allData)) f, err := os.Open(fullFilePath) @@ -291,7 +291,7 @@ func (s *fsTestState[Id]) dir(path []int) *fsTestDir[Id] { return s.rootDir.dir(path) } -func (state *fsTestState[Id]) incrementDirs(log eggs.LogLevels, opts *fsTestOpts) { +func (state *fsTestState[Id]) incrementDirs(log *eggs.Logger, opts *fsTestOpts) { if state.totalDirs >= opts.numDirs { panic("ran out of dirs!") } @@ -301,7 +301,7 @@ func (state *fsTestState[Id]) incrementDirs(log eggs.LogLevels, opts *fsTestOpts } } -func (state *fsTestState[Id]) makeDir(log eggs.LogLevels, harness fsTestHarness[Id], opts *fsTestOpts, parent []int, name int) []int { +func (state *fsTestState[Id]) makeDir(log *eggs.Logger, harness fsTestHarness[Id], opts *fsTestOpts, parent []int, name int) []int { state.incrementDirs(log, opts) dir := state.dir(parent) _, dirExists := dir.children.directories[name] @@ -322,7 +322,7 @@ func (state *fsTestState[Id]) makeDir(log eggs.LogLevels, harness fsTestHarness[ return path } -func (state *fsTestState[Id]) makeDirFromTemp(log eggs.LogLevels, harness fsTestHarness[Id], opts *fsTestOpts, parent []int, name int, tmpParent []int) []int { +func (state *fsTestState[Id]) makeDirFromTemp(log *eggs.Logger, harness fsTestHarness[Id], opts *fsTestOpts, parent []int, name int, tmpParent []int) []int { dir := state.dir(parent) _, dirExists := dir.children.directories[name] if dirExists { @@ -349,7 +349,7 @@ func (state *fsTestState[Id]) makeDirFromTemp(log eggs.LogLevels, harness fsTest return path } -func (state *fsTestState[Id]) incrementFiles(log eggs.LogLevels, opts *fsTestOpts) { +func (state *fsTestState[Id]) incrementFiles(log *eggs.Logger, opts *fsTestOpts) { if state.totalFiles >= opts.numFiles { panic("ran out of files!") } @@ -368,7 +368,7 @@ func genDataWithSeed(seed int64, size uint64) []byte { return data } -func (state *fsTestState[Id]) makeFile(log eggs.LogLevels, harness fsTestHarness[Id], opts *fsTestOpts, rand *rand.Rand, dirPath []int, name int) { +func (state *fsTestState[Id]) makeFile(log 
*eggs.Logger, harness fsTestHarness[Id], opts *fsTestOpts, rand *rand.Rand, dirPath []int, name int) { state.incrementFiles(log, opts) dir := state.dir(dirPath) _, dirExists := dir.children.directories[name] @@ -395,7 +395,7 @@ func (state *fsTestState[Id]) makeFile(log eggs.LogLevels, harness fsTestHarness } } -func (state *fsTestState[Id]) makeFileFromTemp(log eggs.LogLevels, harness fsTestHarness[Id], opts *fsTestOpts, rand *rand.Rand, dirPath []int, name int, tmpDirPath []int) { +func (state *fsTestState[Id]) makeFileFromTemp(log *eggs.Logger, harness fsTestHarness[Id], opts *fsTestOpts, rand *rand.Rand, dirPath []int, name int, tmpDirPath []int) { state.incrementFiles(log, opts) dir := state.dir(dirPath) _, dirExists := dir.children.directories[name] @@ -430,7 +430,7 @@ func (state *fsTestState[Id]) makeFileFromTemp(log eggs.LogLevels, harness fsTes } } -func (d *fsTestDir[Id]) check(log eggs.LogLevels, harness fsTestHarness[Id]) { +func (d *fsTestDir[Id]) check(log *eggs.Logger, harness fsTestHarness[Id]) { files, dirs := harness.readDirectory(log, d.id) if len(files)+len(dirs) != len(d.children.files)+len(d.children.directories) { panic(fmt.Errorf("bad number of edges -- got %v + %v, expected %v + %v", len(files), len(dirs), len(d.children.files), len(d.children.files))) @@ -466,7 +466,7 @@ func (d *fsTestDir[Id]) check(log eggs.LogLevels, harness fsTestHarness[Id]) { } // Just the first block service id we can find -func findBlockServiceToPurge(log eggs.LogLevels, client *eggs.Client) msgs.BlockServiceId { +func findBlockServiceToPurge(log *eggs.Logger, client *eggs.Client) msgs.BlockServiceId { filesReq := msgs.VisitFilesReq{} filesResp := msgs.VisitFilesResp{} for { @@ -492,7 +492,7 @@ func findBlockServiceToPurge(log eggs.LogLevels, client *eggs.Client) msgs.Block } func fsTestInternal[Id comparable]( - log eggs.LogLevels, + log *eggs.Logger, shuckleAddress string, opts *fsTestOpts, counters *eggs.ClientCounters, @@ -583,7 +583,7 @@ func fsTestInternal[Id comparable]( } func fsTest( - log eggs.LogLevels, + log *eggs.Logger, shuckleAddress string, opts *fsTestOpts, counters *eggs.ClientCounters, diff --git a/go/integrationtest/integrationtest.go b/go/integrationtest/integrationtest.go index 9cb18edf..3f0e8850 100644 --- a/go/integrationtest/integrationtest.go +++ b/go/integrationtest/integrationtest.go @@ -44,7 +44,7 @@ func formatNanos(nanos int64) string { var stacktraceLock sync.Mutex -func handleRecover(log eggs.LogLevels, terminateChan chan any, err any) { +func handleRecover(log *eggs.Logger, terminateChan chan any, err any) { if err != nil { log.RaiseAlert(err) stacktraceLock.Lock() @@ -68,7 +68,7 @@ func formatCounters(what string, counters *eggs.ReqCounters) { } func runTest( - log eggs.LogLevels, + log *eggs.Logger, shuckleAddress string, mbs eggs.MockableBlockServices, filter *regexp.Regexp, @@ -113,7 +113,7 @@ func runTest( } } -func runTests(terminateChan chan any, log eggs.LogLevels, shuckleAddress string, blockServices []msgs.BlockServiceInfo, fuseMountPoint string, short bool, filter *regexp.Regexp) { +func runTests(terminateChan chan any, log *eggs.Logger, shuckleAddress string, blockServices []msgs.BlockServiceInfo, fuseMountPoint string, short bool, filter *regexp.Regexp) { defer func() { handleRecover(log, terminateChan, recover()) }() blockServicesKeys := make(map[msgs.BlockServiceId]cipher.Block) @@ -272,10 +272,7 @@ func main() { } defer logOut.Close() } - log := &eggs.LogLogger{ - Verbose: *verbose, - Logger: eggs.NewLogger(logOut), - } + log := 
eggs.NewLogger(*verbose, logOut) fmt.Printf("building shard/cdc/blockservice/shuckle\n") cppExes := eggs.BuildCppExes(log, *buildType) diff --git a/go/integrationtest/req.go b/go/integrationtest/req.go index c04d6608..4ad46e03 100644 --- a/go/integrationtest/req.go +++ b/go/integrationtest/req.go @@ -19,14 +19,14 @@ type fullEdge struct { creationTime msgs.EggsTime } -func shardReq(log eggs.LogLevels, client *eggs.Client, shid msgs.ShardId, reqBody msgs.ShardRequest, respBody msgs.ShardResponse) { +func shardReq(log *eggs.Logger, client *eggs.Client, shid msgs.ShardId, reqBody msgs.ShardRequest, respBody msgs.ShardResponse) { err := client.ShardRequest(log, shid, reqBody, respBody) if err != nil { panic(err) } } -func cdcReq(log eggs.LogLevels, client *eggs.Client, reqBody msgs.CDCRequest, respBody msgs.CDCResponse) { +func cdcReq(log *eggs.Logger, client *eggs.Client, reqBody msgs.CDCRequest, respBody msgs.CDCResponse) { err := client.CDCRequest(log, reqBody, respBody) if err != nil { panic(err) @@ -55,7 +55,7 @@ func createFileData( */ func createFile( - log eggs.LogLevels, + log *eggs.Logger, client *eggs.Client, mbs eggs.MockableBlockServices, dirId msgs.InodeId, @@ -175,7 +175,7 @@ func createFile( return constructResp.Id, linkResp.CreationTime } -func readFile(log eggs.LogLevels, client *eggs.Client, mbs eggs.MockableBlockServices, id msgs.InodeId) []byte { +func readFile(log *eggs.Logger, client *eggs.Client, mbs eggs.MockableBlockServices, id msgs.InodeId) []byte { data := bytes.NewBuffer([]byte{}) spansReq := msgs.FileSpansReq{ FileId: id, @@ -215,7 +215,7 @@ func readFile(log eggs.LogLevels, client *eggs.Client, mbs eggs.MockableBlockSer return data.Bytes() } -func readDir(log eggs.LogLevels, client *eggs.Client, dir msgs.InodeId) []edge { +func readDir(log *eggs.Logger, client *eggs.Client, dir msgs.InodeId) []edge { req := msgs.ReadDirReq{ DirId: dir, StartHash: 0, @@ -238,7 +238,7 @@ func readDir(log eggs.LogLevels, client *eggs.Client, dir msgs.InodeId) []edge { return edges } -func fullReadDir(log eggs.LogLevels, client *eggs.Client, dirId msgs.InodeId) []fullEdge { +func fullReadDir(log *eggs.Logger, client *eggs.Client, dirId msgs.InodeId) []fullEdge { req := msgs.FullReadDirReq{ DirId: msgs.ROOT_DIR_INODE_ID, } diff --git a/go/msgs/msgs.go b/go/msgs/msgs.go index eb5041e2..609b40ed 100644 --- a/go/msgs/msgs.go +++ b/go/msgs/msgs.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "io" + "strconv" "time" "xtx/eggsfs/bincode" ) @@ -93,6 +94,23 @@ func (id InodeId) String() string { return fmt.Sprintf("0x%X", uint64(id)) } +func (id InodeId) MarshalJSON() ([]byte, error) { + return []byte(fmt.Sprintf("%q", id.String())), nil +} + +func (id *InodeId) UnmarshalJSON(b []byte) error { + var ids string + if err := json.Unmarshal(b, &ids); err != nil { + return err + } + idu, err := strconv.ParseUint(ids, 0, 63) + if err != nil { + return err + } + *id = InodeId(idu) + return nil +} + func (id BlockServiceId) String() string { return fmt.Sprintf("0x%016x", uint64(id)) } @@ -1159,6 +1177,7 @@ type BlockServiceInfo struct { AvailableBytes uint64 Blocks uint64 // how many blocks we have Path string + LastSeen EggsTime } type UpdateBlockServicesEntry struct { @@ -1259,8 +1278,9 @@ type RegisterBlockServicesResp struct{} type ShardsReq struct{} type ShardInfo struct { - Ip [4]byte - Port uint16 + Ip [4]byte + Port uint16 + LastSeen EggsTime } type ShardsResp struct { @@ -1286,6 +1306,7 @@ type RegisterCdcResp struct{} type CdcReq struct{} type CdcResp struct { - Ip [4]byte - Port uint16 
+ Ip [4]byte + Port uint16 + LastSeen EggsTime } diff --git a/go/msgs/msgs_bincode.go b/go/msgs/msgs_bincode.go index e0078a8c..cdb86a52 100644 --- a/go/msgs/msgs_bincode.go +++ b/go/msgs/msgs_bincode.go @@ -2,65 +2,67 @@ // Run `go generate ./...` from the go/ directory to regenerate it. package msgs -import "fmt" -import "io" -import "xtx/eggsfs/bincode" +import ( + "fmt" + "io" + "xtx/eggsfs/bincode" +) func (err ErrCode) Error() string { return err.String() } const ( - INTERNAL_ERROR ErrCode = 10 - FATAL_ERROR ErrCode = 11 - TIMEOUT ErrCode = 12 - MALFORMED_REQUEST ErrCode = 13 - MALFORMED_RESPONSE ErrCode = 14 - NOT_AUTHORISED ErrCode = 15 - UNRECOGNIZED_REQUEST ErrCode = 16 - FILE_NOT_FOUND ErrCode = 17 - DIRECTORY_NOT_FOUND ErrCode = 18 - NAME_NOT_FOUND ErrCode = 19 - EDGE_NOT_FOUND ErrCode = 20 - EDGE_IS_LOCKED ErrCode = 21 - TYPE_IS_DIRECTORY ErrCode = 22 - TYPE_IS_NOT_DIRECTORY ErrCode = 23 - BAD_COOKIE ErrCode = 24 + INTERNAL_ERROR ErrCode = 10 + FATAL_ERROR ErrCode = 11 + TIMEOUT ErrCode = 12 + MALFORMED_REQUEST ErrCode = 13 + MALFORMED_RESPONSE ErrCode = 14 + NOT_AUTHORISED ErrCode = 15 + UNRECOGNIZED_REQUEST ErrCode = 16 + FILE_NOT_FOUND ErrCode = 17 + DIRECTORY_NOT_FOUND ErrCode = 18 + NAME_NOT_FOUND ErrCode = 19 + EDGE_NOT_FOUND ErrCode = 20 + EDGE_IS_LOCKED ErrCode = 21 + TYPE_IS_DIRECTORY ErrCode = 22 + TYPE_IS_NOT_DIRECTORY ErrCode = 23 + BAD_COOKIE ErrCode = 24 INCONSISTENT_STORAGE_CLASS_PARITY ErrCode = 25 - LAST_SPAN_STATE_NOT_CLEAN ErrCode = 26 - COULD_NOT_PICK_BLOCK_SERVICES ErrCode = 27 - BAD_SPAN_BODY ErrCode = 28 - SPAN_NOT_FOUND ErrCode = 29 - BLOCK_SERVICE_NOT_FOUND ErrCode = 30 - CANNOT_CERTIFY_BLOCKLESS_SPAN ErrCode = 31 - BAD_NUMBER_OF_BLOCKS_PROOFS ErrCode = 32 - BAD_BLOCK_PROOF ErrCode = 33 - CANNOT_OVERRIDE_NAME ErrCode = 34 - NAME_IS_LOCKED ErrCode = 35 - MTIME_IS_TOO_RECENT ErrCode = 36 - MISMATCHING_TARGET ErrCode = 37 - MISMATCHING_OWNER ErrCode = 38 - MISMATCHING_CREATION_TIME ErrCode = 39 - DIRECTORY_NOT_EMPTY ErrCode = 40 - FILE_IS_TRANSIENT ErrCode = 41 - OLD_DIRECTORY_NOT_FOUND ErrCode = 42 - NEW_DIRECTORY_NOT_FOUND ErrCode = 43 - LOOP_IN_DIRECTORY_RENAME ErrCode = 44 - DIRECTORY_HAS_OWNER ErrCode = 45 - FILE_IS_NOT_TRANSIENT ErrCode = 46 - FILE_NOT_EMPTY ErrCode = 47 - CANNOT_REMOVE_ROOT_DIRECTORY ErrCode = 48 - FILE_EMPTY ErrCode = 49 - CANNOT_REMOVE_DIRTY_SPAN ErrCode = 50 - BAD_SHARD ErrCode = 51 - BAD_NAME ErrCode = 52 - MORE_RECENT_SNAPSHOT_EDGE ErrCode = 53 - MORE_RECENT_CURRENT_EDGE ErrCode = 54 - BAD_DIRECTORY_INFO ErrCode = 55 - DEADLINE_NOT_PASSED ErrCode = 56 - SAME_SOURCE_AND_DESTINATION ErrCode = 57 - SAME_DIRECTORIES ErrCode = 58 - SAME_SHARD ErrCode = 59 + LAST_SPAN_STATE_NOT_CLEAN ErrCode = 26 + COULD_NOT_PICK_BLOCK_SERVICES ErrCode = 27 + BAD_SPAN_BODY ErrCode = 28 + SPAN_NOT_FOUND ErrCode = 29 + BLOCK_SERVICE_NOT_FOUND ErrCode = 30 + CANNOT_CERTIFY_BLOCKLESS_SPAN ErrCode = 31 + BAD_NUMBER_OF_BLOCKS_PROOFS ErrCode = 32 + BAD_BLOCK_PROOF ErrCode = 33 + CANNOT_OVERRIDE_NAME ErrCode = 34 + NAME_IS_LOCKED ErrCode = 35 + MTIME_IS_TOO_RECENT ErrCode = 36 + MISMATCHING_TARGET ErrCode = 37 + MISMATCHING_OWNER ErrCode = 38 + MISMATCHING_CREATION_TIME ErrCode = 39 + DIRECTORY_NOT_EMPTY ErrCode = 40 + FILE_IS_TRANSIENT ErrCode = 41 + OLD_DIRECTORY_NOT_FOUND ErrCode = 42 + NEW_DIRECTORY_NOT_FOUND ErrCode = 43 + LOOP_IN_DIRECTORY_RENAME ErrCode = 44 + DIRECTORY_HAS_OWNER ErrCode = 45 + FILE_IS_NOT_TRANSIENT ErrCode = 46 + FILE_NOT_EMPTY ErrCode = 47 + CANNOT_REMOVE_ROOT_DIRECTORY ErrCode = 48 + FILE_EMPTY ErrCode = 49 + 
CANNOT_REMOVE_DIRTY_SPAN ErrCode = 50 + BAD_SHARD ErrCode = 51 + BAD_NAME ErrCode = 52 + MORE_RECENT_SNAPSHOT_EDGE ErrCode = 53 + MORE_RECENT_CURRENT_EDGE ErrCode = 54 + BAD_DIRECTORY_INFO ErrCode = 55 + DEADLINE_NOT_PASSED ErrCode = 56 + SAME_SOURCE_AND_DESTINATION ErrCode = 57 + SAME_DIRECTORIES ErrCode = 58 + SAME_SHARD ErrCode = 59 ) func (err ErrCode) String() string { @@ -245,44 +247,118 @@ func (k ShardMessageKind) String() string { } } - const ( - LOOKUP ShardMessageKind = 0x1 - STAT_FILE ShardMessageKind = 0x2 - STAT_TRANSIENT_FILE ShardMessageKind = 0xA - STAT_DIRECTORY ShardMessageKind = 0x8 - READ_DIR ShardMessageKind = 0x3 - CONSTRUCT_FILE ShardMessageKind = 0x4 - ADD_SPAN_INITIATE ShardMessageKind = 0x5 - ADD_SPAN_CERTIFY ShardMessageKind = 0x6 - LINK_FILE ShardMessageKind = 0x7 - SOFT_UNLINK_FILE ShardMessageKind = 0xC - FILE_SPANS ShardMessageKind = 0xD - SAME_DIRECTORY_RENAME ShardMessageKind = 0xE - SET_DIRECTORY_INFO ShardMessageKind = 0xF - SNAPSHOT_LOOKUP ShardMessageKind = 0x9 - EXPIRE_TRANSIENT_FILE ShardMessageKind = 0xB - VISIT_DIRECTORIES ShardMessageKind = 0x15 - VISIT_FILES ShardMessageKind = 0x20 - VISIT_TRANSIENT_FILES ShardMessageKind = 0x16 - FULL_READ_DIR ShardMessageKind = 0x21 - REMOVE_NON_OWNED_EDGE ShardMessageKind = 0x17 - SAME_SHARD_HARD_FILE_UNLINK ShardMessageKind = 0x18 - REMOVE_SPAN_INITIATE ShardMessageKind = 0x19 - REMOVE_SPAN_CERTIFY ShardMessageKind = 0x1A - SWAP_BLOCKS ShardMessageKind = 0x22 - BLOCK_SERVICE_FILES ShardMessageKind = 0x23 - REMOVE_INODE ShardMessageKind = 0x24 - CREATE_DIRECTORY_INODE ShardMessageKind = 0x80 - SET_DIRECTORY_OWNER ShardMessageKind = 0x81 - REMOVE_DIRECTORY_OWNER ShardMessageKind = 0x89 - CREATE_LOCKED_CURRENT_EDGE ShardMessageKind = 0x82 - LOCK_CURRENT_EDGE ShardMessageKind = 0x83 - UNLOCK_CURRENT_EDGE ShardMessageKind = 0x84 + LOOKUP ShardMessageKind = 0x1 + STAT_FILE ShardMessageKind = 0x2 + STAT_TRANSIENT_FILE ShardMessageKind = 0xA + STAT_DIRECTORY ShardMessageKind = 0x8 + READ_DIR ShardMessageKind = 0x3 + CONSTRUCT_FILE ShardMessageKind = 0x4 + ADD_SPAN_INITIATE ShardMessageKind = 0x5 + ADD_SPAN_CERTIFY ShardMessageKind = 0x6 + LINK_FILE ShardMessageKind = 0x7 + SOFT_UNLINK_FILE ShardMessageKind = 0xC + FILE_SPANS ShardMessageKind = 0xD + SAME_DIRECTORY_RENAME ShardMessageKind = 0xE + SET_DIRECTORY_INFO ShardMessageKind = 0xF + SNAPSHOT_LOOKUP ShardMessageKind = 0x9 + EXPIRE_TRANSIENT_FILE ShardMessageKind = 0xB + VISIT_DIRECTORIES ShardMessageKind = 0x15 + VISIT_FILES ShardMessageKind = 0x20 + VISIT_TRANSIENT_FILES ShardMessageKind = 0x16 + FULL_READ_DIR ShardMessageKind = 0x21 + REMOVE_NON_OWNED_EDGE ShardMessageKind = 0x17 + SAME_SHARD_HARD_FILE_UNLINK ShardMessageKind = 0x18 + REMOVE_SPAN_INITIATE ShardMessageKind = 0x19 + REMOVE_SPAN_CERTIFY ShardMessageKind = 0x1A + SWAP_BLOCKS ShardMessageKind = 0x22 + BLOCK_SERVICE_FILES ShardMessageKind = 0x23 + REMOVE_INODE ShardMessageKind = 0x24 + CREATE_DIRECTORY_INODE ShardMessageKind = 0x80 + SET_DIRECTORY_OWNER ShardMessageKind = 0x81 + REMOVE_DIRECTORY_OWNER ShardMessageKind = 0x89 + CREATE_LOCKED_CURRENT_EDGE ShardMessageKind = 0x82 + LOCK_CURRENT_EDGE ShardMessageKind = 0x83 + UNLOCK_CURRENT_EDGE ShardMessageKind = 0x84 REMOVE_OWNED_SNAPSHOT_FILE_EDGE ShardMessageKind = 0x86 - MAKE_FILE_TRANSIENT ShardMessageKind = 0x87 + MAKE_FILE_TRANSIENT ShardMessageKind = 0x87 ) +func MkShardMessage(k string) (ShardRequest, ShardResponse) { + switch { + case k == "LOOKUP": + return &LookupReq{}, &LookupResp{} + case k == "STAT_FILE": + return &StatFileReq{}, 
&StatFileResp{} + case k == "STAT_TRANSIENT_FILE": + return &StatTransientFileReq{}, &StatTransientFileResp{} + case k == "STAT_DIRECTORY": + return &StatDirectoryReq{}, &StatDirectoryResp{} + case k == "READ_DIR": + return &ReadDirReq{}, &ReadDirResp{} + case k == "CONSTRUCT_FILE": + return &ConstructFileReq{}, &ConstructFileResp{} + case k == "ADD_SPAN_INITIATE": + return &AddSpanInitiateReq{}, &AddSpanInitiateResp{} + case k == "ADD_SPAN_CERTIFY": + return &AddSpanCertifyReq{}, &AddSpanCertifyResp{} + case k == "LINK_FILE": + return &LinkFileReq{}, &LinkFileResp{} + case k == "SOFT_UNLINK_FILE": + return &SoftUnlinkFileReq{}, &SoftUnlinkFileResp{} + case k == "FILE_SPANS": + return &FileSpansReq{}, &FileSpansResp{} + case k == "SAME_DIRECTORY_RENAME": + return &SameDirectoryRenameReq{}, &SameDirectoryRenameResp{} + case k == "SET_DIRECTORY_INFO": + return &SetDirectoryInfoReq{}, &SetDirectoryInfoResp{} + case k == "SNAPSHOT_LOOKUP": + return &SnapshotLookupReq{}, &SnapshotLookupResp{} + case k == "EXPIRE_TRANSIENT_FILE": + return &ExpireTransientFileReq{}, &ExpireTransientFileResp{} + case k == "VISIT_DIRECTORIES": + return &VisitDirectoriesReq{}, &VisitDirectoriesResp{} + case k == "VISIT_FILES": + return &VisitFilesReq{}, &VisitFilesResp{} + case k == "VISIT_TRANSIENT_FILES": + return &VisitTransientFilesReq{}, &VisitTransientFilesResp{} + case k == "FULL_READ_DIR": + return &FullReadDirReq{}, &FullReadDirResp{} + case k == "REMOVE_NON_OWNED_EDGE": + return &RemoveNonOwnedEdgeReq{}, &RemoveNonOwnedEdgeResp{} + case k == "SAME_SHARD_HARD_FILE_UNLINK": + return &SameShardHardFileUnlinkReq{}, &SameShardHardFileUnlinkResp{} + case k == "REMOVE_SPAN_INITIATE": + return &RemoveSpanInitiateReq{}, &RemoveSpanInitiateResp{} + case k == "REMOVE_SPAN_CERTIFY": + return &RemoveSpanCertifyReq{}, &RemoveSpanCertifyResp{} + case k == "SWAP_BLOCKS": + return &SwapBlocksReq{}, &SwapBlocksResp{} + case k == "BLOCK_SERVICE_FILES": + return &BlockServiceFilesReq{}, &BlockServiceFilesResp{} + case k == "REMOVE_INODE": + return &RemoveInodeReq{}, &RemoveInodeResp{} + case k == "CREATE_DIRECTORY_INODE": + return &CreateDirectoryInodeReq{}, &CreateDirectoryInodeResp{} + case k == "SET_DIRECTORY_OWNER": + return &SetDirectoryOwnerReq{}, &SetDirectoryOwnerResp{} + case k == "REMOVE_DIRECTORY_OWNER": + return &RemoveDirectoryOwnerReq{}, &RemoveDirectoryOwnerResp{} + case k == "CREATE_LOCKED_CURRENT_EDGE": + return &CreateLockedCurrentEdgeReq{}, &CreateLockedCurrentEdgeResp{} + case k == "LOCK_CURRENT_EDGE": + return &LockCurrentEdgeReq{}, &LockCurrentEdgeResp{} + case k == "UNLOCK_CURRENT_EDGE": + return &UnlockCurrentEdgeReq{}, &UnlockCurrentEdgeResp{} + case k == "REMOVE_OWNED_SNAPSHOT_FILE_EDGE": + return &RemoveOwnedSnapshotFileEdgeReq{}, &RemoveOwnedSnapshotFileEdgeResp{} + case k == "MAKE_FILE_TRANSIENT": + return &MakeFileTransientReq{}, &MakeFileTransientResp{} + default: + panic(fmt.Errorf("bad kind string %s", k)) + } +} + func (k CDCMessageKind) String() string { switch k { case 1: @@ -302,16 +378,34 @@ func (k CDCMessageKind) String() string { } } - const ( - MAKE_DIRECTORY CDCMessageKind = 0x1 - RENAME_FILE CDCMessageKind = 0x2 - SOFT_UNLINK_DIRECTORY CDCMessageKind = 0x3 - RENAME_DIRECTORY CDCMessageKind = 0x4 - HARD_UNLINK_DIRECTORY CDCMessageKind = 0x5 + MAKE_DIRECTORY CDCMessageKind = 0x1 + RENAME_FILE CDCMessageKind = 0x2 + SOFT_UNLINK_DIRECTORY CDCMessageKind = 0x3 + RENAME_DIRECTORY CDCMessageKind = 0x4 + HARD_UNLINK_DIRECTORY CDCMessageKind = 0x5 CROSS_SHARD_HARD_UNLINK_FILE CDCMessageKind 
= 0x6 ) +func MkCDCMessage(k string) (CDCRequest, CDCResponse) { + switch { + case k == "MAKE_DIRECTORY": + return &MakeDirectoryReq{}, &MakeDirectoryResp{} + case k == "RENAME_FILE": + return &RenameFileReq{}, &RenameFileResp{} + case k == "SOFT_UNLINK_DIRECTORY": + return &SoftUnlinkDirectoryReq{}, &SoftUnlinkDirectoryResp{} + case k == "RENAME_DIRECTORY": + return &RenameDirectoryReq{}, &RenameDirectoryResp{} + case k == "HARD_UNLINK_DIRECTORY": + return &HardUnlinkDirectoryReq{}, &HardUnlinkDirectoryResp{} + case k == "CROSS_SHARD_HARD_UNLINK_FILE": + return &CrossShardHardUnlinkFileReq{}, &CrossShardHardUnlinkFileResp{} + default: + panic(fmt.Errorf("bad kind string %s", k)) + } +} + func (k ShuckleMessageKind) String() string { switch k { case 1: @@ -333,17 +427,37 @@ func (k ShuckleMessageKind) String() string { } } - const ( BLOCK_SERVICES_FOR_SHARD ShuckleMessageKind = 0x1 - REGISTER_BLOCK_SERVICES ShuckleMessageKind = 0x2 - SHARDS ShuckleMessageKind = 0x3 - REGISTER_SHARD ShuckleMessageKind = 0x4 - ALL_BLOCK_SERVICES ShuckleMessageKind = 0x5 - REGISTER_CDC ShuckleMessageKind = 0x6 - CDC ShuckleMessageKind = 0x7 + REGISTER_BLOCK_SERVICES ShuckleMessageKind = 0x2 + SHARDS ShuckleMessageKind = 0x3 + REGISTER_SHARD ShuckleMessageKind = 0x4 + ALL_BLOCK_SERVICES ShuckleMessageKind = 0x5 + REGISTER_CDC ShuckleMessageKind = 0x6 + CDC ShuckleMessageKind = 0x7 ) +func MkShuckleMessage(k string) (ShuckleRequest, ShuckleResponse) { + switch { + case k == "BLOCK_SERVICES_FOR_SHARD": + return &BlockServicesForShardReq{}, &BlockServicesForShardResp{} + case k == "REGISTER_BLOCK_SERVICES": + return &RegisterBlockServicesReq{}, &RegisterBlockServicesResp{} + case k == "SHARDS": + return &ShardsReq{}, &ShardsResp{} + case k == "REGISTER_SHARD": + return &RegisterShardReq{}, &RegisterShardResp{} + case k == "ALL_BLOCK_SERVICES": + return &AllBlockServicesReq{}, &AllBlockServicesResp{} + case k == "REGISTER_CDC": + return &RegisterCdcReq{}, &RegisterCdcResp{} + case k == "CDC": + return &CdcReq{}, &CdcResp{} + default: + panic(fmt.Errorf("bad kind string %s", k)) + } +} + func (v *LookupReq) ShardRequestKind() ShardMessageKind { return LOOKUP } @@ -2998,6 +3112,9 @@ func (v *BlockServiceInfo) Pack(w io.Writer) error { if err := bincode.PackBytes(w, []byte(v.Path)); err != nil { return err } + if err := bincode.PackScalar(w, uint64(v.LastSeen)); err != nil { + return err + } return nil } @@ -3038,6 +3155,9 @@ func (v *BlockServiceInfo) Unpack(r io.Reader) error { if err := bincode.UnpackString(r, &v.Path); err != nil { return err } + if err := bincode.UnpackScalar(r, (*uint64)(&v.LastSeen)); err != nil { + return err + } return nil } @@ -3048,6 +3168,9 @@ func (v *ShardInfo) Pack(w io.Writer) error { if err := bincode.PackScalar(w, uint16(v.Port)); err != nil { return err } + if err := bincode.PackScalar(w, uint64(v.LastSeen)); err != nil { + return err + } return nil } @@ -3058,6 +3181,9 @@ func (v *ShardInfo) Unpack(r io.Reader) error { if err := bincode.UnpackScalar(r, (*uint16)(&v.Port)); err != nil { return err } + if err := bincode.UnpackScalar(r, (*uint64)(&v.LastSeen)); err != nil { + return err + } return nil } @@ -3334,6 +3460,9 @@ func (v *CdcResp) Pack(w io.Writer) error { if err := bincode.PackScalar(w, uint16(v.Port)); err != nil { return err } + if err := bincode.PackScalar(w, uint64(v.LastSeen)); err != nil { + return err + } return nil } @@ -3344,6 +3473,8 @@ func (v *CdcResp) Unpack(r io.Reader) error { if err := bincode.UnpackScalar(r, (*uint16)(&v.Port)); err != nil { return err } 
+ if err := bincode.UnpackScalar(r, (*uint64)(&v.LastSeen)); err != nil { + return err + } return nil } -
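For reference, a minimal usage sketch (not part of the patch) of the MkShuckleMessage factory and the InodeId JSON marshalling added above; it assumes the xtx/eggsfs/msgs package as laid out in this diff, and the sample id 0x24 is arbitrary and purely illustrative.

// Minimal sketch, assuming xtx/eggsfs/msgs as modified in this diff.
package main

import (
	"encoding/json"
	"fmt"

	"xtx/eggsfs/msgs"
)

func main() {
	// MkShuckleMessage maps a kind name to an empty request/response pair,
	// mirroring MkShardMessage and MkCDCMessage; unknown names panic.
	req, resp := msgs.MkShuckleMessage("CDC")
	fmt.Printf("%T %T\n", req, resp) // *msgs.CdcReq *msgs.CdcResp

	// InodeId now marshals to a quoted hex string and is parsed back via
	// strconv.ParseUint with automatic base detection.
	var id msgs.InodeId = 0x24 // hypothetical id, for illustration only
	b, err := json.Marshal(id)
	if err != nil {
		panic(err)
	}
	var back msgs.InodeId
	if err := json.Unmarshal(b, &back); err != nil {
		panic(err)
	}
	fmt.Println(string(b), back == id) // "0x24" true
}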