Add du command to eggscli

This commit is contained in:
Francesco Mazzoli
2023-10-25 13:58:14 +00:00
committed by Francesco Mazzoli
parent 52909c18b6
commit bf674a9a23
3 changed files with 78 additions and 18 deletions

View File

@@ -52,6 +52,7 @@ func outputFullFileSizes(log *lib.Logger, client *lib.Client) {
err := lib.Parwalk(
log,
client,
"/",
func(parent msgs.InodeId, id msgs.InodeId, path string) error {
if id.Type() == msgs.DIRECTORY {
if atomic.AddUint64(&examinedDirs, 1)%1000000 == 0 {
@@ -146,6 +147,26 @@ func outputBriefFileSizes(log *lib.Logger, client *lib.Client) {
}
}
// formatSize renders a byte count as a short human-readable string.
//
// Units are decimal (powers of 1000). Zero is rendered as "0", sizes below
// one kilobyte are rendered exactly in bytes (e.g. "512B"), and anything
// larger is rendered with two decimals in the largest unit that applies, from
// KB up to PB (e.g. "1.50KB", "2.50MB"). Petabytes are the largest unit, so
// bigger values simply produce a larger PB figure.
func formatSize(bytes uint64) string {
	if bytes == 0 {
		return "0"
	}
	// Sub-kilobyte sizes are printed exactly: scaling them to KB with two
	// decimals would show a non-empty file of a few bytes as "0.00KB".
	if bytes < 1e3 {
		return fmt.Sprintf("%dB", bytes)
	}
	size := float64(bytes)
	switch {
	case bytes < 1e6:
		return fmt.Sprintf("%.2fKB", size/1e3)
	case bytes < 1e9:
		return fmt.Sprintf("%.2fMB", size/1e6)
	case bytes < 1e12:
		return fmt.Sprintf("%.2fGB", size/1e9)
	case bytes < 1e15:
		return fmt.Sprintf("%.2fTB", size/1e12)
	}
	return fmt.Sprintf("%.2fPB", size/1e15)
}
func main() {
flag.Usage = usage
shuckleAddress := flag.String("shuckle", lib.DEFAULT_SHUCKLE_ADDRESS, "Shuckle address (host:port).")
@@ -697,6 +718,7 @@ func main() {
err := lib.Parwalk(
log,
client,
"/",
func(parent, id msgs.InodeId, path string) error {
if id.Type() == msgs.DIRECTORY {
return nil
@@ -748,7 +770,6 @@ func main() {
}
countFilesCmd := flag.NewFlagSet("count-files", flag.ExitOnError)
// countFilesTransient := countFilesCmd.Bool("transient", false, "")
countFilesRun := func() {
var wg sync.WaitGroup
ch := make(chan any)
@@ -793,6 +814,43 @@ func main() {
run: countFilesRun,
}
// "du" command: walks the tree rooted at -path and reports the number of
// files and directories found, together with the sum of the file sizes.
duCmd := flag.NewFlagSet("du", flag.ExitOnError)
duDir := duCmd.String("path", "/", "")
duRun := func() {
	// These counters are updated from the Parwalk callback, so every access
	// made while the walk is in progress goes through sync/atomic.
	var numFiles uint64
	var numDirectories uint64
	var totalSize uint64
	startedAt := time.Now()
	err := lib.Parwalk(
		log,
		client,
		*duDir,
		func(parent, id msgs.InodeId, path string) error {
			// Directories are only counted; they contribute no size.
			if id.Type() == msgs.DIRECTORY {
				atomic.AddUint64(&numDirectories, 1)
				return nil
			}
			resp := msgs.StatFileResp{}
			if err := client.ShardRequest(log, id.Shard(), &msgs.StatFileReq{Id: id}, &resp); err != nil {
				return err
			}
			atomic.AddUint64(&totalSize, resp.Size)
			// Log progress every million files. Use the value returned by the
			// increment (rather than re-reading the shared counter) so the
			// reported count is exactly the one that triggered the message,
			// and load the other counters atomically to avoid racing with
			// concurrent updates.
			filesSoFar := atomic.AddUint64(&numFiles, 1)
			if filesSoFar%1_000_000 == 0 {
				log.Info(
					"went through %v files (%v, %0.2f files/s), %v directories",
					filesSoFar,
					formatSize(atomic.LoadUint64(&totalSize)),
					float64(filesSoFar)/time.Since(startedAt).Seconds(),
					atomic.LoadUint64(&numDirectories),
				)
			}
			return nil
		},
	)
	if err != nil {
		panic(err)
	}
	// NOTE(review): the plain reads below assume Parwalk only returns once all
	// of its callbacks have finished — confirm against Parwalk.
	log.Info("total size: %v (%v bytes), in %v files and %v directories", formatSize(totalSize), totalSize, numFiles, numDirectories)
}
commands["du"] = commandSpec{
	flags: duCmd,
	run:   duRun,
}
flag.Parse()
if flag.NArg() < 1 {

View File

@@ -533,7 +533,7 @@ func (cm *clientMetadata) init(log *Logger) error {
read, _, err := cm.sock.ReadFrom(respBuf)
if err != nil {
if cm.sockClosed {
log.Debug("got error while reading from metadata socket when winded down: %v", err)
log.Debug("got error while reading from metadata socket when wound down: %v", err)
} else {
log.RaiseAlert("got error while reading from metadata socket: %v", err)
}
@@ -783,10 +783,11 @@ TraverseDirectories:
return inheritedFrom, nil
}
func (client *Client) ResolvePath(log *Logger, path string) (msgs.InodeId, error) {
func (client *Client) ResolvePathWithParent(log *Logger, path string) (msgs.InodeId, msgs.InodeId, error) {
if !filepath.IsAbs(path) {
return msgs.NULL_INODE_ID, fmt.Errorf("expected absolute path, got '%v'", path)
return msgs.NULL_INODE_ID, msgs.NULL_INODE_ID, fmt.Errorf("expected absolute path, got '%v'", path)
}
parent := msgs.NULL_INODE_ID
id := msgs.ROOT_DIR_INODE_ID
for _, segment := range strings.Split(filepath.Clean(path), "/")[1:] {
if segment == "" {
@@ -794,11 +795,17 @@ func (client *Client) ResolvePath(log *Logger, path string) (msgs.InodeId, error
}
resp := msgs.LookupResp{}
if err := client.ShardRequest(log, id.Shard(), &msgs.LookupReq{DirId: id, Name: segment}, &resp); err != nil {
return msgs.NULL_INODE_ID, err
return msgs.NULL_INODE_ID, msgs.NULL_INODE_ID, err
}
parent = id
id = resp.TargetId
}
return id, nil
return id, parent, nil
}
// ResolvePath resolves an absolute path to the inode id it refers to.
//
// It delegates to ResolvePathWithParent and discards the parent id; the error
// from that call is returned as-is.
func (client *Client) ResolvePath(log *Logger, path string) (msgs.InodeId, error) {
	target, _, resolveErr := client.ResolvePathWithParent(log, path)
	return target, resolveErr
}
type WriteBlockFuture struct {

View File

@@ -29,6 +29,7 @@ func (env *parwalkEnv) visit(
id msgs.InodeId,
path string,
) error {
log.Debug("visiting %q, %v", path, id)
if err := env.callback(parent, id, path); err != nil {
return err
}
@@ -43,17 +44,6 @@ func (env *parwalkEnv) visit(
path: path,
}
env.chans[id.Shard()] <- req
/*
for {
select {
case env.chans[id.Shard()] <- req:
goto Sent
default:
log.Info("could not send message to shard %v, will retry", id.Shard())
time.Sleep(time.Second)
}
}
*/
}
return nil
}
@@ -94,6 +84,7 @@ func (env *parwalkEnv) process(
func Parwalk(
log *Logger,
client *Client,
root string,
callback func(parent msgs.InodeId, id msgs.InodeId, path string) error,
) error {
// compute
@@ -124,7 +115,11 @@ func Parwalk(
}
}()
}
if err := env.visit(log, msgs.NULL_INODE_ID, msgs.ROOT_DIR_INODE_ID, ""); err != nil {
rootId, parentId, err := client.ResolvePathWithParent(log, root)
if err != nil {
return err
}
if err := env.visit(log, parentId, rootId, root); err != nil {
return err
}
env.wg.Wait()