diff --git a/go/eggscli/eggscli.go b/go/eggscli/eggscli.go index 35182ddb..c08ec434 100644 --- a/go/eggscli/eggscli.go +++ b/go/eggscli/eggscli.go @@ -52,6 +52,7 @@ func outputFullFileSizes(log *lib.Logger, client *lib.Client) { err := lib.Parwalk( log, client, + "/", func(parent msgs.InodeId, id msgs.InodeId, path string) error { if id.Type() == msgs.DIRECTORY { if atomic.AddUint64(&examinedDirs, 1)%1000000 == 0 { @@ -146,6 +147,26 @@ func outputBriefFileSizes(log *lib.Logger, client *lib.Client) { } } +func formatSize(bytes uint64) string { + bytesf := float64(bytes) + if bytes == 0 { + return "0" + } + if bytes < 1e6 { + return fmt.Sprintf("%.2fKB", bytesf/1e3) + } + if bytes < 1e9 { + return fmt.Sprintf("%.2fMB", bytesf/1e6) + } + if bytes < 1e12 { + return fmt.Sprintf("%.2fGB", bytesf/1e9) + } + if bytes < 1e15 { + return fmt.Sprintf("%.2fTB", bytesf/1e12) + } + return fmt.Sprintf("%.2fPB", bytesf/1e15) +} + func main() { flag.Usage = usage shuckleAddress := flag.String("shuckle", lib.DEFAULT_SHUCKLE_ADDRESS, "Shuckle address (host:port).") @@ -697,6 +718,7 @@ func main() { err := lib.Parwalk( log, client, + "/", func(parent, id msgs.InodeId, path string) error { if id.Type() == msgs.DIRECTORY { return nil @@ -748,7 +770,6 @@ func main() { } countFilesCmd := flag.NewFlagSet("count-files", flag.ExitOnError) - // countFilesTransient := countFilesCmd.Bool("transient", false, "") countFilesRun := func() { var wg sync.WaitGroup ch := make(chan any) @@ -793,6 +814,43 @@ func main() { run: countFilesRun, } + duCmd := flag.NewFlagSet("du", flag.ExitOnError) + duDir := duCmd.String("path", "/", "") + duRun := func() { + var numFiles uint64 + var numDirectories uint64 + var totalSize uint64 + startedAt := time.Now() + err := lib.Parwalk( + log, + client, + *duDir, + func(parent, id msgs.InodeId, path string) error { + if id.Type() == msgs.DIRECTORY { + atomic.AddUint64(&numDirectories, 1) + return nil + } + resp := msgs.StatFileResp{} + if err := client.ShardRequest(log, id.Shard(), &msgs.StatFileReq{Id: id}, &resp); err != nil { + return err + } + atomic.AddUint64(&totalSize, resp.Size) + if atomic.AddUint64(&numFiles, 1)%uint64(1_000_000) == 0 { + log.Info("went through %v files (%v, %0.2f files/s), %v directories", numFiles, formatSize(totalSize), float64(numFiles)/float64(time.Since(startedAt).Seconds()), numDirectories) + } + return nil + }, + ) + if err != nil { + panic(err) + } + log.Info("total size: %v (%v bytes), in %v files and %v directories", formatSize(totalSize), totalSize, numFiles, numDirectories) + } + commands["du"] = commandSpec{ + flags: duCmd, + run: duRun, + } + flag.Parse() if flag.NArg() < 1 { diff --git a/go/lib/client.go b/go/lib/client.go index ba6bd0bb..450b4658 100644 --- a/go/lib/client.go +++ b/go/lib/client.go @@ -533,7 +533,7 @@ func (cm *clientMetadata) init(log *Logger) error { read, _, err := cm.sock.ReadFrom(respBuf) if err != nil { if cm.sockClosed { - log.Debug("got error while reading from metadata socket when winded down: %v", err) + log.Debug("got error while reading from metadata socket when wound down: %v", err) } else { log.RaiseAlert("got error while reading from metadata socket: %v", err) } @@ -783,10 +783,11 @@ TraverseDirectories: return inheritedFrom, nil } -func (client *Client) ResolvePath(log *Logger, path string) (msgs.InodeId, error) { +func (client *Client) ResolvePathWithParent(log *Logger, path string) (msgs.InodeId, msgs.InodeId, error) { if !filepath.IsAbs(path) { - return msgs.NULL_INODE_ID, fmt.Errorf("expected absolute path, got '%v'", path) + return msgs.NULL_INODE_ID, msgs.NULL_INODE_ID, fmt.Errorf("expected absolute path, got '%v'", path) } + parent := msgs.NULL_INODE_ID id := msgs.ROOT_DIR_INODE_ID for _, segment := range strings.Split(filepath.Clean(path), "/")[1:] { if segment == "" { @@ -794,11 +795,17 @@ func (client *Client) ResolvePath(log *Logger, path string) (msgs.InodeId, error } resp := msgs.LookupResp{} if err := client.ShardRequest(log, id.Shard(), &msgs.LookupReq{DirId: id, Name: segment}, &resp); err != nil { - return msgs.NULL_INODE_ID, err + return msgs.NULL_INODE_ID, msgs.NULL_INODE_ID, err } + parent = id id = resp.TargetId } - return id, nil + return id, parent, nil +} + +func (client *Client) ResolvePath(log *Logger, path string) (msgs.InodeId, error) { + id, _, err := client.ResolvePathWithParent(log, path) + return id, err } type WriteBlockFuture struct { diff --git a/go/lib/parwalk.go b/go/lib/parwalk.go index 982c4dd6..b6e5877b 100644 --- a/go/lib/parwalk.go +++ b/go/lib/parwalk.go @@ -29,6 +29,7 @@ func (env *parwalkEnv) visit( id msgs.InodeId, path string, ) error { + log.Debug("visiting %q, %v", path, id) if err := env.callback(parent, id, path); err != nil { return err } @@ -43,17 +44,6 @@ func (env *parwalkEnv) visit( path: path, } env.chans[id.Shard()] <- req - /* - for { - select { - case env.chans[id.Shard()] <- req: - goto Sent - default: - log.Info("could not send message to shard %v, will retry", id.Shard()) - time.Sleep(time.Second) - } - } - */ } return nil } @@ -94,6 +84,7 @@ func (env *parwalkEnv) process( func Parwalk( log *Logger, client *Client, + root string, callback func(parent msgs.InodeId, id msgs.InodeId, path string) error, ) error { // compute @@ -124,7 +115,11 @@ func Parwalk( } }() } - if err := env.visit(log, msgs.NULL_INODE_ID, msgs.ROOT_DIR_INODE_ID, ""); err != nil { + rootId, parentId, err := client.ResolvePathWithParent(log, root) + if err != nil { + return err + } + if err := env.visit(log, parentId, rootId, root); err != nil { return err } env.wg.Wait()