From e96921708eb5cc92c4023766dadb30813504dc56 Mon Sep 17 00:00:00 2001 From: jkoberg Date: Fri, 31 May 2024 16:35:18 +0200 Subject: [PATCH] feat(ocis): concurrency for consistency command Signed-off-by: jkoberg --- ocis/pkg/backup/backup.go | 210 +++++++++++------------------------- ocis/pkg/backup/provider.go | 181 +++++++++++++++++++++++++++++++ 2 files changed, 245 insertions(+), 146 deletions(-) create mode 100644 ocis/pkg/backup/provider.go diff --git a/ocis/pkg/backup/backup.go b/ocis/pkg/backup/backup.go index 946d07640a..8ca84fbbdb 100644 --- a/ocis/pkg/backup/backup.go +++ b/ocis/pkg/backup/backup.go @@ -2,16 +2,11 @@ package backup import ( - "errors" "fmt" "io/fs" "os" - "path/filepath" "regexp" "strings" - - "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" - "github.com/shamaton/msgpack/v2" ) // Inconsistency describes the type of inconsistency @@ -40,13 +35,7 @@ var ( _trashRegex = regexp.MustCompile(`\.T\.[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}\.[0-9]+Z$`) ) -// ListBlobstore required to check blob consistency -type ListBlobstore interface { - List() ([]*node.Node, error) - Path(node *node.Node) string -} - -// Consistency holds the node and blob data of a space +// Consistency holds the node and blob data of a storage provider type Consistency struct { // Storing the data like this might take a lot of memory // we might need to optimize this if we run into memory issues @@ -57,14 +46,10 @@ type Consistency struct { nodeToLink map[string]string blobToNode map[string]string - - fsys fs.FS - discpath string - lbs ListBlobstore } -// New creates a new Consistency object -func New(fsys fs.FS, discpath string, lbs ListBlobstore) *Consistency { +// NewConsistency creates a new Consistency object +func NewConsistency() *Consistency { return &Consistency{ Nodes: make(map[string][]Inconsistency), LinkedNodes: make(map[string][]Inconsistency), @@ -73,10 +58,6 @@ func New(fsys fs.FS, discpath string, lbs ListBlobstore) *Consistency { 
nodeToLink: make(map[string]string), blobToNode: make(map[string]string), - - fsys: fsys, - discpath: discpath, - lbs: lbs, } } @@ -84,128 +65,86 @@ func New(fsys fs.FS, discpath string, lbs ListBlobstore) *Consistency { func CheckProviderConsistency(storagepath string, lbs ListBlobstore) error { fsys := os.DirFS(storagepath) - c := New(fsys, storagepath, lbs) - if err := c.Initialize(); err != nil { - return err - } - - if err := c.Evaluate(); err != nil { - return err - } - - return c.PrintResults() -} - -// Initialize initializes the Consistency object -func (c *Consistency) Initialize() error { - dirs, err := fs.Glob(c.fsys, "spaces/*/*/nodes/*/*/*/*") + nodes, links, blobs, quit, err := NewProvider(fsys, storagepath, lbs).ProduceData() if err != nil { return err } - if len(dirs) == 0 { - return errors.New("no backup found. Double check storage path") - } + c := NewConsistency() + c.GatherData(nodes, links, blobs, quit) - for _, d := range dirs { - entries, err := fs.ReadDir(c.fsys, d) - if err != nil { - return err - } - - if len(entries) == 0 { - fmt.Println("empty dir", filepath.Join(c.discpath, d)) - continue - } - - for _, e := range entries { - switch { - case e.IsDir(): - ls, err := fs.ReadDir(c.fsys, filepath.Join(d, e.Name())) - if err != nil { - fmt.Println("error reading dir", err) - continue - } - for _, l := range ls { - linkpath := filepath.Join(c.discpath, d, e.Name(), l.Name()) - - r, _ := os.Readlink(linkpath) - nodePath := filepath.Join(c.discpath, d, e.Name(), r) - c.LinkedNodes[nodePath] = []Inconsistency{} - c.nodeToLink[nodePath] = linkpath - } - fallthrough - case filepath.Ext(e.Name()) == "" || _versionRegex.MatchString(e.Name()) || _trashRegex.MatchString(e.Name()): - if !c.filesExist(filepath.Join(d, e.Name())) { - dp := filepath.Join(c.discpath, d, e.Name()) - c.Nodes[dp] = append(c.Nodes[dp], InconsistencyFilesMissing) - } - inc := c.checkNode(filepath.Join(d, e.Name())) - dp := filepath.Join(c.discpath, d, e.Name()) - if inc != "" { 
- c.Nodes[dp] = append(c.Nodes[dp], inc) - } else if len(c.Nodes[dp]) == 0 { - c.Nodes[dp] = []Inconsistency{} - } - } - } - } - - links, err := fs.Glob(c.fsys, "spaces/*/*/trash/*/*/*/*/*") - if err != nil { - return err - } - for _, l := range links { - linkpath := filepath.Join(c.discpath, l) - r, _ := os.Readlink(linkpath) - p := filepath.Join(c.discpath, l, "..", r) - c.LinkedNodes[p] = []Inconsistency{} - c.nodeToLink[p] = linkpath - } - return nil + return c.PrintResults(storagepath) } -// Evaluate evaluates inconsistencies -func (c *Consistency) Evaluate() error { +// GatherData gathers and evaluates data produced by the DataProvider +func (c *Consistency) GatherData(nodes chan NodeData, links chan LinkData, blobs chan BlobData, quit chan struct{}) { + c.gatherData(nodes, links, blobs, quit) + for n := range c.Nodes { - if _, ok := c.LinkedNodes[n]; !ok && c.requiresSymlink(n) { + if len(c.Nodes[n]) == 0 { c.Nodes[n] = append(c.Nodes[n], InconsistencySymlinkMissing) - continue } - - deleteInconsistency(c.LinkedNodes, n) - deleteInconsistency(c.Nodes, n) } - - // LinkedNodes should be empty now for l := range c.LinkedNodes { c.LinkedNodes[l] = append(c.LinkedNodes[l], InconsistencyNodeMissing) } - - blobs, err := c.lbs.List() - if err != nil { - return err + for b := range c.Blobs { + c.Blobs[b] = append(c.Blobs[b], InconsistencyBlobOrphaned) } - - for _, bn := range blobs { - p := c.lbs.Path(bn) - if _, ok := c.BlobReferences[p]; !ok { - c.Blobs[p] = append(c.Blobs[p], InconsistencyBlobOrphaned) - continue - } - deleteInconsistency(c.BlobReferences, p) - } - - // BlobReferences should be empty now for b := range c.BlobReferences { c.BlobReferences[b] = append(c.BlobReferences[b], InconsistencyBlobMissing) } +} + +func (c *Consistency) gatherData(nodes chan NodeData, links chan LinkData, blobs chan BlobData, quit chan struct{}) { + for { + select { + case n := <-nodes: + // does it have inconsistencies? 
+ if len(n.Inconsistencies) != 0 { + c.Nodes[n.NodePath] = append(c.Nodes[n.NodePath], n.Inconsistencies...) + } + // is it linked? + if _, ok := c.LinkedNodes[n.NodePath]; ok { + deleteInconsistency(c.LinkedNodes, n.NodePath) + deleteInconsistency(c.Nodes, n.NodePath) + } else if requiresSymlink(n.NodePath) { + c.Nodes[n.NodePath] = c.Nodes[n.NodePath] + } + // does it have a blob? + if n.BlobPath != "" { + if _, ok := c.Blobs[n.BlobPath]; ok { + deleteInconsistency(c.Blobs, n.BlobPath) + } else { + c.BlobReferences[n.BlobPath] = []Inconsistency{} + c.blobToNode[n.BlobPath] = n.NodePath + } + } + case l := <-links: + // does it have a node? + if _, ok := c.Nodes[l.NodePath]; ok { + deleteInconsistency(c.Nodes, l.NodePath) + } else { + c.LinkedNodes[l.NodePath] = []Inconsistency{} + c.nodeToLink[l.NodePath] = l.LinkPath + } + case b := <-blobs: + // does it have a reference? + if _, ok := c.BlobReferences[b.BlobPath]; ok { + deleteInconsistency(c.BlobReferences, b.BlobPath) + } else { + c.Blobs[b.BlobPath] = []Inconsistency{} + } + case <-quit: + return + + } + } - return nil } // PrintResults prints the results of the evaluation -func (c *Consistency) PrintResults() error { +func (c *Consistency) PrintResults(discpath string) error { if len(c.Nodes) != 0 { fmt.Println("\n🚨 Inconsistent Nodes:") } @@ -231,34 +170,13 @@ func (c *Consistency) PrintResults() error { fmt.Printf("\t👉️ %v\tblob: %s\n\t\t\t\treferencing node:%s\n", c.BlobReferences[b], b, c.blobToNode[b]) } if len(c.Nodes) == 0 && len(c.LinkedNodes) == 0 && len(c.Blobs) == 0 && len(c.BlobReferences) == 0 { - fmt.Printf("💚 No inconsistency found. The backup in '%s' seems to be valid.\n", c.discpath) + fmt.Printf("💚 No inconsistency found. 
The backup in '%s' seems to be valid.\n", discpath) } return nil } -func (c *Consistency) checkNode(path string) Inconsistency { - b, err := fs.ReadFile(c.fsys, path+".mpk") - if err != nil { - return InconsistencyFilesMissing - } - - m := map[string][]byte{} - if err := msgpack.Unmarshal(b, &m); err != nil { - return InconsistencyMalformedFile - } - - if bid := m["user.ocis.blobid"]; string(bid) != "" { - spaceID, _ := getIDsFromPath(filepath.Join(c.discpath, path)) - p := c.lbs.Path(&node.Node{BlobID: string(bid), SpaceID: spaceID}) - c.BlobReferences[p] = []Inconsistency{} - c.blobToNode[p] = filepath.Join(c.discpath, path) - } - - return "" -} - -func (c *Consistency) requiresSymlink(path string) bool { +func requiresSymlink(path string) bool { spaceID, nodeID := getIDsFromPath(path) if nodeID != "" && spaceID != "" && (spaceID == nodeID || _versionRegex.MatchString(nodeID)) { return false @@ -267,7 +185,7 @@ func (c *Consistency) requiresSymlink(path string) bool { return true } -func (c *Consistency) filesExist(path string) bool { +func (c *DataProvider) filesExist(path string) bool { check := func(p string) bool { _, err := fs.Stat(c.fsys, p) return err == nil diff --git a/ocis/pkg/backup/provider.go b/ocis/pkg/backup/provider.go new file mode 100644 index 0000000000..917be0ad27 --- /dev/null +++ b/ocis/pkg/backup/provider.go @@ -0,0 +1,181 @@ +package backup + +import ( + "errors" + "fmt" + "io/fs" + "os" + "path/filepath" + "sync" + + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" + "github.com/shamaton/msgpack/v2" +) + +// ListBlobstore required to check blob consistency +type ListBlobstore interface { + List() ([]*node.Node, error) + Path(node *node.Node) string +} + +// DataProvider provides data for the consistency check +type DataProvider struct { + fsys fs.FS + discpath string + lbs ListBlobstore +} + +// NodeData holds data about the nodes +type NodeData struct { + NodePath string + BlobPath string + Inconsistencies []Inconsistency 
+} + +// LinkData about the symlinks +type LinkData struct { + LinkPath string + NodePath string +} + +// BlobData about the blobs in the blobstore +type BlobData struct { + BlobPath string +} + +// NewProvider creates a new DataProvider object +func NewProvider(fsys fs.FS, discpath string, lbs ListBlobstore) *DataProvider { + return &DataProvider{ + fsys: fsys, + discpath: discpath, + lbs: lbs, + } +} + +// ProduceData produces data for the consistency check +func (c *DataProvider) ProduceData() (chan NodeData, chan LinkData, chan BlobData, chan struct{}, error) { + dirs, err := fs.Glob(c.fsys, "spaces/*/*/nodes/*/*/*/*") + if err != nil { + return nil, nil, nil, nil, err + } + + if len(dirs) == 0 { + return nil, nil, nil, nil, errors.New("no backup found. Double check storage path") + } + + nodes := make(chan NodeData) + links := make(chan LinkData) + blobs := make(chan BlobData) + quit := make(chan struct{}) + wg := sync.WaitGroup{} + + // crawl spaces + wg.Add(1) + go func() { + for _, d := range dirs { + entries, err := fs.ReadDir(c.fsys, d) + if err != nil { + fmt.Println("error reading dir", err) + continue + } + + if len(entries) == 0 { + fmt.Println("empty dir", filepath.Join(c.discpath, d)) + continue + } + + for _, e := range entries { + switch { + case e.IsDir(): + ls, err := fs.ReadDir(c.fsys, filepath.Join(d, e.Name())) + if err != nil { + fmt.Println("error reading dir", err) + continue + } + for _, l := range ls { + linkpath := filepath.Join(c.discpath, d, e.Name(), l.Name()) + + r, _ := os.Readlink(linkpath) + nodePath := filepath.Join(c.discpath, d, e.Name(), r) + links <- LinkData{LinkPath: linkpath, NodePath: nodePath} + } + fallthrough + case filepath.Ext(e.Name()) == "" || _versionRegex.MatchString(e.Name()) || _trashRegex.MatchString(e.Name()): + np := filepath.Join(c.discpath, d, e.Name()) + var inc []Inconsistency + if !c.filesExist(filepath.Join(d, e.Name())) { + inc = append(inc, InconsistencyFilesMissing) + } + bp, i := 
c.getBlobPath(filepath.Join(d, e.Name())) + if i != "" { + inc = append(inc, i) + } + + nodes <- NodeData{NodePath: np, BlobPath: bp, Inconsistencies: inc} + } + } + } + wg.Done() + }() + + // crawl trash + wg.Add(1) + go func() { + linkpaths, err := fs.Glob(c.fsys, "spaces/*/*/trash/*/*/*/*/*") + if err != nil { + fmt.Println("error reading trash", err) + } + for _, l := range linkpaths { + linkpath := filepath.Join(c.discpath, l) + r, _ := os.Readlink(linkpath) + p := filepath.Join(c.discpath, l, "..", r) + links <- LinkData{LinkPath: linkpath, NodePath: p} + } + wg.Done() + }() + + // crawl blobstore + wg.Add(1) + go func() { + bs, err := c.lbs.List() + if err != nil { + fmt.Println("error listing blobs", err) + } + + for _, bn := range bs { + blobs <- BlobData{BlobPath: c.lbs.Path(bn)} + } + wg.Done() + }() + + // wait for all crawlers to finish + go func() { + wg.Wait() + quit <- struct{}{} + close(nodes) + close(links) + close(blobs) + close(quit) + }() + + return nodes, links, blobs, quit, nil +} + +func (c *DataProvider) getBlobPath(path string) (string, Inconsistency) { + b, err := fs.ReadFile(c.fsys, path+".mpk") + if err != nil { + return "", InconsistencyFilesMissing + } + + m := map[string][]byte{} + if err := msgpack.Unmarshal(b, &m); err != nil { + return "", InconsistencyMalformedFile + } + + if bid := m["user.ocis.blobid"]; string(bid) != "" { + spaceID, _ := getIDsFromPath(filepath.Join(c.discpath, path)) + return c.lbs.Path(&node.Node{BlobID: string(bid), SpaceID: spaceID}), "" + } + + return "", "" +}