From 51860fac3ad27787c6d8f797ca0db596ca611447 Mon Sep 17 00:00:00 2001 From: Francesco Mazzoli Date: Tue, 14 Feb 2023 17:51:50 +0000 Subject: [PATCH] Various improvements, nothing substantial --- cpp/cdc/CDC.cpp | 16 +- cpp/cdc/CDC.hpp | 2 +- cpp/cdc/CDCDBData.hpp | 3 +- cpp/cdc/eggscdc.cpp | 12 +- cpp/core/MsgsGen.cpp | 22 ++ cpp/core/MsgsGen.hpp | 25 +- cpp/shard/Shard.cpp | 18 +- cpp/shard/Shard.hpp | 2 +- cpp/shard/ShardDB.cpp | 2 + cpp/shard/eggsshard.cpp | 12 +- go/bincodegen/bincodegen.go | 1 + go/eggs/blockservicereq.go | 8 +- go/eggs/cdcreq.go | 14 +- go/eggs/client.go | 106 ++++++--- go/eggs/gc.go | 4 +- go/eggs/migrate.go | 7 +- go/eggs/mockableblockservice.go | 96 +++++++- go/eggs/shardreq.go | 5 +- go/eggs/timings.go | 54 +++++ go/eggsblocks/eggsblocks.go | 11 + go/eggsfuse/eggsfuse.go | 147 +++++++----- go/eggsrun/eggsrun.go | 10 +- go/eggsshuckle/base.html | 2 +- ...tstrap.min.css => bootstrap.5.0.2.min.css} | 0 go/eggsshuckle/chart.4.2.1.min.js | 1 + go/eggsshuckle/eggsshuckle.go | 16 +- go/integrationtest/cleanup.go | 9 +- go/integrationtest/filehistory.go | 5 +- go/integrationtest/fstest.go | 11 +- go/integrationtest/integrationtest.go | 80 +++---- go/integrationtest/req.go | 16 +- go/msgs/msgs.go | 7 +- go/msgs/msgs_bincode.go | 218 ++++++++++-------- 33 files changed, 633 insertions(+), 309 deletions(-) create mode 100644 go/eggs/timings.go rename go/eggsshuckle/{bootstrap.min.css => bootstrap.5.0.2.min.css} (100%) create mode 100644 go/eggsshuckle/chart.4.2.1.min.js diff --git a/cpp/cdc/CDC.cpp b/cpp/cdc/CDC.cpp index 3e3aadf6..d7770cf4 100644 --- a/cpp/cdc/CDC.cpp +++ b/cpp/cdc/CDC.cpp @@ -614,7 +614,21 @@ void runCDC(const std::string& dbDir, const CDCOptions& options) { } logOut = &fileOut; } - Logger logger(options.level, *logOut); + Logger logger(options.logLevel, *logOut); + + { + Env env(logger, "startup"); + LOG_INFO(env, "Running CDC with options:"); + LOG_INFO(env, " level = %s", options.logLevel); + LOG_INFO(env, " logFile = '%s'", 
options.logFile); + LOG_INFO(env, " port = %s", options.port); + LOG_INFO(env, " shuckleHost = '%s'", options.shuckleHost); + LOG_INFO(env, " shucklePort = %s", options.shucklePort); + { + char ip[INET_ADDRSTRLEN]; + LOG_INFO(env, " ownIp = %s", inet_ntop(AF_INET, &options.ownIp, ip, INET_ADDRSTRLEN)); + } + } CDCDB db(logger, dbDir); auto shared = std::make_unique(db); diff --git a/cpp/cdc/CDC.hpp b/cpp/cdc/CDC.hpp index 627affc5..dab0828d 100644 --- a/cpp/cdc/CDC.hpp +++ b/cpp/cdc/CDC.hpp @@ -3,7 +3,7 @@ #include "Env.hpp" struct CDCOptions { - LogLevel level = LogLevel::LOG_INFO; + LogLevel logLevel = LogLevel::LOG_INFO; std::string logFile = ""; // if empty, stdout uint16_t port = 0; // chosen randomly and recorded in shuckle std::string shuckleHost = ""; diff --git a/cpp/cdc/CDCDBData.hpp b/cpp/cdc/CDCDBData.hpp index dc5f83e0..3148c7ca 100644 --- a/cpp/cdc/CDCDBData.hpp +++ b/cpp/cdc/CDCDBData.hpp @@ -7,7 +7,7 @@ #include "Assert.hpp" #include "Bincode.hpp" #include "Exception.hpp" -#include "MsgsGen.hpp" +#include "Msgs.hpp" #include "RocksDBUtils.hpp" #include "Msgs.hpp" #include "Time.hpp" @@ -211,5 +211,6 @@ struct TxnState { default: throw EGGS_EXCEPTION("bad cdc message kind %s", reqKind()); } + memset(_data+MIN_SIZE, 0, size()-MIN_SIZE); } }; diff --git a/cpp/cdc/eggscdc.cpp b/cpp/cdc/eggscdc.cpp index d2f13e3b..e82190ef 100644 --- a/cpp/cdc/eggscdc.cpp +++ b/cpp/cdc/eggscdc.cpp @@ -76,17 +76,17 @@ int main(int argc, char** argv) { if (arg == "-h" || arg == "-help") { dieWithUsage(); } else if (arg == "-verbose") { - options.level = std::min(LogLevel::LOG_DEBUG, options.level); + options.logLevel = std::min(LogLevel::LOG_DEBUG, options.logLevel); } else if (arg == "-log-level") { std::string logLevel = getNextArg(); if (logLevel == "trace") { - options.level = LogLevel::LOG_TRACE; + options.logLevel = LogLevel::LOG_TRACE; } else if (logLevel == "debug") { - options.level = LogLevel::LOG_DEBUG; + options.logLevel = LogLevel::LOG_DEBUG; } else if 
(logLevel == "info") { - options.level = LogLevel::LOG_INFO; + options.logLevel = LogLevel::LOG_INFO; } else if (logLevel == "error") { - options.level = LogLevel::LOG_ERROR; + options.logLevel = LogLevel::LOG_ERROR; } else { die("Bad log level `%s'", logLevel.c_str()); } @@ -109,7 +109,7 @@ int main(int argc, char** argv) { } #ifndef EGGS_DEBUG - if (options.level <= LogLevel::LOG_TRACE) { + if (options.logLevel <= LogLevel::LOG_TRACE) { die("Cannot use log level trace trace for non-debug builds (it won't work)."); } #endif diff --git a/cpp/core/MsgsGen.cpp b/cpp/core/MsgsGen.cpp index 4e069eb5..9e274066 100644 --- a/cpp/core/MsgsGen.cpp +++ b/cpp/core/MsgsGen.cpp @@ -705,6 +705,28 @@ std::ostream& operator<<(std::ostream& out, const ShardInfo& x) { return out; } +void RegisterShardInfo::pack(BincodeBuf& buf) const { + buf.packFixedBytes<4>(ip); + buf.packScalar(port); +} +void RegisterShardInfo::unpack(BincodeBuf& buf) { + buf.unpackFixedBytes<4>(ip); + port = buf.unpackScalar(); +} +void RegisterShardInfo::clear() { + ip.clear(); + port = uint16_t(0); +} +bool RegisterShardInfo::operator==(const RegisterShardInfo& rhs) const { + if (ip != rhs.ip) { return false; }; + if ((uint16_t)this->port != (uint16_t)rhs.port) { return false; }; + return true; +} +std::ostream& operator<<(std::ostream& out, const RegisterShardInfo& x) { + out << "RegisterShardInfo(" << "Ip=" << x.ip << ", " << "Port=" << x.port << ")"; + return out; +} + void LookupReq::pack(BincodeBuf& buf) const { dirId.pack(buf); buf.packBytes(name); diff --git a/cpp/core/MsgsGen.hpp b/cpp/core/MsgsGen.hpp index 66ddd1fe..91b1b975 100644 --- a/cpp/core/MsgsGen.hpp +++ b/cpp/core/MsgsGen.hpp @@ -510,6 +510,27 @@ struct ShardInfo { std::ostream& operator<<(std::ostream& out, const ShardInfo& x); +struct RegisterShardInfo { + BincodeFixedBytes<4> ip; + uint16_t port; + + static constexpr uint16_t STATIC_SIZE = BincodeFixedBytes<4>::STATIC_SIZE + 2; // ip + port + + RegisterShardInfo() { clear(); } + uint16_t 
packedSize() const { + uint16_t _size = 0; + _size += BincodeFixedBytes<4>::STATIC_SIZE; // ip + _size += 2; // port + return _size; + } + void pack(BincodeBuf& buf) const; + void unpack(BincodeBuf& buf); + void clear(); + bool operator==(const RegisterShardInfo&rhs) const; +}; + +std::ostream& operator<<(std::ostream& out, const RegisterShardInfo& x); + struct LookupReq { InodeId dirId; BincodeBytes name; @@ -2304,9 +2325,9 @@ std::ostream& operator<<(std::ostream& out, const ShardsResp& x); struct RegisterShardReq { ShardId id; - ShardInfo info; + RegisterShardInfo info; - static constexpr uint16_t STATIC_SIZE = 1 + ShardInfo::STATIC_SIZE; // id + info + static constexpr uint16_t STATIC_SIZE = 1 + RegisterShardInfo::STATIC_SIZE; // id + info RegisterShardReq() { clear(); } uint16_t packedSize() const { diff --git a/cpp/shard/Shard.cpp b/cpp/shard/Shard.cpp index 136c3420..99d09395 100644 --- a/cpp/shard/Shard.cpp +++ b/cpp/shard/Shard.cpp @@ -410,7 +410,23 @@ void runShard(ShardId shid, const std::string& dbDir, const ShardOptions& option } logOut = &fileOut; } - Logger logger(options.level, *logOut); + Logger logger(options.logLevel, *logOut); + + { + Env env(logger, "startup"); + LOG_INFO(env, "Running shard %s with options:", shid); + LOG_INFO(env, " level = %s", options.logLevel); + LOG_INFO(env, " logFile = '%s'", options.logFile); + LOG_INFO(env, " port = %s", options.port); + LOG_INFO(env, " shuckleHost = '%s'", options.shuckleHost); + LOG_INFO(env, " shucklePort = %s", options.shucklePort); + { + char ip[INET_ADDRSTRLEN]; + LOG_INFO(env, " ownIp = %s", inet_ntop(AF_INET, &options.ownIp, ip, INET_ADDRSTRLEN)); + } + LOG_INFO(env, " simulateIncomingPacketDrop = %s", options.simulateIncomingPacketDrop); + LOG_INFO(env, " simulateOutgoingPacketDrop = %s", options.simulateOutgoingPacketDrop); + } ShardDB db(logger, shid, dbDir); diff --git a/cpp/shard/Shard.hpp b/cpp/shard/Shard.hpp index 8f5332c4..782b81a4 100644 --- a/cpp/shard/Shard.hpp +++ 
b/cpp/shard/Shard.hpp @@ -4,7 +4,7 @@ #include "Env.hpp" struct ShardOptions { - LogLevel level = LogLevel::LOG_INFO; + LogLevel logLevel = LogLevel::LOG_INFO; std::string logFile = ""; // if empty, stdout uint16_t port = 0; // automatically assigned, stored in shuckle std::string shuckleHost = ""; diff --git a/cpp/shard/ShardDB.cpp b/cpp/shard/ShardDB.cpp index 47410381..6c2c9bfb 100644 --- a/cpp/shard/ShardDB.cpp +++ b/cpp/shard/ShardDB.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include @@ -251,6 +252,7 @@ struct ShardDBImpl { options.create_missing_column_families = true; options.compression = rocksdb::kLZ4Compression; options.bottommost_compression = rocksdb::kZSTD; + rocksdb::ColumnFamilyOptions blockServicesToFilesOptions; blockServicesToFilesOptions.merge_operator = CreateInt64AddOperator(); std::vector familiesDescriptors{ diff --git a/cpp/shard/eggsshard.cpp b/cpp/shard/eggsshard.cpp index 3104d261..011d0a73 100644 --- a/cpp/shard/eggsshard.cpp +++ b/cpp/shard/eggsshard.cpp @@ -100,17 +100,17 @@ int main(int argc, char** argv) { if (arg == "-h" || arg == "-help") { dieWithUsage(); } else if (arg == "-verbose") { - options.level = std::min(LogLevel::LOG_DEBUG, options.level); + options.logLevel = std::min(LogLevel::LOG_DEBUG, options.logLevel); } else if (arg == "-log-level") { std::string logLevel = getNextArg(); if (logLevel == "trace") { - options.level = LogLevel::LOG_TRACE; + options.logLevel = LogLevel::LOG_TRACE; } else if (logLevel == "debug") { - options.level = LogLevel::LOG_DEBUG; + options.logLevel = LogLevel::LOG_DEBUG; } else if (logLevel == "info") { - options.level = LogLevel::LOG_INFO; + options.logLevel = LogLevel::LOG_INFO; } else if (logLevel == "error") { - options.level = LogLevel::LOG_ERROR; + options.logLevel = LogLevel::LOG_ERROR; } else { die("Bad log level `%s'", logLevel.c_str()); } @@ -137,7 +137,7 @@ int main(int argc, char** argv) { } #ifndef EGGS_DEBUG - if (options.level <= LogLevel::LOG_TRACE) { + if 
(options.logLevel <= LogLevel::LOG_TRACE) { die("Cannot use trace for non-debug builds (it won't work)."); } #endif diff --git a/go/bincodegen/bincodegen.go b/go/bincodegen/bincodegen.go index 37a5cd58..728d350e 100644 --- a/go/bincodegen/bincodegen.go +++ b/go/bincodegen/bincodegen.go @@ -1162,6 +1162,7 @@ func main() { reflect.TypeOf(msgs.SnapshotLookupEdge{}), reflect.TypeOf(msgs.BlockServiceInfo{}), reflect.TypeOf(msgs.ShardInfo{}), + reflect.TypeOf(msgs.RegisterShardInfo{}), } goCode := generateGo(errors, shardReqResps, cdcReqResps, shuckleReqResps, extras) diff --git a/go/eggs/blockservicereq.go b/go/eggs/blockservicereq.go index e2d92e85..abe651f3 100644 --- a/go/eggs/blockservicereq.go +++ b/go/eggs/blockservicereq.go @@ -11,11 +11,7 @@ import ( "xtx/eggsfs/msgs" ) -// TODO connection pool rather than opening a new one each time - -// The first ip1/port1 cannot be zeroed, the second one can. One of them -// will be tried at random. -func BlockServiceConnection(log *Logger, id msgs.BlockServiceId, ip1 [4]byte, port1 uint16, ip2 [4]byte, port2 uint16) (*net.TCPConn, error) { +func BlockServiceConnection(log *Logger, ip1 [4]byte, port1 uint16, ip2 [4]byte, port2 uint16) (*net.TCPConn, error) { if port1 == 0 { panic(fmt.Errorf("ip1/port1 must be provided")) } @@ -38,7 +34,7 @@ func BlockServiceConnection(log *Logger, id msgs.BlockServiceId, ip1 [4]byte, po if errs[i%2] == nil { return sock, nil } - log.RaiseAlert(fmt.Errorf("Could not connect to block service %v:%v: %w. 
Might try other ip/port.", ip, port, errs[i%2])) + log.RaiseAlert(fmt.Errorf("could not connect to block service %v:%v: %w, might try other ip/port", ip, port, errs[i%2])) } // return one of the two errors, we don't want to mess with them too much and they are alerts for _, err := range errs { diff --git a/go/eggs/cdcreq.go b/go/eggs/cdcreq.go index c576750f..3e0759f6 100644 --- a/go/eggs/cdcreq.go +++ b/go/eggs/cdcreq.go @@ -39,15 +39,6 @@ type CDCResponse struct { Body msgs.CDCResponse } -/* -func (req *CDCResponse) Pack(buf *bincode.Buf) { - buf.PackU32(msgs.CDC_RESP_PROTOCOL_VERSION) - buf.PackU64(req.RequestId) - buf.PackU8(uint8(req.Body.CDCResponseKind())) - req.Body.Pack(buf) -} -*/ - type unpackedCDCRequestId uint64 func (requestId *unpackedCDCRequestId) Unpack(r io.Reader) error { @@ -155,7 +146,7 @@ func (c *Client) CDCRequest( return msgs.TIMEOUT } if c.counters != nil { - atomic.AddInt64(&c.counters.CDC.Attempts[msgKind], 1) + atomic.AddUint64(&c.counters.CDC[msgKind].Attempts, 1) } requestId := newRequestId() requestIds[attempts] = requestId @@ -265,8 +256,7 @@ func (c *Client) CDCRequest( // At this point, we know we've got a response elapsed := time.Since(startedAt) if c.counters != nil { - atomic.AddInt64(&c.counters.CDC.Count[msgKind], 1) - atomic.AddInt64(&c.counters.CDC.Nanos[msgKind], elapsed.Nanoseconds()) + c.counters.CDC[msgKind].Timings.Add(elapsed) } // If we're past the first attempt, there are cases where errors are not what they seem. 
if eggsError != nil && attempts > 0 { diff --git a/go/eggs/client.go b/go/eggs/client.go index 4c172d0c..a173ae39 100644 --- a/go/eggs/client.go +++ b/go/eggs/client.go @@ -4,7 +4,9 @@ import ( "crypto/cipher" "fmt" "net" + "os" "sync" + "time" "xtx/eggsfs/bincode" "xtx/eggsfs/msgs" ) @@ -15,35 +17,35 @@ type shardSocketFactory interface { } type ReqCounters struct { - Count [256]int64 - Attempts [256]int64 - Nanos [256]int64 -} - -func (c *ReqCounters) TotalRequests() int64 { - total := int64(0) - for i := 0; i < 256; i++ { - total += c.Count[i] - } - return total + Timings Timings + Attempts uint64 } type ClientCounters struct { - // these arrays are indexed by req type - Shard ReqCounters - CDC ReqCounters + // these maps are indexed by req type + Shard [256]ReqCounters + CDC [10]ReqCounters +} + +type blockServiceConn struct { + mu sync.Mutex + lastActive time.Time + conn *net.TCPConn } type Client struct { - shardIps [256][4]byte - shardPorts [256]uint16 - shardSocketFactory shardSocketFactory - cdcIp [4]byte - cdcPort uint16 - cdcSocket *net.UDPConn - cdcLock sync.Mutex - counters *ClientCounters - cdcKey cipher.Block + shardIps [256][4]byte + shardPorts [256]uint16 + shardSocketFactory shardSocketFactory + cdcIp [4]byte + cdcPort uint16 + cdcSocket *net.UDPConn + cdcLock sync.Mutex + counters *ClientCounters + cdcKey cipher.Block + blockServicesConns map[string]*blockServiceConn + blockServicesLock sync.RWMutex + blockServicesReaper chan<- bool } // If `shid` is present, the client will only create a socket for that shard, @@ -120,6 +122,37 @@ func NewClientDirect( } c.counters = counters c.cdcKey = cdcKey + c.blockServicesConns = make(map[string]*blockServiceConn) + { + ch := make(chan bool) + c.blockServicesReaper = ch + go func() { + for { + keepGoing := <-ch + if !keepGoing { + break + } + c.blockServicesLock.RLock() + now := time.Now() + for _, conn := range c.blockServicesConns { + if !conn.mu.TryLock() { // somebody else is using this + continue + } + 
if now.Sub(conn.lastActive) > time.Minute { + if err := conn.conn.Close(); err != nil { + log.RaiseAlert(fmt.Errorf("could not close connection %v", conn.conn.RemoteAddr())) + } + } + conn.mu.Unlock() + } + c.blockServicesLock.RUnlock() + go func() { + time.Sleep(time.Minute) + ch <- true + }() + } + }() + } return c, nil } @@ -163,7 +196,13 @@ func (c *Client) Close() { panic(fmt.Errorf("bad factory %T", c.shardSocketFactory)) } if err := c.cdcSocket.Close(); err != nil { - panic(err) + fmt.Fprintf(os.Stderr, "Could not close CDC conn: %v", err) + } + c.blockServicesReaper <- false + for _, conn := range c.blockServicesConns { + if err := conn.conn.Close(); err != nil { + fmt.Fprintf(os.Stderr, "Could not close block service conn: %v", err) + } } } @@ -179,7 +218,7 @@ func (c *allShardsFactory) close() { continue } if err := sock.Close(); err != nil { - panic(err) + fmt.Fprintf(os.Stderr, "could not close shard socket: %v", err) } } } @@ -221,16 +260,13 @@ type shardSpecificFactory struct { shardLock sync.Mutex } -// TODO probably convert these errors to stderr, we can't do much with them usually -// but they'd be worth knowing about -func (c *shardSpecificFactory) close() error { +func (c *shardSpecificFactory) close() { if c.shardSock == nil { - return nil + return } if err := c.shardSock.Close(); err != nil { - return err + fmt.Fprintf(os.Stderr, "could not close shard sock: %v\n", err) } - return nil } func (c *shardSpecificFactory) getShardSocket(shid msgs.ShardId, ip [4]byte, port uint16) (*net.UDPConn, error) { @@ -351,3 +387,11 @@ func resolveDirectoryInfo( return dirInfoBody, nil } + +func blockServicesConnsKey(ip [4]byte, port uint16) string { + return fmt.Sprintf("%v:%v", net.IP(ip[:]), port) +} + +func blockServicesConnsKeyFromConn(conn *net.TCPConn) string { + return conn.RemoteAddr().String() +} diff --git a/go/eggs/gc.go b/go/eggs/gc.go index 142fe077..7e12d48b 100644 --- a/go/eggs/gc.go +++ b/go/eggs/gc.go @@ -51,12 +51,12 @@ func DestructFile( 
certifyReq.Proofs = make([]msgs.BlockProof, len(initResp.Blocks)) for i, block := range initResp.Blocks { var proof [8]byte - conn, err := mbs.BlockServiceConnection(log, block.BlockServiceId, block.BlockServiceIp1, block.BlockServicePort1, block.BlockServiceIp2, block.BlockServicePort2) + conn, err := mbs.GetBlockServiceConnection(log, block.BlockServiceIp1, block.BlockServicePort1, block.BlockServiceIp2, block.BlockServicePort2) if err != nil { return err } proof, err = mbs.EraseBlock(log, conn, block) - conn.Close() + mbs.ReleaseBlockServiceConnection(log, conn) if err != nil { return fmt.Errorf("%v: could not erase block %+v: %w", id, block, err) } diff --git a/go/eggs/migrate.go b/go/eggs/migrate.go index 0b9003ad..71406eec 100644 --- a/go/eggs/migrate.go +++ b/go/eggs/migrate.go @@ -68,16 +68,17 @@ func copyBlock( dstBlock := &initiateSpanResp.Blocks[0] var proof [8]byte var err error - srcConn, err := mbs.BlockServiceConnection(log, blockService.Id, blockService.Ip1, blockService.Port1, blockService.Ip2, blockService.Port2) + srcConn, err := mbs.GetBlockServiceConnection(log, blockService.Ip1, blockService.Port1, blockService.Ip2, blockService.Port2) if err != nil { return 0, err } - dstConn, err := mbs.BlockServiceConnection(log, dstBlock.BlockServiceId, dstBlock.BlockServiceIp1, dstBlock.BlockServicePort1, dstBlock.BlockServiceIp2, dstBlock.BlockServicePort2) + defer mbs.ReleaseBlockServiceConnection(log, srcConn) + dstConn, err := mbs.GetBlockServiceConnection(log, dstBlock.BlockServiceIp1, dstBlock.BlockServicePort1, dstBlock.BlockServiceIp2, dstBlock.BlockServicePort2) if err != nil { return 0, err } proof, err = mbs.CopyBlock(log, srcConn, blockService, block.BlockId, block.Crc32, blockSize, dstConn, dstBlock) - dstConn.Close() + defer mbs.ReleaseBlockServiceConnection(log, dstConn) if err != nil { return 0, fmt.Errorf("could not copy block: %w", err) } diff --git a/go/eggs/mockableblockservice.go b/go/eggs/mockableblockservice.go index 
fb6226aa..e226607f 100644 --- a/go/eggs/mockableblockservice.go +++ b/go/eggs/mockableblockservice.go @@ -4,6 +4,8 @@ import ( "crypto/cipher" "fmt" "io" + "net" + "time" "xtx/eggsfs/msgs" ) @@ -11,11 +13,15 @@ type MockableBlockServiceConn interface { io.Writer io.Reader io.ReaderFrom - io.Closer } type MockableBlockServices interface { - BlockServiceConnection(log *Logger, id msgs.BlockServiceId, ip1 [4]byte, port1 uint16, ip2 [4]byte, port2 uint16) (MockableBlockServiceConn, error) + GetBlockServiceConnection( + log *Logger, ip1 [4]byte, port1 uint16, ip2 [4]byte, port2 uint16, + ) (MockableBlockServiceConn, error) + ReleaseBlockServiceConnection( + log *Logger, conn MockableBlockServiceConn, + ) error WriteBlock( logger *Logger, conn MockableBlockServiceConn, @@ -50,13 +56,75 @@ type MockableBlockServices interface { ) ([8]byte, error) } -type RealBlockServices struct{} - -func (RealBlockServices) BlockServiceConnection(log *Logger, id msgs.BlockServiceId, ip1 [4]byte, port1 uint16, ip2 [4]byte, port2 uint16) (MockableBlockServiceConn, error) { - return BlockServiceConnection(log, id, ip1, port1, ip2, port2) +func (c *Client) existingBlockServiceConnection(ip1 [4]byte, port1 uint16, ip2 [4]byte, port2 uint16) *blockServiceConn { + c.blockServicesLock.RLock() + defer c.blockServicesLock.RUnlock() + conn1, found1 := c.blockServicesConns[blockServicesConnsKey(ip1, port1)] + if found1 { + conn1.lastActive = time.Now() + return conn1 + } + conn2, found2 := c.blockServicesConns[blockServicesConnsKey(ip2, port2)] + if found2 { + conn2.lastActive = time.Now() + return conn2 + } + return nil } -func (RealBlockServices) WriteBlock( +func (c *Client) newBlockServiceConnection(log *Logger, ip1 [4]byte, port1 uint16, ip2 [4]byte, port2 uint16) (*blockServiceConn, error) { + conn, err := BlockServiceConnection(log, ip1, port1, ip2, port2) + if err != nil { + return nil, err + } + c.blockServicesLock.Lock() + defer c.blockServicesLock.Unlock() + k := 
blockServicesConnsKeyFromConn(conn) + otherConn, found := c.blockServicesConns[k] // somebody got here first + if found { + conn.Close() + otherConn.lastActive = time.Now() + return otherConn, nil + } + bsConn := &blockServiceConn{ + conn: conn, + lastActive: time.Now(), + } + c.blockServicesConns[k] = bsConn + return bsConn, nil +} + +// The first ip1/port1 cannot be zeroed, the second one can. One of them +// will be tried at random. +func (c *Client) GetBlockServiceConnection(log *Logger, ip1 [4]byte, port1 uint16, ip2 [4]byte, port2 uint16) (MockableBlockServiceConn, error) { + conn := c.existingBlockServiceConnection(ip1, port1, ip2, port2) + if conn == nil { + var err error + conn, err = c.newBlockServiceConnection(log, ip1, port1, ip2, port2) + if err != nil { + return nil, err + } + } + conn.mu.Lock() + return conn.conn, nil +} + +func (c *Client) ReleaseBlockServiceConnection(log *Logger, conn0 MockableBlockServiceConn) error { + conn := conn0.(*net.TCPConn) + c.blockServicesLock.RLock() + defer c.blockServicesLock.RUnlock() + bsConn, found := c.blockServicesConns[blockServicesConnsKeyFromConn(conn)] + if !found { + panic(fmt.Errorf("could not find connection for %v", conn.RemoteAddr())) + } + // Theoretical race -- the reaper might have closed this conn in the meantime. But amazingly + // unlikely for a minute to pass between getting the conn and here. Note that we can't lock + // inside the map or we'd hog the map lock. 
+ bsConn.mu.Unlock() + return nil +} + +func (c *Client) WriteBlock( logger *Logger, conn MockableBlockServiceConn, block *msgs.BlockInfo, @@ -67,7 +135,7 @@ func (RealBlockServices) WriteBlock( return WriteBlock(logger, conn, block, r, size, crc) } -func (RealBlockServices) FetchBlock( +func (c *Client) FetchBlock( logger *Logger, conn MockableBlockServiceConn, blockService *msgs.BlockService, @@ -79,7 +147,7 @@ func (RealBlockServices) FetchBlock( return FetchBlock(logger, conn, blockService, blockId, blockCrc, offset, count) } -func (RealBlockServices) EraseBlock( +func (c *Client) EraseBlock( logger *Logger, conn MockableBlockServiceConn, block msgs.BlockInfo, @@ -87,7 +155,7 @@ func (RealBlockServices) EraseBlock( return EraseBlock(logger, conn, block) } -func (RealBlockServices) CopyBlock( +func (c *Client) CopyBlock( logger *Logger, sourceConn MockableBlockServiceConn, sourceBlockService *msgs.BlockService, @@ -100,7 +168,7 @@ func (RealBlockServices) CopyBlock( return CopyBlock(logger, sourceConn, sourceBlockService, sourceBlockId, sourceBlockCrc, sourceBlockSize, dstConn, dstBlock) } -var _ = (MockableBlockServices)(RealBlockServices{}) +var _ = (MockableBlockServices)((*Client)(nil)) type MockedBlockServices struct { Keys map[msgs.BlockServiceId]cipher.Block @@ -124,10 +192,14 @@ func (dummyConn) ReadFrom(r io.Reader) (n int64, err error) { return 0, nil } -func (mbs *MockedBlockServices) BlockServiceConnection(log *Logger, id msgs.BlockServiceId, ip1 [4]byte, port1 uint16, ip2 [4]byte, port2 uint16) (MockableBlockServiceConn, error) { +func (mbs *MockedBlockServices) GetBlockServiceConnection(log *Logger, ip1 [4]byte, port1 uint16, ip2 [4]byte, port2 uint16) (MockableBlockServiceConn, error) { return dummyConn{}, nil } +func (mbs *MockedBlockServices) ReleaseBlockServiceConnection(log *Logger, conn MockableBlockServiceConn) error { + return nil +} + func (mbs *MockedBlockServices) WriteBlock( logger *Logger, conn MockableBlockServiceConn, diff --git 
a/go/eggs/shardreq.go b/go/eggs/shardreq.go index a8513f20..619d66a9 100644 --- a/go/eggs/shardreq.go +++ b/go/eggs/shardreq.go @@ -207,7 +207,7 @@ func (c *Client) ShardRequest( return msgs.TIMEOUT } if c.counters != nil { - atomic.AddInt64(&c.counters.Shard.Attempts[msgKind], 1) + atomic.AddUint64(&c.counters.Shard[msgKind].Attempts, 1) } requestId := newRequestId() requestIds[attempts] = requestId @@ -318,8 +318,7 @@ func (c *Client) ShardRequest( // At this point, we know we've got a response elapsed := time.Since(startedAt) if c.counters != nil { - atomic.AddInt64(&c.counters.Shard.Count[msgKind], 1) - atomic.AddInt64(&c.counters.Shard.Nanos[msgKind], elapsed.Nanoseconds()) + c.counters.Shard[msgKind].Timings.Add(elapsed) } // If we're past the first attempt, there are cases where errors are not what they seem. if eggsError != nil && attempts > 0 { diff --git a/go/eggs/timings.go b/go/eggs/timings.go new file mode 100644 index 00000000..94b0b70d --- /dev/null +++ b/go/eggs/timings.go @@ -0,0 +1,54 @@ +package eggs + +import ( + "math/bits" + "sync/atomic" + "time" +) + +const BUCKETS = 33 + +type Timings struct { + counts [BUCKETS]uint64 + totalNanos uint64 +} + +func (t *Timings) Add(d time.Duration) { + nanos := d.Nanoseconds() + bucket := (64 - bits.LeadingZeros64(uint64(nanos))) - 5 + if bucket < 0 { + bucket = 0 + } + if bucket >= BUCKETS { + bucket = BUCKETS - 1 + } + atomic.AddUint64(&t.totalNanos, uint64(nanos)) + atomic.AddUint64(&t.counts[bucket], 1) +} + +func (t *Timings) Buckets() int { + return BUCKETS +} + +func (t *Timings) Bucket(i int) (lowerBound time.Duration, count uint64, upperBound time.Duration) { + count = atomic.LoadUint64(&t.counts[i]) + upperBound = time.Duration(uint64(1) << (i + 5)) + if i == 0 { + lowerBound = time.Duration(0) + } else { + lowerBound = time.Duration(uint64(1) << (i + 4)) + } + return lowerBound, count, upperBound +} + +func (t *Timings) TotalCount() uint64 { + count := uint64(0) + for i := range t.counts { + 
count += atomic.LoadUint64(&t.counts[i]) + } + return count +} + +func (t *Timings) TotalTime() time.Duration { + return time.Duration(atomic.LoadUint64(&t.totalNanos)) +} diff --git a/go/eggsblocks/eggsblocks.go b/go/eggsblocks/eggsblocks.go index 3302c715..349257a2 100644 --- a/go/eggsblocks/eggsblocks.go +++ b/go/eggsblocks/eggsblocks.go @@ -576,6 +576,17 @@ func main() { } log := eggs.NewLogger(*verbose, logOut) + log.Info("Running block service with options:") + log.Info(" failureDomain = %v", *failureDomainStr) + log.Info(" noTimeCheck = %v", *noTimeCheck) + log.Info(" ownIp1 = '%v'", *ownIp1Str) + log.Info(" port1 = %v", *port1) + log.Info(" ownIp2 = %v", *ownIp2Str) + log.Info(" port2 = %v", *port2) + log.Info(" verbose = %v", *verbose) + log.Info(" logFile = '%v'", *logFile) + log.Info(" shuckleAddress = '%v'", *shuckleAddress) + blockServices := make(map[msgs.BlockServiceId]blockService) for i := 0; i < flag.NArg(); i += 2 { dir := flag.Args()[i] diff --git a/go/eggsfuse/eggsfuse.go b/go/eggsfuse/eggsfuse.go index b715684c..ed25a2f2 100644 --- a/go/eggsfuse/eggsfuse.go +++ b/go/eggsfuse/eggsfuse.go @@ -6,12 +6,12 @@ import ( "flag" "fmt" "io" - "net" "os" "os/signal" "runtime/pprof" "sync" "syscall" + "time" "xtx/eggsfs/eggs" "xtx/eggsfs/msgs" @@ -296,7 +296,7 @@ func (n *eggsNode) Mkdir( } func (n *eggsNode) Getattr(ctx context.Context, f fs.FileHandle, out *fuse.AttrOut) syscall.Errno { - log.Debug("getattr inode=%v, name=%v", n.id) + log.Debug("getattr inode=%v", n.id) out.Ino = uint64(n.id) out.Mode = inodeTypeToMode(n.id.Type()) @@ -371,12 +371,12 @@ func (f *transientFile) writeSpan(policy *msgs.SpanPolicy) syscall.Errno { Proofs: make([]msgs.BlockProof, len(initiateResp.Blocks)), } for i, block := range initiateResp.Blocks { - conn, err := eggs.BlockServiceConnection(log, block.BlockServiceId, block.BlockServiceIp1, block.BlockServicePort1, block.BlockServiceIp2, block.BlockServicePort2) + conn, err := client.GetBlockServiceConnection(log, 
block.BlockServiceIp1, block.BlockServicePort1, block.BlockServiceIp2, block.BlockServicePort2) if err != nil { return syscall.EIO } proof, err := eggs.WriteBlock(log, conn, &block, bytes.NewReader(data), uint32(len(data)), crc) - conn.Close() + client.ReleaseBlockServiceConnection(log, conn) if err != nil { log.Error("writing block failed with error %v", err) return syscall.EIO @@ -547,10 +547,8 @@ type openFile struct { // We keep the latest span/block service around, we might just need // the next bit in sequential reads. - spans msgs.FileSpansResp - currentSpanIx int // index in list above - currentBlockService *msgs.BlockService - currentBlockServiceConn *net.TCPConn + spans msgs.FileSpansResp + currentSpanIx int // index in list above } func (n *eggsNode) Open(ctx context.Context, flags uint32) (fh fs.FileHandle, fuseFlags uint32, errno syscall.Errno) { @@ -570,34 +568,6 @@ func (n *eggsNode) Open(ctx context.Context, flags uint32) (fh fs.FileHandle, fu return &of, 0, 0 } -func (of *openFile) resetBlockService() syscall.Errno { - if of.currentBlockService != nil { - if err := of.currentBlockServiceConn.Close(); err != nil { - panic(err) - } - } - of.currentBlockService = nil - of.currentBlockServiceConn = nil - return 0 -} - -func (of *openFile) ensureBlockService(bs *msgs.BlockService) syscall.Errno { - if of.currentBlockService == nil || of.currentBlockService.Id != bs.Id { - if of.currentBlockService != nil { - if err := of.currentBlockServiceConn.Close(); err != nil { - panic(err) - } - } - conn, err := eggs.BlockServiceConnection(log, bs.Id, bs.Ip1, bs.Port1, bs.Ip2, bs.Port2) - if err != nil { - panic(err) - } - of.currentBlockServiceConn = conn - of.currentBlockService = bs - } - return 0 -} - // One step of reading, will go through at most one span. 
func (of *openFile) readInternal(dest []byte, off int64) (int64, syscall.Errno) { // Check if we're still within the file: if not, we can just exit @@ -647,27 +617,27 @@ func (of *openFile) readInternal(dest []byte, off int64) (int64, syscall.Errno) } if currentSpan.StorageClass == msgs.INLINE_STORAGE { - if err := of.resetBlockService(); err != 0 { - return 0, err - } return int64(copy(dest, currentSpan.BodyBytes[spanOffset:])), 0 } else { if currentSpan.Parity.DataBlocks() != 1 { panic(fmt.Errorf("unsupported parity %v", currentSpan.Parity)) } block := &currentSpan.BodyBlocks[0] - if err := of.ensureBlockService(&of.spans.BlockServices[block.BlockServiceIx]); err != 0 { - return 0, err - } + blockService := &of.spans.BlockServices[block.BlockServiceIx] toRead := uint32(remainingInSpan) if len(dest) < int(toRead) { toRead = uint32(len(dest)) } - if err := eggs.FetchBlock(log, of.currentBlockServiceConn, of.currentBlockService, block.BlockId, block.Crc32, uint32(spanOffset), toRead); err != nil { + conn, err := client.GetBlockServiceConnection(log, blockService.Ip1, blockService.Port1, blockService.Ip2, blockService.Port2) + if err != nil { + panic(err) + } + defer client.ReleaseBlockServiceConnection(log, conn) + if err := eggs.FetchBlock(log, conn, blockService, block.BlockId, block.Crc32, uint32(spanOffset), toRead); err != nil { panic(err) } dest = dest[:toRead] - if _, err := io.ReadFull(of.currentBlockServiceConn, dest); err != nil { + if _, err := io.ReadFull(conn, dest); err != nil { panic(err) } return int64(len(dest)), 0 @@ -695,16 +665,8 @@ func (of *openFile) Read(ctx context.Context, dest []byte, off int64) (fuse.Read return fuse.ReadResultData(dest[:internalOff]), 0 } -func (of *openFile) Flush(ctx context.Context) syscall.Errno { - log.Debug("flush file=%v", of.id) - - of.mu.Lock() - defer of.mu.Unlock() - return of.resetBlockService() -} - func (n *eggsNode) Unlink(ctx context.Context, name string) syscall.Errno { - log.Debug("unlink dir=%v, name=%v", n, 
n.id, name) + log.Debug("unlink dir=%v, name=%v", n.id, name) lookupResp := msgs.LookupResp{} if err := shardRequest(n.id.Shard(), &msgs.LookupReq{DirId: n.id, Name: name}, &lookupResp); err != 0 { @@ -720,7 +682,7 @@ func (n *eggsNode) Unlink(ctx context.Context, name string) syscall.Errno { } func (n *eggsNode) Rmdir(ctx context.Context, name string) syscall.Errno { - log.Debug("rmdir dir=%v, name=%v", n, n.id, name) + log.Debug("rmdir dir=%v, name=%v", n.id, name) lookupResp := msgs.LookupResp{} if err := shardRequest(n.id.Shard(), &msgs.LookupReq{DirId: n.id, Name: name}, &lookupResp); err != 0 { return err @@ -769,11 +731,11 @@ func (n *eggsNode) Readlink(ctx context.Context) ([]byte, syscall.Errno) { } block := span.BodyBlocks[0] blockService := resp.BlockServices[block.BlockServiceIx] - conn, err := eggs.BlockServiceConnection(log, blockService.Id, blockService.Ip1, blockService.Port1, blockService.Ip2, blockService.Port2) + conn, err := client.GetBlockServiceConnection(log, blockService.Ip1, blockService.Port1, blockService.Ip2, blockService.Port2) if err != nil { panic(err) } - defer conn.Close() + defer client.ReleaseBlockServiceConnection(log, conn) if err := eggs.FetchBlock(log, conn, &blockService, block.BlockId, block.Crc32, 0, uint32(span.BlockSize)); err != nil { panic(err) } @@ -803,7 +765,6 @@ var _ = (fs.FileWriter)((*transientFile)(nil)) var _ = (fs.FileFlusher)((*transientFile)(nil)) var _ = (fs.FileReader)((*openFile)(nil)) -var _ = (fs.FileFlusher)((*openFile)(nil)) func terminate(server *fuse.Server, terminated *bool) { log.Info("terminating") @@ -867,8 +828,10 @@ func main() { pprof.StartCPUProfile(f) // we stop in terminate() } + counters := &eggs.ClientCounters{} + var err error - client, err = eggs.NewClient(log, *shuckleAddress, nil, nil, nil) + client, err = eggs.NewClient(log, *shuckleAddress, nil, counters, nil) if err != nil { panic(err) } @@ -893,6 +856,80 @@ func main() { } } + // print out stats when sent USR1 + { + statsChan := 
make(chan os.Signal, 1) + signal.Notify(statsChan, syscall.SIGUSR1) + go func() { + for { + <-statsChan + // formatCounters logs the count, attempts, total/average time and cumulative histogram of one request kind. + formatCounters := func(c *eggs.ReqCounters) { + totalCount := uint64(0) + for i := 0; i < c.Timings.Buckets(); i++ { + _, count, _ := c.Timings.Bucket(i) + totalCount += count + } + log.Info(" count: %v", totalCount) + if totalCount == 0 { + // Attempts were recorded but no timing sample yet: the ratios below would divide by zero, and the integer division would panic. + log.Info(" attempts: %v", c.Attempts) + return + } + log.Info(" attempts: %v (%v)", c.Attempts, float64(c.Attempts)/float64(totalCount)) + log.Info(" total time: %v", time.Duration(c.Timings.TotalTime())) + log.Info(" avg time: %v", time.Duration(uint64(c.Timings.TotalTime())/totalCount)) + hist := bytes.NewBuffer([]byte{}) + first := true + countSoFar := uint64(0) + for i := 0; i < c.Timings.Buckets(); i++ { + lowerBound, count, upperBound := c.Timings.Bucket(i) + if count == 0 { + continue + } + countSoFar += count + if first { + fmt.Fprintf(hist, "%v < ", lowerBound) + } else { + fmt.Fprintf(hist, ", ") + } + first = false + fmt.Fprintf(hist, "%v (%0.2f%%) < %v", count, float64(countSoFar*100)/float64(totalCount), upperBound) + } + log.Info(" hist: %v", hist.String()) + } + var shardTime time.Duration + for i := 0; i < len(counters.Shard[:]); i++ { + shardTime += counters.Shard[i].Timings.TotalTime() + } + log.Info("Shard stats (total shard time %v):", shardTime) + for i := 0; i < len(counters.Shard[:]); i++ { + c := &counters.Shard[i] + if c.Attempts == 0 { + continue + } + kind := msgs.ShardMessageKind(i) + log.Info(" %v", kind) + formatCounters(c) + } + var cdcTime time.Duration + for i := 0; i < len(counters.CDC[:]); i++ { + cdcTime += counters.CDC[i].Timings.TotalTime() + } + log.Info("CDC stats (total CDC time %v):", cdcTime) + for i := 0; i < len(counters.CDC[:]); i++ { + c := &counters.CDC[i] + if c.Attempts == 0 { + continue + } + kind := msgs.CDCMessageKind(i) + log.Info(" %v", kind) + formatCounters(c) + } + } + }() + } + terminated := false defer terminate(server, &terminated) // Cleanup if we get killed with a signal. 
Obviously we can't do much diff --git a/go/eggsrun/eggsrun.go b/go/eggsrun/eggsrun.go index a834e7cd..b35b71ba 100644 --- a/go/eggsrun/eggsrun.go +++ b/go/eggsrun/eggsrun.go @@ -21,7 +21,7 @@ func noRunawayArgs() { func main() { buildType := flag.String("build-type", "alpine", "C++ build type, one of alpine/release/debug/sanitized/valgrind.") - verbose := flag.Bool("verbose", false, "Note that verbose won't do much for the shard unless you build with debug.") + verbose := flag.Bool("verbose", false, "") dataDir := flag.String("data-dir", "", "Directory where to store the EggsFS data. If not present a temporary directory will be used.") hddBlockServices := flag.Uint("hdd-block-services", 10, "Number of HDD block services (default 0).") flashBlockServices := flag.Uint("flash-block-services", 5, "Number of HDD block services (default 0).") @@ -47,10 +47,6 @@ func main() { validPort(*shuckleHttpPort) validPort(*startingPort) - if *verbose && *buildType != "debug" { - fmt.Printf("We're building with build type %v, which is not \"debug\", and you also passed in -verbose.\nBe aware that you won't get debug messages for C++ binaries.\n\n", *buildType) - } - if *dataDir == "" { dir, err := os.MkdirTemp("", "eggsrun.") if err != nil { @@ -127,7 +123,7 @@ func main() { opts := eggs.CDCOpts{ Exe: cppExes.CDCExe, Dir: path.Join(*dataDir, "cdc"), - Verbose: *verbose && *buildType == "debug", + Verbose: *verbose, Valgrind: *buildType == "valgrind", ShuckleAddress: shuckleAddress, OwnIp: *ownIp, @@ -145,7 +141,7 @@ func main() { opts := eggs.ShardOpts{ Exe: cppExes.ShardExe, Dir: path.Join(*dataDir, fmt.Sprintf("shard_%03d", i)), - Verbose: *verbose && *buildType == "debug", + Verbose: *verbose, Shid: shid, Valgrind: *buildType == "valgrind", ShuckleAddress: shuckleAddress, diff --git a/go/eggsshuckle/base.html b/go/eggsshuckle/base.html index 6a0dc264..cabfcf98 100644 --- a/go/eggsshuckle/base.html +++ b/go/eggsshuckle/base.html @@ -3,7 +3,7 @@ - +